Posted to commits@hive.apache.org by se...@apache.org on 2018/04/27 00:35:07 UTC

[1/2] hive git commit: HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)

Repository: hive
Updated Branches:
  refs/heads/branch-3 cc0ef469c -> ea18769f0
  refs/heads/master fc425933e -> 0dec5952a


HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0dec5952
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0dec5952
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0dec5952

Branch: refs/heads/master
Commit: 0dec5952aaacefb711d69e0e40f8da389c073d5a
Parents: fc42593
Author: sergey <se...@apache.org>
Authored: Thu Apr 26 17:19:33 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Apr 26 17:19:33 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/ql/txn/compactor/TestCompactor.java    | 261 ++++++++++++--
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  28 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   3 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  19 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   2 -
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   3 +-
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |   3 +
 .../hive/ql/txn/compactor/CompactorMR.java      | 337 ++++++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  25 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   6 +-
 .../hive/ql/TestTxnCommandsForMmTable.java      |  73 +---
 .../metastore/api/hive_metastoreConstants.java  |   7 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   6 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   4 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  20 +-
 .../HiveMetaStoreClientPreCatalog.java          |   4 +-
 .../hive/common/ValidReaderWriteIdList.java     |   4 +
 18 files changed, 654 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f40c606..049a594 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2531,6 +2531,9 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
       "Compaction jobs will be submitted.  Set to empty string to let Hadoop choose the queue."),
+
+    HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
+        "Whether the compactor should compact insert-only tables. A safety switch."),
     /**
      * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
      */
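
The new HIVE_COMPACTOR_COMPACT_MM flag defaults to true, so insert-only (MM) tables are compacted out of the box; it exists only as a safety switch. A minimal sketch of turning it off, assuming a HiveConf that reaches the compactor process (only the property and ConfVars names come from the hunk above, the rest is illustrative):

  // Illustrative only: skip compaction of insert-only tables entirely.
  HiveConf conf = new HiveConf();
  conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM, false);
  // Equivalent hive-site.xml entry: hive.compactor.compact.insert.only=false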

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 82ba775..4ebd096 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,11 +31,13 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,9 +48,11 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -704,16 +709,7 @@ public class TestCompactor {
       // it has an open txn in it
       writeBatch(connection, writer, true);
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -827,16 +823,7 @@ public class TestCompactor {
       txnBatch.abort();
 
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -860,6 +847,228 @@ public class TestCompactor {
     }
   }
 
+
+  @Test
+  public void mmTable() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS ORC" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+
+    // Make sure we don't compact if we don't need to compact.
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+  }
+
+  @Test
+  public void mmTableBucketed() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) CLUSTERED BY (a) " +
+        "INTO 64 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', " +
+        "'transactional_properties'='insert_only')", driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    String baseDir = "base_0000002";
+    verifyHasBase(table.getSd(), fs, baseDir);
+
+    FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
+        AcidUtils.hiddenFileFilter);
+    Assert.assertEquals(Lists.newArrayList(files).toString(), 64, files.length);
+  }
+
+  @Test
+  public void mmTableOpenWriteId() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    long openTxnId = msClient.openTxn("test");
+    long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName);
+    Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new.
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 2);
+
+    runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
+    FileSystem fs = FileSystem.get(conf);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    runCleaner(conf);
+    verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter);
+    verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
+    runMajorCompaction(dbName, tblName); // Compact 4 and 5.
+    verifyFooBarResult(tblName, 2);
+    verifyHasBase(table.getSd(), fs, "base_0000005"); 
+    runCleaner(conf);
+    verifyDeltaCount(table.getSd(), fs, 0);
+  }
+
+  private void verifyHasBase(
+      StorageDescriptor sd, FileSystem fs, String baseName) throws Exception {
+    verifyHasDir(sd, fs, baseName, AcidUtils.baseFileFilter);
+  }
+
+  private void verifyHasDir(
+      StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    for (FileStatus file : stat) {
+      if (name.equals(file.getPath().getName())) return;
+    }
+    Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
+  }
+
+  private void verifyDeltaCount(
+      StorageDescriptor sd, FileSystem fs, int count) throws Exception {
+    verifyDirCount(sd, fs, count, AcidUtils.deltaFileFilter);
+  }
+
+  private void verifyDirCount(
+      StorageDescriptor sd, FileSystem fs, int count, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    Assert.assertEquals(Arrays.toString(stat), count, stat.length);
+  }
+
+  @Test
+  public void mmTablePartitioned() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_part";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " PARTITIONED BY(ds int) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"),
+        p2 = msClient.getPartition(dbName, tblName, "ds=2"),
+        p3 = msClient.getPartition(dbName, tblName, "ds=3");
+    msClient.close();
+ 
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(p1.getSd(), fs, 3);
+    verifyDeltaCount(p2.getSd(), fs, 2);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    verifyFooBarResult(tblName, 3);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000006");
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    // Make sure we don't compact if we don't need to compact; but do if we do.
+    verifyFooBarResult(tblName, 4);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000008");
+
+  }
+
+  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
+    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
+    int fooCount = 0, barCount = 0;
+    for (String s : valuesReadFromHiveDriver) {
+      if ("1\tfoo".equals(s)) {
+        ++fooCount;
+      } else if ("2\tbar".equals(s)) {
+        ++barCount;
+      } else {
+        Assert.fail("Unexpected " + s);
+      }
+    }
+    Assert.assertEquals(fooCount, count);
+    Assert.assertEquals(barCount, count);
+  }
+
+  private void runMajorCompaction(
+      String dbName, String tblName, String... partNames) throws MetaException {
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
   @Test
   public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     String dbName = "default";
@@ -885,16 +1094,7 @@ public class TestCompactor {
       // Start a third batch, but don't close it.
       writeBatch(connection, writer, true);
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -1461,6 +1661,7 @@ public class TestCompactor {
       throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
     }
   }
+
   static void createTestDataFile(String filename, String[] lines) throws IOException {
     FileWriter writer = null;
     try {
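
The MM-table tests above file compaction requests programmatically, going through TxnStore and an in-process Worker (see runMajorCompaction). Outside of unit tests the same major compaction would normally be requested through HiveQL; a small sketch using this class's existing helper, with an example table name:

  // Illustrative only: request the compaction via SQL instead of TxnStore.compact();
  // a compactor Worker still has to pick the request up, as runMajorCompaction does.
  executeStatementOnDriver("ALTER TABLE mm_nonpart COMPACT 'MAJOR'", driver);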

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f83bdaf..41ad002 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -46,6 +46,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -209,6 +213,7 @@ public class Driver implements IDriver {
 
   private CacheUsage cacheUsage;
   private CacheEntry usedCacheEntry;
+  private ValidWriteIdList compactionWriteIds = null;
 
   private enum DriverState {
     INITIALIZED,
@@ -540,8 +545,10 @@ public class Driver implements IDriver {
 
     conf.setQueryString(queryStr);
     // FIXME: sideeffect will leave the last query set at the session level
-    SessionState.get().getConf().setQueryString(queryStr);
-    SessionState.get().setupQueryCurrentTimestamp();
+    if (SessionState.get() != null) {
+      SessionState.get().getConf().setQueryString(queryStr);
+      SessionState.get().setupQueryCurrentTimestamp();
+    }
 
     // Whether any error occurred during query compilation. Used for query lifetime hook.
     boolean compileError = false;
@@ -1311,7 +1318,18 @@ public class Driver implements IDriver {
       throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
               JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString);
+    List<String> txnTables = getTransactionalTableList(plan);
+    ValidTxnWriteIdList txnWriteIds = null;
+    if (compactionWriteIds != null) {
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + txnTables);
+      }
+      String fullTableName = txnTables.get(0);
+      txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now.
+      txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
+    } else {
+      txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+    }
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -2776,4 +2794,8 @@ public class Driver implements IDriver {
       return false;
     }
   }
+
+  public void setCompactionWriteIds(ValidWriteIdList val) {
+    this.compactionWriteIds = val;
+  }
 }
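
The new setCompactionWriteIds hook lets a caller pin the write-id snapshot a query compiles against instead of having the Driver ask the transaction manager for one. A condensed sketch of the intended call pattern, based on CompactorMR.runOnDriver later in this commit (session setup and error handling trimmed):

  // Condensed illustration; see runOnDriver in CompactorMR.java below for the full version.
  QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
  Driver driver = new Driver(qs, user, null, null);
  driver.setCompactionWriteIds(writeIds); // ValidWriteIdList of the table being compacted
  CommandProcessorResponse cpr = driver.run(query);
  if (cpr.getResponseCode() != 0) {
    throw new HiveException("Failed to run " + query, cpr.getException());
  }
  driver.close();
  driver.destroy();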

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 15e6c34..3141a7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2724,7 +2724,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     return prop_string;
   }
 
-  private StringBuilder appendSerdeParams(StringBuilder builder, Map<String, String> serdeParam) {
+  public static StringBuilder appendSerdeParams(
+      StringBuilder builder, Map<String, String> serdeParam) {
     serdeParam = new TreeMap<String, String>(serdeParam);
     builder.append("WITH SERDEPROPERTIES ( \n");
     List<String> serdeCols = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7b7fd5d..76569d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -31,12 +33,14 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -69,7 +73,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
  * The later may (usually will) be called from a timer thread.
  * See {@link #getMS()} for more important concurrency/metastore access notes.
- * 
+ *
  * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
  * Effectively, that means any statement that has side effects.  Exceptions are statements like
  * Show Compactions, Show Tables, Use Database foo, etc.  The transaction is started either
@@ -111,9 +115,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   /**
    * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
    * include any Operations which cannot be rolled back (drop partition; write to  non-acid table).
-   * If false, it's a single statement transaction which can include any statement.  This is not a 
+   * If false, it's a single statement transaction which can include any statement.  This is not a
    * contradiction from the user point of view who doesn't know anything about the implicit txn
-   * and cannot call rollback (the statement of course can fail in which case there is nothing to 
+   * and cannot call rollback (the statement of course can fail in which case there is nothing to
    * rollback (assuming the statement is well implemented)).
    *
    * This is done so that all commands run in a transaction which simplifies implementation and
@@ -292,7 +296,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   /**
    * Ensures that the current SQL statement is appropriate for the current state of the
    * Transaction Manager (e.g. can call commit unless you called start transaction)
-   * 
+   *
    * Note that support for multi-statement txns is a work-in-progress so it's only supported in
    * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
    * @param queryPlan
@@ -300,7 +304,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    */
   private void verifyState(QueryPlan queryPlan) throws LockException {
     if(!isTxnOpen()) {
-      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + 
+      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
         " for " + getQueryIdWaterMark(queryPlan));
     }
     if(queryPlan.getOperation() == null) {
@@ -820,7 +824,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     assert isTxnOpen();
     assert validTxnList != null && !validTxnList.isEmpty();
     try {
-      return getMS().getValidWriteIds(txnId, tableList, validTxnList);
+      return TxnUtils.createValidTxnWriteIdList(
+          txnId, getMS().getValidWriteIds(tableList, validTxnList));
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
@@ -1013,7 +1018,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
-  
+
   public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
                                              List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 78eedd3..a74670b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -394,6 +394,4 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     }
     return locks;
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index ec11fec..f239535 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+
 import java.util.List;
 
 /**
@@ -253,7 +254,7 @@ public interface HiveTxnManager {
   boolean recordSnapshot(QueryPlan queryPlan);
 
   boolean isImplicitTransactionOpen();
-  
+
   boolean isTxnOpen();
   /**
    * if {@code isTxnOpen()}, returns the currently active transaction ID.

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index dde20ed..056dfa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -110,6 +110,9 @@ public final class PlanUtils {
 
   public static TableDesc getDefaultTableDesc(CreateTableDesc directoryDesc,
       String cols, String colTypes ) {
+    // TODO: this should have an option for directory to inherit from the parent table,
+    //       including bucketing and list bucketing, for the use in compaction when the
+    //       latter runs inside a transaction.
     TableDesc ret = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols,
         colTypes, false);;
     if (directoryDesc == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b1c2288..b698c84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+
+import com.google.common.collect.Lists;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 
@@ -33,24 +41,43 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -70,7 +97,9 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,7 +157,7 @@ public class CompactorMR {
     }
 
     job.set(FINAL_LOCATION, sd.getLocation());
-    job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
+    job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -200,19 +229,17 @@ public class CompactorMR {
    * @param ci CompactionInfo
    * @throws java.io.IOException if the job fails
    */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+  void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
            CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
 
-    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
-    // We just need to delete the directories for aborted transactions.
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
-      LOG.debug("Going to delete directories for aborted transactions for MM table "
-          + t.getDbName() + "." + t.getTableName());
-      removeFiles(conf, sd.getLocation(), writeIds, t);
+      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+        runMmCompaction(conf, t, p, sd, writeIds, ci);
+      }
       return;
     }
 
@@ -294,6 +321,255 @@ public class CompactorMR {
     su.gatherStats();
   }
 
+  private void runMmCompaction(HiveConf conf, Table t, Partition p,
+      StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
+    LOG.debug("Going to delete directories for aborted transactions for MM table "
+        + t.getDbName() + "." + t.getTableName());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
+        conf, writeIds, Ref.from(false), false, t.getParameters());
+    removeFilesForMmTable(conf, dir);
+
+    // Then, actually do the compaction.
+    if (!ci.isMajorCompaction()) {
+      // Not supported for MM tables right now.
+      LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
+      return;
+    }
+
+    int deltaCount = dir.getCurrentDirectories().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
+        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+      return;
+    }
+    try {
+      String tmpLocation = generateTmpPath(sd);
+      Path baseLocation = new Path(tmpLocation, "_base");
+
+      // Set up the session for driver.
+      conf = new HiveConf(conf);
+      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      SessionState sessionState = setUpSessionState(conf, user);
+
+      // Note: we could skip creating the table and just add table type stuff directly to the
+      //       "insert overwrite directory" command if there were no bucketing or list bucketing.
+      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName;
+      while (true) {
+        tmpTableName = tmpPrefix + System.currentTimeMillis();
+        String query = buildMmCompactionCtQuery(tmpTableName, t,
+            p == null ? t.getSd() : p.getSd(), baseLocation.toString());
+        LOG.info("Compacting a MM table into " + query);
+        try {
+          runOnDriver(conf, user, sessionState, query, null);
+          break;
+        } catch (Exception ex) {
+          Throwable cause = ex;
+          while (cause != null && !(cause instanceof AlreadyExistsException)) {
+            cause = cause.getCause();
+          }
+          if (cause == null) {
+            throw new IOException(ex);
+          }
+        }
+      }
+
+      String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
+      LOG.info("Compacting a MM table via " + query);
+      runOnDriver(conf, user, sessionState, query, writeIds);
+      commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
+      runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+    } catch (HiveException e) {
+      LOG.error("Error compacting a MM table", e);
+      throw new IOException(e);
+    }
+  }
+
+  public SessionState setUpSessionState(HiveConf conf, String user) {
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null) {
+      // Note: we assume that workers run on the same threads repeatedly, so we can set up
+      //       the session here and it will be reused without explicitly storing in the worker.
+      sessionState = new SessionState(conf, user);
+      SessionState.setCurrentSessionState(sessionState);
+    }
+    return sessionState;
+  }
+
+  private String generateTmpPath(StorageDescriptor sd) {
+    return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+  }
+
+  private String buildMmCompactionCtQuery(
+      String fullName, Table t, StorageDescriptor sd, String location) {
+    StringBuilder query = new StringBuilder("create temporary table ")
+      .append(fullName).append("(");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Bucketing.
+    List<String> buckCols = t.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = t.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        List<String> sortKeys = new ArrayList<String>();
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ");
+          if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+            query.append("ASC");
+          } else if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) {
+            query.append("DESC");
+          }
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (t.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (").append(
+            StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(
+        serdeInfo.getSerializationLib())).append("'");
+    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (!serdeParams.isEmpty()) {
+      DDLTask.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(
+        HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append(
+        HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append(
+        HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(Lists.newArrayList(StatsSetupConst.TABLE_PARAMS_STATS_KEYS));
+    isFirst = true;
+    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+      if (e.getValue() == null) continue;
+      if (excludes.contains(e.getKey())) continue;
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(
+          HiveStringUtils.escapeHiveCommand(e.getValue())).append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    query.append("'transactional'='false')");
+    return query.toString();
+
+  }
+
+  private static Set<String> getHiveMetastoreConstants() {
+    HashSet<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) continue;
+      if (!Modifier.isFinal(f.getModifiers())) continue;
+      if (!String.class.equals(f.getType())) continue;
+      f.setAccessible(true);
+      try {
+        result.add((String)f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  private void runOnDriver(HiveConf conf, String user,
+      SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+    boolean isOk = false;
+    try {
+      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+      Driver driver = new Driver(qs, user, null, null);
+      driver.setCompactionWriteIds(writeIds);
+      try {
+        CommandProcessorResponse cpr = driver.run(query);
+        if (cpr.getResponseCode() != 0) {
+          LOG.error("Failed to run " + query, cpr.getException());
+          throw new HiveException("Failed to run " + query, cpr.getException());
+        }
+      } finally {
+        driver.close();
+        driver.destroy();
+      }
+      isOk = true;
+    } finally {
+      if (!isOk) {
+        try {
+          sessionState.close(); // This also resets SessionState.get.
+        } catch (Throwable th) {
+          LOG.warn("Failed to close a bad session", th);
+          SessionState.detachSession();
+        }
+      }
+    }
+  }
+
+  private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // TODO: ideally we should make a special form of insert overwrite so that we:
+    //       1) Could use fast merge path for ORC and RC.
+    //       2) Didn't have to create a table.
+
+    String query = "insert overwrite table " + tmpName + " ";
+    String filter = "";
+    if (p != null) {
+      filter = " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+      }
+      query += " select ";
+      // Use table descriptor for columns.
+      List<FieldSchema> cols = t.getSd().getCols();
+      for (int i = 0; i < cols.size(); ++i) {
+        query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
+      }
+    } else {
+      query += "select *";
+    }
+    query += " from "  + fullName + filter;
+    return query;
+  }
+
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    *                If it's base_xxxx, it's in dirsToSearch, else the actual original files
@@ -309,6 +585,10 @@ public class CompactorMR {
       dirsToSearch = new StringableList();
     }
     StringableList deltaDirs = new StringableList();
+    // Note: if compaction creates a delta, it won't replace an existing base dir, so the txn ID
+    //       of the base dir won't be a part of delta's range. If otoh compaction creates a base,
+    //       we don't care about this value because bases don't have min txn ID in the name.
+    //       However logically this should also take base into account if it's included.
     long minTxn = Long.MAX_VALUE;
     long maxTxn = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta delta : parsedDeltas) {
@@ -356,7 +636,7 @@ public class CompactorMR {
    * to use.
    * @param job the job to update
    * @param cols the columns of the table
-   * @param map 
+   * @param map
    */
   private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
     StringBuilder colNames = new StringBuilder();
@@ -378,10 +658,7 @@ public class CompactorMR {
   }
 
   // Remove the directories for aborted transactions only
-  private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
-      throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
-        Ref.from(false), false, t.getParameters());
+  private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
     // For MM table, we only want to delete delta dirs for aborted txns.
     List<FileStatus> abortedDirs = dir.getAbortedDirectories();
     List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
@@ -389,11 +666,9 @@ public class CompactorMR {
       filesToDelete.add(stat.getPath());
     }
     if (filesToDelete.size() < 1) {
-      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
-          ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
@@ -940,6 +1215,7 @@ public class CompactorMR {
       FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
       //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
       //name is that we want to rename; leave it for another day
+      // TODO: if we expect one dir why don't we enforce it?
       for (FileStatus fileStatus : contents) {
         //newPath is the base/delta dir
         Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -969,4 +1245,35 @@ public class CompactorMR {
       fs.delete(tmpLocation, true);
     }
   }
+
+  /**
+   * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+   * @param from The temp directory used for compactor output. Not the actual base/delta.
+   * @param to The final directory; basically a SD directory. Not the actual base/delta.
+   */
+  private void commitMmCompaction(String from, String to, Configuration conf,
+      ValidWriteIdList actualWriteIds) throws IOException {
+    Path fromPath = new Path(from), toPath = new Path(to);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark can be used as maximum transaction ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
+      fs.mkdirs(newBaseDir);
+      AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + to);
+    FileStatus[] children = fs.listStatus(fromPath);
+    if (children.length != 1) {
+      throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+    }
+    FileStatus dirPath = children[0];
+    fs.rename(dirPath.getPath(), newBaseDir);
+    AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
+    fs.delete(fromPath, true);
+  }
 }
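
Taken together, the MM path above rewrites the data through a temporary table: buildMmCompactionCtQuery creates it over the compactor's temporary "_base" location with the source table's layout, buildMmCompactionQuery populates it, and commitMmCompaction renames the result into the new base directory. The generated statements have roughly this shape (illustrative only; the real text is built from the table's columns, bucketing, serde and skew settings):

  // Illustrative shape of the generated statements for partition ds=1 of default.mm_part:
  String create =
      "create temporary table default.tmp_compactor_mm_part_1524787173000(`a` int, `b` string) "
    + "ROW FORMAT SERDE '...' STORED AS INPUTFORMAT '...' OUTPUTFORMAT '...' "
    + "LOCATION '<tmp location>/_base' TBLPROPERTIES ('transactional'='false')";
  String populate =
      "insert overwrite table default.tmp_compactor_mm_part_1524787173000 "
    + "select `a`, `b` from default.mm_part where `ds`='1'";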

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c95daaf..a61b6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -240,11 +240,6 @@ public class Initiator extends CompactorThread {
       return CompactionType.MAJOR;
     }
 
-    // If it is for insert-only transactional table, return null.
-    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
-      return null;
-    }
-
     if (runJobAsSelf(runAs)) {
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
@@ -333,14 +328,20 @@ public class Initiator extends CompactorThread {
         HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
         Integer.parseInt(deltaNumProp);
     boolean enough = deltas.size() > deltaNumThreshold;
-    if (enough) {
-      LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
-          (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
-          " compaction");
-      // If there's no base file, do a major compaction
-      return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+    if (!enough) {
+      return null;
+    }
+    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
+      LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size()
+          + " delta files, threshold is " + deltaNumThreshold);
+      return CompactionType.MAJOR;
     }
-    return null;
+    // TODO: this log statement looks wrong
+    LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+        (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
+        " compaction");
+    // If there's no base file, do a major compaction
+    return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
   }
 
   private long sumDirSize(FileSystem fs, Path dir) throws IOException {
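
With this change the Initiator no longer skips insert-only tables: once the delta count passes the threshold it requests a MAJOR compaction for them (minor compaction of MM tables is still unsupported, as noted in CompactorMR above). A sketch of lowering that threshold so compaction is requested sooner, assuming the standard HiveConf path (everything except the ConfVars constant is illustrative):

  // Illustrative only: have the Initiator ask for compaction after fewer deltas accumulate.
  HiveConf conf = new HiveConf();
  conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 2);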

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fe0aaa4..7461299 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -84,6 +84,7 @@ public class Worker extends CompactorThread {
       // so wrap it in a big catch Throwable statement.
       try {
         final CompactionInfo ci = txnHandler.findNextToCompact(name);
+        LOG.debug("Processing compaction request " + ci);
 
         if (ci == null && !stop.get()) {
           try {
@@ -170,14 +171,15 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
+            final Partition fp = p;
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index c053860..f357275 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -200,32 +201,7 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
       Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
     }
 
-    // 2. Perform a major compaction.
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 delta dirs.
-    Assert.assertEquals(2, status.length);
-    boolean sawBase = false;
-    int deltaCount = 0;
-    for (int i = 0; i < status.length; i++) {
-      String dirName = status[i].getPath().getName();
-      if (dirName.matches("delta_.*")) {
-        deltaCount++;
-      } else {
-        sawBase = true;
-        Assert.assertTrue(dirName.matches("base_.*"));
-      }
-    }
-    Assert.assertEquals(2, deltaCount);
-    Assert.assertFalse(sawBase);
-    // Verify query result
-    int [][] resultData = new int[][] {{1,2},{3,4}};
-    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 3. INSERT OVERWRITE
+    // 2. INSERT OVERWRITE
     // Prepare data for the source table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
     // Insert overwrite MM table from source table
@@ -235,40 +211,12 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     // There should be 2 delta dirs, plus 1 base dir in the location
     Assert.assertEquals(3, status.length);
     int baseCount = 0;
-    deltaCount = 0;
-    for (int i = 0; i < status.length; i++) {
-      String dirName = status[i].getPath().getName();
-      if (dirName.matches("delta_.*")) {
-        deltaCount++;
-      } else {
-        baseCount++;
-      }
-    }
-    Assert.assertEquals(2, deltaCount);
-    Assert.assertEquals(1, baseCount);
-
-    // Verify query result
-    resultData = new int[][] {{5,6},{7,8}};
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 4. Perform a minor compaction. Nothing should change.
-    // Both deltas and the base dir should have the same name.
-    // Re-verify directory layout and query result by using the same logic as above
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 delta dirs, plus 1 base dir in the location
-    Assert.assertEquals(3, status.length);
-    baseCount = 0;
-    deltaCount = 0;
+    int deltaCount = 0;
     for (int i = 0; i < status.length; i++) {
       String dirName = status[i].getPath().getName();
       if (dirName.matches("delta_.*")) {
         deltaCount++;
       } else {
-        Assert.assertTrue(dirName.matches("base_.*"));
         baseCount++;
       }
     }
@@ -276,19 +224,8 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     Assert.assertEquals(1, baseCount);
 
     // Verify query result
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 5. Run Cleaner. It should remove the 2 delta dirs.
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_xxxxxxx.
-    // The delta dirs should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    // Verify query result
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    int[][] resultData = new int[][] {{5,6},{7,8}};
+    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
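The compaction and cleaner assertions removed above now live in TestCompactor (the new mmTable* tests elsewhere in this change), so the remaining body of this test covers only the delta layout and the INSERT OVERWRITE path. For orientation, a minimal sketch of the lifecycle those tests exercise, written against the same harness helpers visible in the removed lines (runStatementOnDriver, runWorker, runCleaner); the exact directory names depend on the allocated write IDs:

    // Sketch only: two inserts leave two delta_* dirs under the MM table.
    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
    // Request a major compaction and let a Worker pick it up; with this patch the
    // worker rewrites the deltas of an insert-only table into a base_N directory
    // instead of only deleting aborted directories.
    runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'");
    runWorker(hiveConf);
    // The old deltas stay on disk until the Cleaner removes them.
    runCleaner(hiveConf);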
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index cb1d40a..2b35e6f 100644
--- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -6,10 +6,11 @@
  */
 package org.apache.hadoop.hive.metastore.api;
 
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Field;
 import org.apache.thrift.scheme.IScheme;
 import org.apache.thrift.scheme.SchemeFactory;
 import org.apache.thrift.scheme.StandardScheme;
-
 import org.apache.thrift.scheme.TupleScheme;
 import org.apache.thrift.protocol.TTupleProtocol;
 import org.apache.thrift.protocol.TProtocolException;
@@ -34,7 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@org.apache.hadoop.classification.InterfaceAudience.Public @org.apache.hadoop.classification.InterfaceStability.Stable public class hive_metastoreConstants {
+@org.apache.hadoop.classification.InterfaceAudience.Public
+@org.apache.hadoop.classification.InterfaceStability.Stable
+public class hive_metastoreConstants {
 
   public static final String DDL_TIME = "transient_lastDdlTime";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 1c8d223..1138ed3 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2403,10 +2403,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   }
 
   @Override
-  public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
-          throws TException {
+  public List<TableValidWriteIds> getValidWriteIds(
+      List<String> tablesList, String validTxnList) throws TException {
     GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
-    return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+    return client.get_valid_write_ids(rqst).getTblValidWriteIds();
   }
 
   @Override
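getValidWriteIds now returns the raw per-table structures and leaves assembling a ValidTxnWriteIdList to the caller, since the compactor needs the raw form while DbTxnManager still wants the aggregated one (its hunk in this change rebuilds it via TxnUtils). A rough sketch of the caller-side composition; client, txnId, tables and txnListString are assumed to come from the surrounding transaction-manager state:

    // Sketch of the new call pattern for transactional readers.
    List<TableValidWriteIds> perTable = client.getValidWriteIds(tables, txnListString);
    ValidTxnWriteIdList writeIds = TxnUtils.createValidTxnWridIdList(txnId, perTable);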

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index aee416d..72b814d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
@@ -2749,14 +2750,13 @@ public interface IMetaStoreClient {
 
   /**
    * Get a structure that details valid write ids list for all tables read by current txn.
-   * @param currentTxnId current txn ID for which we try to get valid write ids list
    * @param tablesList list of tables (format: <db_name>.<table_name>) read from the current transaction
    *                   for which needs to populate the valid write ids
    * @param validTxnList snapshot of valid txns for the current txn
    * @return list of valid write ids for the given list of tables.
    * @throws TException
    */
-  ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
           throws TException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 7b02865..1880d44 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
@@ -80,13 +81,13 @@ public class TxnUtils {
    * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}.  This assumes that the caller intends to
    * read the files, and thus treats both open and aborted transactions as invalid.
    * @param currentTxnId current txn ID for which we get the valid write ids list
-   * @param validWriteIds valid write ids list from the metastore
+   * @param list valid write ids list from the metastore
    * @return a valid write IDs list for the whole transaction.
    */
   public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
-                                                              GetValidWriteIdsResponse validWriteIds) {
+                                                              List<TableValidWriteIds> validIds) {
     ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
-    for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+    for (TableValidWriteIds tableWriteIds : validIds) {
       validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
     }
     return validTxnWriteIdList;
@@ -155,6 +156,17 @@ public class TxnUtils {
     }
   }
 
+  public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+    // This is based on the existing valid write ID list that was built for a select query;
+    // therefore we assume all the aborted txns, etc. were already accounted for.
+    // All we do is adjust the high watermark to only include contiguous txns.
+    Long minOpenWriteId = ids.getMinOpenWriteId();
+    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+    }
+    return ids;
+  }
+
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration
@@ -212,7 +224,7 @@ public class TxnUtils {
 
 
   /**
-   * Build a query (or queries if one query is too big but only for the case of 'IN' 
+   * Build a query (or queries if one query is too big but only for the case of 'IN'
    * composite clause. For the case of 'NOT IN' clauses, multiple queries change
    * the semantics of the intended query.
    * E.g., Let's assume that input "inList" parameter has [5, 6] and that
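The new updateForCompactionQuery helper narrows an already-built reader snapshot so that a major compaction never folds a write that sits above an open write ID into the new base. A standalone illustration of the rule, with made-up numbers:

    // Illustration only: the snapshot covers writes 1..7 but write ID 5 is still open.
    // Compacting past 5 would produce a base that claims to cover a gap, so the high
    // watermark is pulled back to minOpenWriteId - 1 and only the contiguous prefix
    // 1..4 is compacted.
    long highWatermark = 7L;
    Long minOpenWriteId = 5L;   // null or Long.MAX_VALUE would mean nothing is open
    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
      highWatermark = minOpenWriteId - 1;
    }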

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index bf87cfc..8ae899f 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2140,10 +2140,10 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   }
 
   @Override
-  public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  public List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
           throws TException {
     GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
-    return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+    return client.get_valid_write_ids(rqst).getTblValidWriteIds();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0dec5952/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 107ea90..95a0b56 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -250,5 +250,9 @@ public class ValidReaderWriteIdList implements ValidWriteIdList {
       return RangeResponse.SOME;
     }
   }
+
+  public ValidReaderWriteIdList updateHighWatermark(long value) {
+    return new ValidReaderWriteIdList(tableName, exceptions, abortedBits, value, minOpenWriteId);
+  }
 }
 


[2/2] hive git commit: HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)

Posted by se...@apache.org.
HIVE-19124 : implement a basic major compactor for MM tables (Sergey Shelukhin, reviewed by Eugene Koifman and Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea18769f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea18769f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea18769f

Branch: refs/heads/branch-3
Commit: ea18769f026429ea6ebbbd66858920ebf869a9d6
Parents: cc0ef46
Author: sergey <se...@apache.org>
Authored: Thu Apr 26 17:19:33 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Apr 26 17:19:52 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/ql/txn/compactor/TestCompactor.java    | 261 ++++++++++++--
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  28 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   3 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  19 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   2 -
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   3 +-
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |   3 +
 .../hive/ql/txn/compactor/CompactorMR.java      | 337 ++++++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  25 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   6 +-
 .../hive/ql/TestTxnCommandsForMmTable.java      |  73 +---
 .../metastore/api/hive_metastoreConstants.java  |   7 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   6 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   4 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  20 +-
 .../HiveMetaStoreClientPreCatalog.java          |   4 +-
 .../hive/common/ValidReaderWriteIdList.java     |   4 +
 18 files changed, 654 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b588ebb..3e40f63 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2531,6 +2531,9 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
       "Compaction jobs will be submitted.  Set to empty string to let Hadoop choose the queue."),
+
+    HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
+        "Whether the compactor should compact insert-only tables. A safety switch."),
     /**
      * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
      */
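The new flag defaults to true; a deployment that wants the pre-patch behaviour for insert-only tables (aborted-directory cleanup only, no query-based rewrite) can turn it off. A small sketch, assuming a HiveConf instance is at hand; the same property can equally be set as hive.compactor.compact.insert.only in hive-site.xml:

    // Sketch: disable compaction of MM (insert-only) tables as a safety measure.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM, false);
    // CompactorMR.run checks this flag before running the query-based MM compaction
    // (see the CompactorMR hunk later in this message).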

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index b19aa23..5b5d818 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,11 +31,13 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,9 +48,11 @@ import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -704,16 +709,7 @@ public class TestCompactor {
       // it has an open txn in it
       writeBatch(connection, writer, true);
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -827,16 +823,7 @@ public class TestCompactor {
       txnBatch.abort();
 
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -860,6 +847,228 @@ public class TestCompactor {
     }
   }
 
+
+  @Test
+  public void mmTable() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS ORC" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+
+    // Make sure we don't compact if we don't need to compact.
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+  }
+
+  @Test
+  public void mmTableBucketed() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) CLUSTERED BY (a) " +
+        "INTO 64 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true', " +
+        "'transactional_properties'='insert_only')", driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    // Check that we have two deltas.
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(table.getSd(), fs, 2);
+
+    runMajorCompaction(dbName, tblName);
+    verifyFooBarResult(tblName, 1);
+    String baseDir = "base_0000002";
+    verifyHasBase(table.getSd(), fs, baseDir);
+
+    FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), baseDir),
+        AcidUtils.hiddenFileFilter);
+    Assert.assertEquals(Lists.newArrayList(files).toString(), 64, files.length);
+  }
+
+  @Test
+  public void mmTableOpenWriteId() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_nonpart";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 1);
+
+    long openTxnId = msClient.openTxn("test");
+    long openWriteId = msClient.allocateTableWriteId(openTxnId, dbName, tblName);
+    Assert.assertEquals(3, openWriteId); // Just check to make sure base_5 below is not new.
+
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+    verifyFooBarResult(tblName, 2);
+
+    runMajorCompaction(dbName, tblName); // Don't compact 4 and 5; 3 is opened.
+    FileSystem fs = FileSystem.get(conf);
+    verifyHasBase(table.getSd(), fs, "base_0000002");
+    verifyDirCount(table.getSd(), fs, 1, AcidUtils.baseFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    runCleaner(conf);
+    verifyHasDir(table.getSd(), fs, "delta_0000004_0000004_0000", AcidUtils.deltaFileFilter);
+    verifyHasDir(table.getSd(), fs, "delta_0000005_0000005_0000", AcidUtils.deltaFileFilter);
+    verifyFooBarResult(tblName, 2);
+
+    msClient.abortTxns(Lists.newArrayList(openTxnId)); // Now abort 3.
+    runMajorCompaction(dbName, tblName); // Compact 4 and 5.
+    verifyFooBarResult(tblName, 2);
+    verifyHasBase(table.getSd(), fs, "base_0000005"); 
+    runCleaner(conf);
+    verifyDeltaCount(table.getSd(), fs, 0);
+  }
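The directory names asserted in this test encode the write-id range each set of files covers. With write ID 3 left open, the first compaction's snapshot is trimmed to 2 (see TxnUtils.updateForCompactionQuery in this change), so writes 4 and 5 stay as deltas; only after the abort can a recompaction absorb them. Roughly, the table location evolves like this (illustrative, not literal test output):

    mm_nonpart/
      base_0000002/                  first major compaction; write ID 3 still open
      delta_0000004_0000004_0000/    later inserts, untouched by that compaction
      delta_0000005_0000005_0000/
    after txn 3 is aborted, a second major compaction plus the Cleaner
    leave only base_0000005/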
+
+  private void verifyHasBase(
+      StorageDescriptor sd, FileSystem fs, String baseName) throws Exception {
+    verifyHasDir(sd, fs, baseName, AcidUtils.baseFileFilter);
+  }
+
+  private void verifyHasDir(
+      StorageDescriptor sd, FileSystem fs, String name, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    for (FileStatus file : stat) {
+      if (name.equals(file.getPath().getName())) return;
+    }
+    Assert.fail("Cannot find " + name + ": " + Arrays.toString(stat));
+  }
+
+  private void verifyDeltaCount(
+      StorageDescriptor sd, FileSystem fs, int count) throws Exception {
+    verifyDirCount(sd, fs, count, AcidUtils.deltaFileFilter);
+  }
+
+  private void verifyDirCount(
+      StorageDescriptor sd, FileSystem fs, int count, PathFilter filter) throws Exception {
+    FileStatus[] stat = fs.listStatus(new Path(sd.getLocation()), filter);
+    Assert.assertEquals(Arrays.toString(stat), count, stat.length);
+  }
+
+  @Test
+  public void mmTablePartitioned() throws Exception {
+    String dbName = "default";
+    String tblName = "mm_part";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " PARTITIONED BY(ds int) STORED AS TEXTFILE" +
+        " TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        driver);
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 1)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 3)", driver);
+
+    verifyFooBarResult(tblName, 3);
+
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Partition p1 = msClient.getPartition(dbName, tblName, "ds=1"),
+        p2 = msClient.getPartition(dbName, tblName, "ds=2"),
+        p3 = msClient.getPartition(dbName, tblName, "ds=3");
+    msClient.close();
+ 
+    FileSystem fs = FileSystem.get(conf);
+    verifyDeltaCount(p1.getSd(), fs, 3);
+    verifyDeltaCount(p2.getSd(), fs, 2);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    verifyFooBarResult(tblName, 3);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000006");
+
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(1, 'foo', 2)", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + " partition (ds) VALUES(2, 'bar', 2)", driver);
+
+    runMajorCompaction(dbName, tblName, "ds=1", "ds=2", "ds=3");
+
+    // Make sure we don't compact if we don't need to compact; but do if we do.
+    verifyFooBarResult(tblName, 4);
+    verifyDeltaCount(p3.getSd(), fs, 1);
+    verifyHasBase(p1.getSd(), fs, "base_0000006");
+    verifyHasBase(p2.getSd(), fs, "base_0000008");
+
+  }
+
+  private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
+    List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
+    driver.getResults(valuesReadFromHiveDriver);
+    Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
+    int fooCount = 0, barCount = 0;
+    for (String s : valuesReadFromHiveDriver) {
+      if ("1\tfoo".equals(s)) {
+        ++fooCount;
+      } else if ("2\tbar".equals(s)) {
+        ++barCount;
+      } else {
+        Assert.fail("Unexpected " + s);
+      }
+    }
+    Assert.assertEquals(fooCount, count);
+    Assert.assertEquals(barCount, count);
+  }
+
+  private void runMajorCompaction(
+      String dbName, String tblName, String... partNames) throws MetaException {
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
   @Test
   public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
     String dbName = "default";
@@ -885,16 +1094,7 @@ public class TestCompactor {
       // Start a third batch, but don't close it.
       writeBatch(connection, writer, true);
 
-      // Now, compact
-      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
-      Worker t = new Worker();
-      t.setThreadId((int) t.getId());
-      t.setConf(conf);
-      AtomicBoolean stop = new AtomicBoolean(true);
-      AtomicBoolean looped = new AtomicBoolean();
-      t.init(stop, looped);
-      t.run();
+      runMajorCompaction(dbName, tblName);
 
       // Find the location of the table
       IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
@@ -1443,6 +1643,7 @@ public class TestCompactor {
       throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
     }
   }
+
   static void createTestDataFile(String filename, String[] lines) throws IOException {
     FileWriter writer = null;
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e8feeb8..e8c9ea0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -46,6 +46,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,6 +63,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -209,6 +213,7 @@ public class Driver implements IDriver {
 
   private CacheUsage cacheUsage;
   private CacheEntry usedCacheEntry;
+  private ValidWriteIdList compactionWriteIds = null;
 
   private enum DriverState {
     INITIALIZED,
@@ -541,8 +546,10 @@ public class Driver implements IDriver {
 
     conf.setQueryString(queryStr);
     // FIXME: sideeffect will leave the last query set at the session level
-    SessionState.get().getConf().setQueryString(queryStr);
-    SessionState.get().setupQueryCurrentTimestamp();
+    if (SessionState.get() != null) {
+      SessionState.get().getConf().setQueryString(queryStr);
+      SessionState.get().setupQueryCurrentTimestamp();
+    }
 
     // Whether any error occurred during query compilation. Used for query lifetime hook.
     boolean compileError = false;
@@ -1309,7 +1316,18 @@ public class Driver implements IDriver {
       throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
               JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString);
+    List<String> txnTables = getTransactionalTableList(plan);
+    ValidTxnWriteIdList txnWriteIds = null;
+    if (compactionWriteIds != null) {
+      if (txnTables.size() != 1) {
+        throw new LockException("Unexpected tables in compaction: " + txnTables);
+      }
+      String fullTableName = txnTables.get(0);
+      txnWriteIds = new ValidTxnWriteIdList(0L); // No transaction for the compaction for now.
+      txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
+    } else {
+      txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
+    }
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -2773,4 +2791,8 @@ public class Driver implements IDriver {
       return false;
     }
   }
+
+  public void setCompactionWriteIds(ValidWriteIdList val) {
+    this.compactionWriteIds = val;
+  }
 }
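setCompactionWriteIds lets a caller that does not run inside a metastore transaction (the compactor worker) hand the Driver a precomputed write-id snapshot instead of asking the transaction manager for one. The intended call pattern is the one CompactorMR.runOnDriver uses later in this message; a trimmed sketch, with conf, user, writeIds and query assumed to be supplied by the worker:

    // Sketch of the compactor-side usage (mirrors CompactorMR.runOnDriver).
    QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
    Driver driver = new Driver(qs, user, null, null);
    driver.setCompactionWriteIds(writeIds);   // snapshot prepared by the Worker
    CommandProcessorResponse cpr = driver.run(query);
    if (cpr.getResponseCode() != 0) {
      throw new HiveException("Failed to run " + query, cpr.getException());
    }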

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 5b8a120..d6f5666 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -2724,7 +2724,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     return prop_string;
   }
 
-  private StringBuilder appendSerdeParams(StringBuilder builder, Map<String, String> serdeParam) {
+  public static StringBuilder appendSerdeParams(
+      StringBuilder builder, Map<String, String> serdeParam) {
     serdeParam = new TreeMap<String, String>(serdeParam);
     builder.append("WITH SERDEPROPERTIES ( \n");
     List<String> serdeCols = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 7b7fd5d..76569d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -31,12 +33,14 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -69,7 +73,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
  * The later may (usually will) be called from a timer thread.
  * See {@link #getMS()} for more important concurrency/metastore access notes.
- * 
+ *
  * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
  * Effectively, that means any statement that has side effects.  Exceptions are statements like
  * Show Compactions, Show Tables, Use Database foo, etc.  The transaction is started either
@@ -111,9 +115,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   /**
    * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
    * include any Operations which cannot be rolled back (drop partition; write to  non-acid table).
-   * If false, it's a single statement transaction which can include any statement.  This is not a 
+   * If false, it's a single statement transaction which can include any statement.  This is not a
    * contradiction from the user point of view who doesn't know anything about the implicit txn
-   * and cannot call rollback (the statement of course can fail in which case there is nothing to 
+   * and cannot call rollback (the statement of course can fail in which case there is nothing to
    * rollback (assuming the statement is well implemented)).
    *
    * This is done so that all commands run in a transaction which simplifies implementation and
@@ -292,7 +296,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   /**
    * Ensures that the current SQL statement is appropriate for the current state of the
    * Transaction Manager (e.g. can call commit unless you called start transaction)
-   * 
+   *
    * Note that support for multi-statement txns is a work-in-progress so it's only supported in
    * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
    * @param queryPlan
@@ -300,7 +304,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    */
   private void verifyState(QueryPlan queryPlan) throws LockException {
     if(!isTxnOpen()) {
-      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + 
+      throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
         " for " + getQueryIdWaterMark(queryPlan));
     }
     if(queryPlan.getOperation() == null) {
@@ -820,7 +824,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     assert isTxnOpen();
     assert validTxnList != null && !validTxnList.isEmpty();
     try {
-      return getMS().getValidWriteIds(txnId, tableList, validTxnList);
+      return TxnUtils.createValidTxnWriteIdList(
+          txnId, getMS().getValidWriteIds(tableList, validTxnList));
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
@@ -1013,7 +1018,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
-  
+
   public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
                                              List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 78eedd3..a74670b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -394,6 +394,4 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     }
     return locks;
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index ec11fec..f239535 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+
 import java.util.List;
 
 /**
@@ -253,7 +254,7 @@ public interface HiveTxnManager {
   boolean recordSnapshot(QueryPlan queryPlan);
 
   boolean isImplicitTransactionOpen();
-  
+
   boolean isTxnOpen();
   /**
    * if {@code isTxnOpen()}, returns the currently active transaction ID.

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index dde20ed..056dfa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -110,6 +110,9 @@ public final class PlanUtils {
 
   public static TableDesc getDefaultTableDesc(CreateTableDesc directoryDesc,
       String cols, String colTypes ) {
+    // TODO: this should have an option for directory to inherit from the parent table,
+    //       including bucketing and list bucketing, for the use in compaction when the
+    //       latter runs inside a transaction.
     TableDesc ret = getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode), cols,
         colTypes, false);;
     if (directoryDesc == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index b1c2288..b698c84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+
+import com.google.common.collect.Lists;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 
@@ -33,24 +41,43 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -70,7 +97,9 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,7 +157,7 @@ public class CompactorMR {
     }
 
     job.set(FINAL_LOCATION, sd.getLocation());
-    job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
+    job.set(TMP_LOCATION, generateTmpPath(sd));
     job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
     job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
     job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -200,19 +229,17 @@ public class CompactorMR {
    * @param ci CompactionInfo
    * @throws java.io.IOException if the job fails
    */
-  void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds,
+  void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
            CompactionInfo ci, Worker.StatsUpdater su, TxnStore txnHandler) throws IOException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
 
-    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
-    // We just need to delete the directories for aborted transactions.
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
-      LOG.debug("Going to delete directories for aborted transactions for MM table "
-          + t.getDbName() + "." + t.getTableName());
-      removeFiles(conf, sd.getLocation(), writeIds, t);
+      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+        runMmCompaction(conf, t, p, sd, writeIds, ci);
+      }
       return;
     }
 
@@ -294,6 +321,255 @@ public class CompactorMR {
     su.gatherStats();
   }
 
+  private void runMmCompaction(HiveConf conf, Table t, Partition p,
+      StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
+    LOG.debug("Going to delete directories for aborted transactions for MM table "
+        + t.getDbName() + "." + t.getTableName());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
+        conf, writeIds, Ref.from(false), false, t.getParameters());
+    removeFilesForMmTable(conf, dir);
+
+    // Then, actually do the compaction.
+    if (!ci.isMajorCompaction()) {
+      // Not supported for MM tables right now.
+      LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
+      return;
+    }
+
+    int deltaCount = dir.getCurrentDirectories().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) <= 1) {
+      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
+        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas");
+      return;
+    }
+    try {
+      String tmpLocation = generateTmpPath(sd);
+      Path baseLocation = new Path(tmpLocation, "_base");
+
+      // Set up the session for driver.
+      conf = new HiveConf(conf);
+      conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      SessionState sessionState = setUpSessionState(conf, user);
+
+      // Note: we could skip creating the table and just add table type stuff directly to the
+      //       "insert overwrite directory" command if there were no bucketing or list bucketing.
+      String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_", tmpTableName;
+      while (true) {
+        tmpTableName = tmpPrefix + System.currentTimeMillis();
+        String query = buildMmCompactionCtQuery(tmpTableName, t,
+            p == null ? t.getSd() : p.getSd(), baseLocation.toString());
+        LOG.info("Compacting a MM table into " + query);
+        try {
+          runOnDriver(conf, user, sessionState, query, null);
+          break;
+        } catch (Exception ex) {
+          Throwable cause = ex;
+          while (cause != null && !(cause instanceof AlreadyExistsException)) {
+            cause = cause.getCause();
+          }
+          if (cause == null) {
+            throw new IOException(ex);
+          }
+        }
+      }
+
+      String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
+      LOG.info("Compacting a MM table via " + query);
+      runOnDriver(conf, user, sessionState, query, writeIds);
+      commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds);
+      runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName, null);
+    } catch (HiveException e) {
+      LOG.error("Error compacting a MM table", e);
+      throw new IOException(e);
+    }
+  }
+
+  public SessionState setUpSessionState(HiveConf conf, String user) {
+    SessionState sessionState = SessionState.get();
+    if (sessionState == null) {
+      // Note: we assume that workers run on the same threads repeatedly, so we can set up
+      //       the session here and it will be reused without explicitly storing in the worker.
+      sessionState = new SessionState(conf, user);
+      SessionState.setCurrentSessionState(sessionState);
+    }
+    return sessionState;
+  }
+
+  private String generateTmpPath(StorageDescriptor sd) {
+    return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+  }
+
+  private String buildMmCompactionCtQuery(
+      String fullName, Table t, StorageDescriptor sd, String location) {
+    StringBuilder query = new StringBuilder("create temporary table ")
+      .append(fullName).append("(");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Bucketing.
+    List<String> buckCols = t.getSd().getBucketCols();
+    if (buckCols.size() > 0) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = t.getSd().getSortCols();
+      if (sortCols.size() > 0) {
+        query.append("SORTED BY (");
+        List<String> sortKeys = new ArrayList<String>();
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ");
+          if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC) {
+            query.append("ASC");
+          } else if (sortCol.getOrder() == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_DESC) {
+            query.append("DESC");
+          }
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (t.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        query.append(" SKEWED BY (").append(
+            StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(
+        serdeInfo.getSerializationLib())).append("'");
+    String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (!serdeParams.isEmpty()) {
+      DDLTask.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(
+        HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append(
+        HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append(
+        HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(Lists.newArrayList(StatsSetupConst.TABLE_PARAMS_STATS_KEYS));
+    isFirst = true;
+    for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+      if (e.getValue() == null) continue;
+      if (excludes.contains(e.getKey())) continue;
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(
+          HiveStringUtils.escapeHiveCommand(e.getValue())).append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    query.append("'transactional'='false')");
+    return query.toString();
+
+  }
+
+  private static Set<String> getHiveMetastoreConstants() {
+    HashSet<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) continue;
+      if (!Modifier.isFinal(f.getModifiers())) continue;
+      if (!String.class.equals(f.getType())) continue;
+      f.setAccessible(true);
+      try {
+        result.add((String)f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  private void runOnDriver(HiveConf conf, String user,
+      SessionState sessionState, String query, ValidWriteIdList writeIds) throws HiveException {
+    boolean isOk = false;
+    try {
+      QueryState qs = new QueryState.Builder().withHiveConf(conf).nonIsolated().build();
+      Driver driver = new Driver(qs, user, null, null);
+      driver.setCompactionWriteIds(writeIds);
+      try {
+        CommandProcessorResponse cpr = driver.run(query);
+        if (cpr.getResponseCode() != 0) {
+          LOG.error("Failed to run " + query, cpr.getException());
+          throw new HiveException("Failed to run " + query, cpr.getException());
+        }
+      } finally {
+        driver.close();
+        driver.destroy();
+      }
+      isOk = true;
+    } finally {
+      if (!isOk) {
+        try {
+          sessionState.close(); // This also resets SessionState.get.
+        } catch (Throwable th) {
+          LOG.warn("Failed to close a bad session", th);
+          SessionState.detachSession();
+        }
+      }
+    }
+  }
+
+  private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    // TODO: ideally we should make a special form of insert overwrite so that we:
+    //       1) Could use fast merge path for ORC and RC.
+    //       2) Didn't have to create a table.
+
+    String query = "insert overwrite table " + tmpName + " ";
+    String filter = "";
+    if (p != null) {
+      filter = " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+      }
+      query += " select ";
+      // Use table descriptor for columns.
+      List<FieldSchema> cols = t.getSd().getCols();
+      for (int i = 0; i < cols.size(); ++i) {
+        query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
+      }
+    } else {
+      query += "select *";
+    }
+    query += " from "  + fullName + filter;
+    return query;
+  }
+
   /**
    * @param baseDir if not null, it's either table/partition root folder or base_xxxx.
    *                If it's base_xxxx, it's in dirsToSearch, else the actual original files
@@ -309,6 +585,10 @@ public class CompactorMR {
       dirsToSearch = new StringableList();
     }
     StringableList deltaDirs = new StringableList();
+    // Note: if compaction creates a delta, it won't replace an existing base dir, so the txn ID
+    //       of the base dir won't be part of the delta's range. If, on the other hand, compaction
+    //       creates a base, this value doesn't matter because base dirs don't carry a min txn ID
+    //       in their name. Logically, though, this should also account for the base if one is included.
     long minTxn = Long.MAX_VALUE;
     long maxTxn = Long.MIN_VALUE;
     for (AcidUtils.ParsedDelta delta : parsedDeltas) {
@@ -356,7 +636,7 @@ public class CompactorMR {
    * to use.
    * @param job the job to update
    * @param cols the columns of the table
-   * @param map 
+   * @param map
    */
   private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
     StringBuilder colNames = new StringBuilder();
@@ -378,10 +658,7 @@ public class CompactorMR {
   }
 
   // Remove the directories for aborted transactions only
-  private void removeFiles(HiveConf conf, String location, ValidWriteIdList writeIdList, Table t)
-      throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList,
-        Ref.from(false), false, t.getParameters());
+  private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
     // For MM table, we only want to delete delta dirs for aborted txns.
     List<FileStatus> abortedDirs = dir.getAbortedDirectories();
     List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
@@ -389,11 +666,9 @@ public class CompactorMR {
       filesToDelete.add(stat.getPath());
     }
     if (filesToDelete.size() < 1) {
-      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
-          ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
@@ -940,6 +1215,7 @@ public class CompactorMR {
       FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
       //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
       //name is that we want to rename; leave it for another day
+      // TODO: if we expect one dir why don't we enforce it?
       for (FileStatus fileStatus : contents) {
         //newPath is the base/delta dir
         Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -969,4 +1245,35 @@ public class CompactorMR {
       fs.delete(tmpLocation, true);
     }
   }
+
+  /**
+   * Note: similar logic to the main committer, minus the ORC-specific handling (version files and the like).
+   * @param from The temp directory used for compactor output. Not the actual base/delta.
+   * @param to The final directory; basically an SD (storage descriptor) directory. Not the actual base/delta.
+   */
+  private void commitMmCompaction(String from, String to, Configuration conf,
+      ValidWriteIdList actualWriteIds) throws IOException {
+    Path fromPath = new Path(from), toPath = new Path(to);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the high watermark of the write ID list can be used as the maximum write ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found.  Assuming 0 splits. Creating " + newBaseDir);
+      fs.mkdirs(newBaseDir);
+      AcidUtils.MetaDataFile.createCompactorMarker(toPath, fs);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + to);
+    FileStatus[] children = fs.listStatus(fromPath);
+    if (children.length != 1) {
+      throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+    }
+    FileStatus dirPath = children[0];
+    fs.rename(dirPath.getPath(), newBaseDir);
+    AcidUtils.MetaDataFile.createCompactorMarker(newBaseDir, fs);
+    fs.delete(fromPath, true);
+  }
 }
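
For readers following the new MM path in CompactorMR: the flow is query-based rather than MR-based. A condensed sketch of how the pieces above fit together; the locals tmpTableName, tmpLocation, runAs, sessionState and compactionWriteIds stand in for state the surrounding run() logic prepares, and only the three method calls are from this patch:

    // Sketch, assuming the temp table has already been created over tmpLocation via the
    // CREATE TABLE DDL built at the top of this file from the source table's StorageDescriptor.
    String insertQuery = buildMmCompactionQuery(conf, t, p, tmpTableName);       // INSERT OVERWRITE ... SELECT
    runOnDriver(conf, runAs, sessionState, insertQuery, compactionWriteIds);     // rewrite all visible rows
    commitMmCompaction(tmpLocation, sd.getLocation(), conf, compactionWriteIds); // publish as base_<writeId>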

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c95daaf..a61b6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -240,11 +240,6 @@ public class Initiator extends CompactorThread {
       return CompactionType.MAJOR;
     }
 
-    // If it is for insert-only transactional table, return null.
-    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
-      return null;
-    }
-
     if (runJobAsSelf(runAs)) {
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
@@ -333,14 +328,20 @@ public class Initiator extends CompactorThread {
         HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
         Integer.parseInt(deltaNumProp);
     boolean enough = deltas.size() > deltaNumThreshold;
-    if (enough) {
-      LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
-          (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
-          " compaction");
-      // If there's no base file, do a major compaction
-      return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
+    if (!enough) {
+      return null;
+    }
+    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
+      LOG.debug("Requesting a major compaction for a MM table; found " + deltas.size()
+          + " delta files, threshold is " + deltaNumThreshold);
+      return CompactionType.MAJOR;
     }
-    return null;
+    // TODO: this log statement looks wrong
+    LOG.debug("Found " + deltas.size() + " delta files, threshold is " + deltaNumThreshold +
+        (enough ? "" : "not") + " and no base, requesting " + (noBase ? "major" : "minor") +
+        " compaction");
+    // If there's no base file, do a major compaction
+    return noBase ? CompactionType.MAJOR : CompactionType.MINOR;
   }
 
   private long sumDirSize(FileSystem fs, Path dir) throws IOException {

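The Initiator change is easier to read as the decision it now makes; the snippet below restates the added branch in isolation (illustrative, not the full determineCompactionType method):

    // Insert-only (MM) tables are no longer skipped by the Initiator. Once the delta count
    // crosses the threshold they are always scheduled for MAJOR compaction, since this patch
    // only implements a query-based major compactor for MM tables.
    if (deltas.size() <= deltaNumThreshold) {
      return null;                                                // not enough deltas yet
    }
    if (AcidUtils.isInsertOnlyTable(tblproperties)) {
      return CompactionType.MAJOR;                                // MM: major only
    }
    return noBase ? CompactionType.MAJOR : CompactionType.MINOR;  // full-ACID behavior unchanged
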
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fe0aaa4..7461299 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -84,6 +84,7 @@ public class Worker extends CompactorThread {
       // so wrap it in a big catch Throwable statement.
       try {
         final CompactionInfo ci = txnHandler.findNextToCompact(name);
+        LOG.debug("Processing compaction request " + ci);
 
         if (ci == null && !stop.get()) {
           try {
@@ -170,14 +171,15 @@ public class Worker extends CompactorThread {
         launchedJob = true;
         try {
           if (runJobAsSelf(runAs)) {
-            mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+            mr.run(conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, su, txnHandler);
           } else {
             UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
               UserGroupInformation.getLoginUser());
+            final Partition fp = p;
             ugi.doAs(new PrivilegedExceptionAction<Object>() {
               @Override
               public Object run() throws Exception {
-                mr.run(conf, jobName.toString(), t, sd, tblValidWriteIds, ci, su, txnHandler);
+                mr.run(conf, jobName.toString(), t, fp, sd, tblValidWriteIds, ci, su, txnHandler);
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index c053860..f357275 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -200,32 +201,7 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
       Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
     }
 
-    // 2. Perform a major compaction.
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
-    runWorker(hiveConf);
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 delta dirs.
-    Assert.assertEquals(2, status.length);
-    boolean sawBase = false;
-    int deltaCount = 0;
-    for (int i = 0; i < status.length; i++) {
-      String dirName = status[i].getPath().getName();
-      if (dirName.matches("delta_.*")) {
-        deltaCount++;
-      } else {
-        sawBase = true;
-        Assert.assertTrue(dirName.matches("base_.*"));
-      }
-    }
-    Assert.assertEquals(2, deltaCount);
-    Assert.assertFalse(sawBase);
-    // Verify query result
-    int [][] resultData = new int[][] {{1,2},{3,4}};
-    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 3. INSERT OVERWRITE
+    // 2. INSERT OVERWRITE
     // Prepare data for the source table
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
     // Insert overwrite MM table from source table
@@ -235,40 +211,12 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     // There should be 2 delta dirs, plus 1 base dir in the location
     Assert.assertEquals(3, status.length);
     int baseCount = 0;
-    deltaCount = 0;
-    for (int i = 0; i < status.length; i++) {
-      String dirName = status[i].getPath().getName();
-      if (dirName.matches("delta_.*")) {
-        deltaCount++;
-      } else {
-        baseCount++;
-      }
-    }
-    Assert.assertEquals(2, deltaCount);
-    Assert.assertEquals(1, baseCount);
-
-    // Verify query result
-    resultData = new int[][] {{5,6},{7,8}};
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 4. Perform a minor compaction. Nothing should change.
-    // Both deltas and the base dir should have the same name.
-    // Re-verify directory layout and query result by using the same logic as above
-    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
-    runWorker(hiveConf);
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    // There should be 2 delta dirs, plus 1 base dir in the location
-    Assert.assertEquals(3, status.length);
-    baseCount = 0;
-    deltaCount = 0;
+    int deltaCount = 0;
     for (int i = 0; i < status.length; i++) {
       String dirName = status[i].getPath().getName();
       if (dirName.matches("delta_.*")) {
         deltaCount++;
       } else {
-        Assert.assertTrue(dirName.matches("base_.*"));
         baseCount++;
       }
     }
@@ -276,19 +224,8 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     Assert.assertEquals(1, baseCount);
 
     // Verify query result
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(resultData), rs);
-
-    // 5. Run Cleaner. It should remove the 2 delta dirs.
-    runCleaner(hiveConf);
-    // There should be only 1 directory left: base_xxxxxxx.
-    // The delta dirs should have been cleaned up.
-    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
-        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
-    Assert.assertEquals(1, status.length);
-    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
-    // Verify query result
-    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    int[][] resultData = new int[][] {{5,6},{7,8}};
+    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index cb1d40a..2b35e6f 100644
--- a/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -6,10 +6,11 @@
  */
 package org.apache.hadoop.hive.metastore.api;
 
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Field;
 import org.apache.thrift.scheme.IScheme;
 import org.apache.thrift.scheme.SchemeFactory;
 import org.apache.thrift.scheme.StandardScheme;
-
 import org.apache.thrift.scheme.TupleScheme;
 import org.apache.thrift.protocol.TTupleProtocol;
 import org.apache.thrift.protocol.TProtocolException;
@@ -34,7 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@org.apache.hadoop.classification.InterfaceAudience.Public @org.apache.hadoop.classification.InterfaceStability.Stable public class hive_metastoreConstants {
+@org.apache.hadoop.classification.InterfaceAudience.Public
+@org.apache.hadoop.classification.InterfaceStability.Stable
+public class hive_metastoreConstants {
 
   public static final String DDL_TIME = "transient_lastDdlTime";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 5a0cc6a..c766f10 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2392,10 +2392,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   }
 
   @Override
-  public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
-          throws TException {
+  public List<TableValidWriteIds> getValidWriteIds(
+      List<String> tablesList, String validTxnList) throws TException {
     GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
-    return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+    return client.get_valid_write_ids(rqst).getTblValidWriteIds();
   }
 
   @Override

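Since getValidWriteIds now returns the raw per-table list instead of a pre-built ValidTxnWriteIdList, callers that still want the per-transaction view combine it with the updated TxnUtils helper. A minimal usage sketch, where client, txnId, tables and txnString are assumed to come from the caller's context:

    // The current txn ID moves from the client call to the helper that builds the combined view.
    List<TableValidWriteIds> rawWriteIds = client.getValidWriteIds(tables, txnString);
    ValidTxnWriteIdList txnWriteIds = TxnUtils.createValidTxnWriteIdList(txnId, rawWriteIds);
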
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index aee416d..72b814d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
@@ -2749,14 +2750,13 @@ public interface IMetaStoreClient {
 
   /**
    * Get a structure that details valid write ids list for all tables read by current txn.
-   * @param currentTxnId current txn ID for which we try to get valid write ids list
    * @param tablesList list of tables (format: <db_name>.<table_name>) read from the current transaction
    *                   for which needs to populate the valid write ids
    * @param validTxnList snapshot of valid txns for the current txn
    * @return list of valid write ids for the given list of tables.
    * @throws TException
    */
-  ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
           throws TException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 7b02865..1880d44 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
@@ -80,13 +81,13 @@ public class TxnUtils {
    * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}.  This assumes that the caller intends to
    * read the files, and thus treats both open and aborted transactions as invalid.
    * @param currentTxnId current txn ID for which we get the valid write ids list
-   * @param validWriteIds valid write ids list from the metastore
+   * @param validIds valid write ids list from the metastore
    * @return a valid write IDs list for the whole transaction.
    */
   public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
-                                                              GetValidWriteIdsResponse validWriteIds) {
+                                                              List<TableValidWriteIds> validIds) {
     ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
-    for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+    for (TableValidWriteIds tableWriteIds : validIds) {
       validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
     }
     return validTxnWriteIdList;
@@ -155,6 +156,17 @@ public class TxnUtils {
     }
   }
 
+  public static ValidReaderWriteIdList updateForCompactionQuery(ValidReaderWriteIdList ids) {
+    // This is based on the existing valid write ID list that was built for a select query;
+    // therefore we assume all the aborted txns, etc. were already accounted for.
+    // All we do is adjust the high watermark so only the contiguous range of committed write IDs is included.
+    Long minOpenWriteId = ids.getMinOpenWriteId();
+    if (minOpenWriteId != null && minOpenWriteId != Long.MAX_VALUE) {
+      return ids.updateHighWatermark(ids.getMinOpenWriteId() - 1);
+    }
+    return ids;
+  }
+
   /**
    * Get an instance of the TxnStore that is appropriate for this store
    * @param conf configuration
@@ -212,7 +224,7 @@ public class TxnUtils {
 
 
   /**
-   * Build a query (or queries if one query is too big but only for the case of 'IN' 
+   * Build a query (or queries if one query is too big but only for the case of 'IN'
    * composite clause. For the case of 'NOT IN' clauses, multiple queries change
    * the semantics of the intended query.
    * E.g., Let's assume that input "inList" parameter has [5, 6] and that

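A small worked example of what updateForCompactionQuery does to a snapshot; the constructor call mirrors the one used by updateHighWatermark below, and the concrete IDs are made up:

    // Write IDs 1..10, with 7 and 9 still open (empty aborted bits) and 7 the minimum open ID.
    ValidReaderWriteIdList ids = new ValidReaderWriteIdList(
        "db.tbl", new long[] {7, 9}, new BitSet(), 10, 7);
    ValidReaderWriteIdList forCompaction = TxnUtils.updateForCompactionQuery(ids);
    // forCompaction.getHighWatermark() == 6: the compaction query only rewrites the contiguous
    // committed prefix 1..6, so nothing written at or after the first open write ID is folded
    // into the new base.
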
http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index bf87cfc..8ae899f 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2140,10 +2140,10 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   }
 
   @Override
-  public ValidTxnWriteIdList getValidWriteIds(Long currentTxnId, List<String> tablesList, String validTxnList)
+  public List<TableValidWriteIds> getValidWriteIds(List<String> tablesList, String validTxnList)
           throws TException {
     GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tablesList, validTxnList);
-    return TxnUtils.createValidTxnWriteIdList(currentTxnId, client.get_valid_write_ids(rqst));
+    return client.get_valid_write_ids(rqst).getTblValidWriteIds();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ea18769f/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 107ea90..95a0b56 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -250,5 +250,9 @@ public class ValidReaderWriteIdList implements ValidWriteIdList {
       return RangeResponse.SOME;
     }
   }
+
+  public ValidReaderWriteIdList updateHighWatermark(long value) {
+    return new ValidReaderWriteIdList(tableName, exceptions, abortedBits, value, minOpenWriteId);
+  }
 }