You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/26 04:32:13 UTC

hive git commit: HIVE-19233 : Add utility for acid 1.0 to 2.0 migration (Eugene Koifman via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master f30efbebf -> 087ef7b63


HIVE-19233 : Add utility for acid 1.0 to 2.0 migration (Eugene Koifman via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/087ef7b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/087ef7b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/087ef7b6

Branch: refs/heads/master
Commit: 087ef7b638f8a2d803287c2a11d29df5c3393f80
Parents: f30efbe
Author: Eugene Koifman <ek...@apache.org>
Authored: Wed Apr 25 21:31:39 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Apr 25 21:31:39 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java  |  37 ++
 standalone-metastore/pom.xml                    |  16 +
 .../hive/metastore/tools/HiveMetaTool.java      | 392 ++++++++++++++++++-
 4 files changed, 445 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 21ce5cb..50f416e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@
     <maven.jar.plugin.version>2.4</maven.jar.plugin.version>
     <maven.javadoc.plugin.version>2.4</maven.javadoc.plugin.version>
     <maven.shade.plugin.version>3.1.0</maven.shade.plugin.version>
-    <maven.surefire.plugin.version>2.20.1</maven.surefire.plugin.version>
+    <maven.surefire.plugin.version>2.21.0</maven.surefire.plugin.version>
     <maven.war.plugin.version>2.4</maven.war.plugin.version>
     <maven.dependency.plugin.version>2.8</maven.dependency.plugin.version>
     <maven.eclipse.plugin.version>2.9</maven.eclipse.plugin.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 0e53697..6daac1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.tools.HiveMetaTool;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -535,4 +536,40 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
         TestTxnCommands2.stringifyValues(data), rs);
 
   }
+  /**
+   * Creates one table of each kind the upgrade utility has to tell apart, populates them and
+   * then runs {@code HiveMetaTool -prepareAcidUpgrade} over the result.
+   */
+  @Test
+  public void testUpgrade() throws Exception {
+    int[][] rows = {{1,2}, {3, 4}, {5, 6}};
+    int[][] partitionedRows = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+    //start from a clean slate, then (re)create every table used below
+    String[] ddl = {
+        "drop table if exists TAcid",
+        "drop table if exists TAcidPart",
+        "drop table if exists TFlat",
+        "drop table if exists TFlatText",
+        "create table TAcid (a int, b int) stored as orc",
+        "create table TAcidPart (a int, b int) partitioned by (p int) stored as orc",
+        "create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')",
+        "create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')",
+    };
+    for(String statement : ddl) {
+      runStatementOnDriver(statement);
+    }
+
+    //TAcid gets an update on top of its insert, so it needs major compaction
+    String rowValues = TestTxnCommands2.makeValuesClause(rows);
+    runStatementOnDriver("insert into TAcid" + rowValues);
+    runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+    //TFlat needs to be converted to Acid
+    runStatementOnDriver("insert into TFlat" + TestTxnCommands2.makeValuesClause(rows));
+
+    //TFlatText needs to be converted to MM
+    runStatementOnDriver("insert into TFlatText" + TestTxnCommands2.makeValuesClause(rows));
+
+    //only partition p=10 of TAcidPart is updated, so only p=10 needs major compaction
+    String partitionedValues = TestTxnCommands2.makeValuesClause(partitionedRows);
+    runStatementOnDriver("insert into TAcidPart" + partitionedValues);
+    runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+    //todo: add partitioned table that needs conversion to MM/Acid
+
+    //todo: rename files case
+    HiveMetaTool.main(new String[] {"-prepareAcidUpgrade"});
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/standalone-metastore/pom.xml
----------------------------------------------------------------------
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index c340fe2..10b1bfa 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -80,6 +80,7 @@
     <libthrift.version>0.9.3</libthrift.version>
     <log4j2.version>2.8.2</log4j2.version>
     <mockito-all.version>1.10.19</mockito-all.version>
+    <orc.version>1.4.3</orc.version>
     <protobuf.version>2.5.0</protobuf.version>
     <sqlline.version>1.3.0</sqlline.version>
     <storage-api.version>2.6.0-SNAPSHOT</storage-api.version>
@@ -93,6 +94,21 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>${orc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-storage-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>${jackson.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index f4eacd5..02e8c69 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,8 +18,13 @@
 
 package org.apache.hadoop.hive.metastore.tools;
 
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +39,25 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -104,6 +127,12 @@ public class HiveMetaTool {
         .withDescription("Specify the key for table property to be updated. tablePropKey option " +
           "is valid only with updateLocation option.")
         .create("tablePropKey");
+    Option prepareAcidUpgrade =
+        OptionBuilder.withArgName("find-compactions")
+            .hasOptionalArg() //directory to output results to
+            .withDescription("Generates a set Compaction commands to run to prepare for Hive 2.x" +
+                " to 3.0 upgrade")
+            .create("prepareAcidUpgrade");
 
     cmdLineOptions.addOption(help);
     cmdLineOptions.addOption(listFSRoot);
@@ -112,6 +141,7 @@ public class HiveMetaTool {
     cmdLineOptions.addOption(dryRun);
     cmdLineOptions.addOption(serdePropKey);
     cmdLineOptions.addOption(tablePropKey);
+    cmdLineOptions.addOption(prepareAcidUpgrade);
   }
 
   private void initObjectStore(Configuration conf) {
@@ -362,7 +392,358 @@ public class HiveMetaTool {
       printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
     }
   }
+  /**
+   * Handles the {@code prepareAcidUpgrade} option: runs the upgrade preparation and, if it
+   * fails with a metastore or I/O error, writes the stringified exception to stderr and hands
+   * control to {@code printAndExit}.
+   *
+   * @param metaTool tool instance passed on to {@code printAndExit} when preparation fails
+   */
+  private void prepareAcidUpgrade(HiveMetaTool metaTool) {
+    try {
+      prepareAcidUpgradeInternal();
+    } catch(TException|IOException failure) {
+      String details = StringUtils.stringifyException(failure);
+      System.err.println(details);
+      printAndExit(metaTool);
+    }
+  }
+  /**
+   * Mutable accumulator for statistics collected while looking for data that needs compaction.
+   */
+  private static class CompactionMetaInfo {
+    /** Running total, in bytes, of the data covered by all generated compaction commands. */
+    long numberOfBytes;
+  }
+  /**
+   * Walks every table of every database known to the metastore and writes three scripts to the
+   * current directory: compaction commands for Acid tables, ALTER TABLE commands that convert
+   * the remaining managed tables to Acid or MM (insert-only), and a (currently empty) file
+   * renaming script.
+   *
+   * todo: make sure compaction queue is configured and has ample capacity
+   * todo: what to do on failure?  Suppose some table/part is not readable.  Should it produce
+   * commands for all other tables?
+   * todo: should probably support dryRun mode where we output scripts but instead of renaming
+   * files we generate a renaming script.  Alternatively, always generate a renaming script and
+   * have user execute it - this is probably a better option.  If script is not empty on rerun
+   * someone added files to table to be made Acid.
+   * todo: how do we test this?  even if we had 2.x data it won't be readable in 3.0.  even w/o any
+   * updates, txnids in the data won't make sense in 3.0 w/o actually performing equivalent of
+   * metastore upgrade to init writeid table.  Also, can we even create a new table and set location
+   * to existing files to simulate a 2.x table?
+   * todo: generate some instructions in compaction script to include total compactions to perform,
+   * total data volume to handle and anything else that may help users guess at how long it will
+   * take.  Also, highlight tuning options to speed this up.
+   * todo: can we make the script blocking so that it waits for cleaner to delete files?
+   * need to poll SHOW COMPACTIONS and make sure that all partitions are in "finished" state
+   * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+   * todo: change script comments to a preamble instead of a footer
+   *
+   * @throws MetaException if the metastore client cannot be created
+   * @throws TException if a metastore call fails
+   * @throws IOException if table data cannot be inspected or a script cannot be written
+   */
+  private void prepareAcidUpgradeInternal() throws MetaException, TException, IOException {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    System.out.println("Looking for Acid tables that need to be compacted");
+    //todo: check if acid is enabled and bail if not
+    //todo: check that running on 2.x?
+    List<String> compactions = new ArrayList<>();
+    List<String> convertToAcid = new ArrayList<>();
+    List<String> convertToMM = new ArrayList<>();
+    final String scriptLocation = ".";//todo: get this from input
+    final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+    HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+    try {
+      List<String> databases = hms.getAllDatabases();//TException
+      System.out.println("Found " + databases.size() + " databases to process");
+      for(String dbName : databases) {
+        List<String> tables = hms.getAllTables(dbName);
+        System.out.println("found " + tables.size() + " tables in " + dbName);
+        for(String tableName : tables) {
+          Table t = hms.getTable(dbName, tableName);
+
+          //ql depends on metastore and is not accessible here...  and if it was, I would not be
+          //using 2.6 exec jar, but 3.0.... which is not what we want
+          compactions.addAll(getCompactionCommands(t, conf, hms, compactionMetaInfo));
+          processConversion(t, convertToAcid, convertToMM, hms);
+          /*todo: handle renaming files somewhere
+           * */
+        }
+      }
+    } finally {
+      //always release the metastore connection, including when the scan fails part way through
+      hms.close();
+    }
+    makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
+    makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+    makeRenameFileScript(scriptLocation);
+  }
+  //todo: handle exclusion list
+  private static void processConversion(Table t, List<String> convertToAcid,
+      List<String> convertToMM, HiveMetaStoreClient hms) throws TException {
+    if(isFullAcidTable(t)) {
+      return;
+    }
+    if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+      return;
+    }
+    String fullTableName = Warehouse.getQualifiedName(t);
+    if(t.getPartitionKeysSize() <= 0) {
+      if(canBeMadeAcid(fullTableName, t.getSd())) {
+        convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+            "'transactional'='true')");
+      }
+      else {
+        convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+            "'transactional'='true', 'transactional_properties'='insert_only')");
+      }
+    }
+    else {
+      List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+      int batchSize = 10000;//todo: right size?
+      int numWholeBatches = partNames.size()/batchSize;
+      for(int i = 0; i < numWholeBatches; i++) {
+        List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+            partNames.subList(i * batchSize, (i + 1) * batchSize));
+        for(Partition p : partitionList) {
+          if(!canBeMadeAcid(fullTableName, p.getSd())) {
+            convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+                "'transactional'='true', 'transactional_properties'='insert_only')");
+            return;
+          }
+        }
+      }
+      if(numWholeBatches * batchSize < partNames.size()) {
+        //last partial batch
+        List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+            partNames.subList(numWholeBatches * batchSize, partNames.size()));
+        for (Partition p : partitionList) {
+          if (!canBeMadeAcid(fullTableName, p.getSd())) {
+            convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+                "'transactional'='true', 'transactional_properties'='insert_only')");
+            return;
+          }
+        }
+      }
+      //if here checked all parts and they are Acid compatible - make it acid
+      convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+          "'transactional'='true')");
+    }
+  }
+  /**
+   * @return {@code true} when {@code sd} uses Acid capable input/output formats and declares
+   * no sort columns
+   */
+  private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+    //the format check runs first since it may report a problem on stderr
+    if(!isAcidInputOutputFormat(fullTableName, sd)) {
+      return false;
+    }
+    return sd.getSortColsSize() <= 0;
+  }
+  /**
+   * Checks, by class name, whether both the input and the output format of {@code sd} are
+   * assignable to Hive's AcidInputFormat/AcidOutputFormat.
+   *
+   * @param fullTableName only used in the message printed when a class cannot be loaded
+   * @param sd storage descriptor whose formats are examined
+   * @return {@code true} only if both formats are set, loadable and Acid capable
+   */
+  private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+    try {
+      Class<?> inputFormat = null;
+      if(sd.getInputFormat() != null) {
+        inputFormat = Class.forName(sd.getInputFormat());
+      }
+      Class<?> outputFormat = null;
+      if(sd.getOutputFormat() != null) {
+        outputFormat = Class.forName(sd.getOutputFormat());
+      }
+      if(inputFormat == null || outputFormat == null) {
+        return false;
+      }
+      Class<?> acidInput = Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat");
+      if(!acidInput.isAssignableFrom(inputFormat)) {
+        return false;
+      }
+      Class<?> acidOutput = Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat");
+      return acidOutput.isAssignableFrom(outputFormat);
+    } catch (ClassNotFoundException e) {
+      //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+      //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+      //Acid format
+      System.err.println("Could not determine if " + fullTableName +
+          " can be made Acid due to: " + e.getMessage());
+      return false;
+    }
+  }
+  /**
+   * currently writes to current dir (whatever that is).
+   * If there is nothing to compact, outputs empty file so as not to confuse the output with a
+   * failed run.
+   * todo: add some config to tell it where to put the script
+   */
+  private static void makeCompactionScript(List<String> commands, String scriptLocation,
+      CompactionMetaInfo compactionMetaInfo) throws IOException {
+    try(PrintWriter pw = createScript(commands, "compacts_" + System.currentTimeMillis() +
+        ".sql", scriptLocation)) {
+      //add post script
+      pw.println("-- Generated total of " + commands.size() + " compaction commands");
+      if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+        //to see it working in UTs
+        pw.println("-- The total volume of data to be compacted is " +
+            String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+      }
+      else {
+        pw.println("-- The total volume of data to be compacted is " +
+            String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+      }
+      pw.println();
+      pw.println(
+          "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
+              "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
+              "-- which they will be picked up by compactor Workers.  The max number of\n" +
+              "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
+              "-- for the standalone metastore process.  Compaction itself is a Map-Reduce job\n" +
+              "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
+              "-- property if defined or 'default' if not defined.  It's advisable to set the\n" +
+              "-- capacity of this queue appropriately");
+    }
+  }
+  /**
+   * Writes two scripts under {@code scriptLocation}: {@code convertToAcid_<timestamp>.sql} with
+   * the commands from {@code alterTableAcid} and {@code convertToMM_<timestamp>.sql} with the
+   * commands from {@code alterTableMm}.  Each script ends with a comment about which Hive
+   * version may run it.
+   */
+  private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
+      String scriptLocation) throws IOException {
+    String acidScript = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+    try(PrintWriter acidWriter = createScript(alterTableAcid, acidScript, scriptLocation)) {
+      acidWriter.println("-- These commands may be executed by Hive 1.x later");
+    }
+    String mmScript = "convertToMM_" + System.currentTimeMillis() + ".sql";
+    try(PrintWriter mmWriter = createScript(alterTableMm, mmScript, scriptLocation)) {
+      mmWriter.println("-- These commands must be executed by Hive 3.0 or later");
+    }
+  }
+
+  /**
+   * Opens {@code scriptLocation/fileName} for writing and emits each command on its own line,
+   * terminated by a semicolon.
+   *
+   * @return the still open writer; the caller is responsible for closing it
+   * @throws IOException if the file cannot be opened for writing
+   */
+  private static PrintWriter createScript(List<String> commands, String fileName,
+      String scriptLocation) throws IOException {
+    //todo: make sure to create the file in 'scriptLocation' dir
+    String scriptPath = scriptLocation + "/" + fileName;
+    PrintWriter script = new PrintWriter(new FileWriter(scriptPath));
+    for(String command : commands) {
+      script.print(command);
+      script.println(";");
+    }
+    return script;
+  }
+  /**
+   * Creates the (for now empty) {@code normalizeFileNames_<timestamp>.sh} script under
+   * {@code scriptLocation}.
+   *
+   * @throws IOException if the script file cannot be created
+   */
+  private static void makeRenameFileScript(String scriptLocation) throws IOException {
+    String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
+    //createScript() returns an open writer; close it so the file handle is not leaked
+    try(PrintWriter pw = createScript(Collections.emptyList(), fileName, scriptLocation)) {
+      //todo: no rename commands are generated yet
+    }
+  }
+  /**
+   * @return any compaction commands to run for {@code Table t}
+   */
+  private static List<String> getCompactionCommands(Table t, Configuration conf,
+      HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo)
+      throws IOException, TException {
+    if(!isFullAcidTable(t)) {
+      return Collections.emptyList();
+    }
+    if(t.getPartitionKeysSize() <= 0) {
+      //not partitioned
+      if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo)) {
+        return Collections.emptyList();
+      }
+
+      List<String> cmds = new ArrayList<>();
+      cmds.add(getCompactionCommand(t, null));
+      return cmds;
+    }
+    List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+    int batchSize = 10000;//todo: right size?
+    int numWholeBatches = partNames.size()/batchSize;
+    List<String> compactionCommands = new ArrayList<>();
+    for(int i = 0; i < numWholeBatches; i++) {
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize));
+      for(Partition p : partitionList) {
+        if(needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    if(numWholeBatches * batchSize < partNames.size()) {
+      //last partial batch
+      List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size()));
+      for (Partition p : partitionList) {
+        if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+          compactionCommands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    return compactionCommands;
+  }
+  /**
+   *
+   * @param location - path to a partition (or table if not partitioned) dir
+   */
+  private static boolean needsCompaction(Path location, Configuration conf,
+      CompactionMetaInfo compactionMetaInfo) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        //checking for delete_delta is only so that this functionality can be exercised by code 3.0
+        //which cannot produce any deltas with mix of update/insert events
+        return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+      }
+    });
+    if(deltas == null || deltas.length == 0) {
+      //base_n cannot contain update/delete.  Original files are all 'insert' and we need to compact
+      //only if there are update/delete events.
+      return false;
+    }
+    deltaLoop: for(FileStatus delta : deltas) {
+      if(!delta.isDirectory()) {
+        //should never happen - just in case
+        continue;
+      }
+      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+          //bucket_x or bucket_x__flush_length
+          return path.getName().startsWith("bucket_");
+        }
+      });
+      for(FileStatus bucket : buckets) {
+        if(bucket.getPath().getName().endsWith("_flush_length")) {
+          //streaming ingest dir - cannot have update/delete events
+          continue deltaLoop;
+        }
+        if(needsCompaction(bucket, fs)) {
+          //found delete events - this 'location' needs compacting
+          compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+          return true;
+        }
+      }
+    }
+    return false;
+  }
 
+  /**
+   * Returns the length reported by the file system's content summary for {@code location}.
+   *
+   * todo: Figure out the size of the partition.  The best way is to getAcidState() and look at
+   * each file - this way it takes care of original files vs base and any other obsolete files.
+   * For now just brute force it, it's likely close enough for a rough estimate.
+   *
+   * @param location - path to a partition (or table if not partitioned) dir
+   * @throws IOException
+   */
+  private static long getDataSize(Path location, Configuration conf) throws IOException {
+    return location.getFileSystem(conf).getContentSummary(location).getLength();
+  }
+  /**
+   * Reads the Acid statistics of one ORC bucket file to see whether it holds any update or
+   * delete events.
+   *
+   * @return {@code true} if the statistics report at least one update or delete
+   * @throws IllegalStateException if the file carries no Acid statistics
+   */
+  private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+    //no need to check side file since it can only be in a streaming ingest delta
+    Path bucketPath = bucket.getPath();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(fs.getConf()).filesystem(fs);
+    Reader orcReader = OrcFile.createReader(bucketPath, options);
+    AcidStats stats = OrcAcidUtils.parseAcidStats(orcReader);
+    if(stats != null) {
+      return stats.deletes > 0 || stats.updates > 0;
+    }
+    //should never happen since we are reading bucket_x written by acid write
+    throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+  }
+  private static String getCompactionCommand(Table t, Partition p) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+    if(t.getPartitionKeysSize() > 0) {
+      assert p != null : "must supply partition for partitioned table " +
+          Warehouse.getQualifiedName(t);
+      sb.append(" PARTITION(");
+      for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+        //todo: should these be quoted?  HiveUtils.unparseIdentifier() - if value is String should definitely quote
+        sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+            .append(p.getValues().get(i)).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+    }
+    return sb.append(" COMPACT 'major'").toString();
+  }
+  /**
+   * Reports whether the table's {@code TABLE_IS_TRANSACTIONAL} parameter is set to "true"
+   * (case-insensitive).  Prints a line to stdout for every table that qualifies.
+   */
+  private static boolean isFullAcidTable(Table t) {
+    if (t.getParametersSize() <= 0) {
+      //cannot be acid
+      return false;
+    }
+    String flag = t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    //equalsIgnoreCase() on the literal is false for a missing (null) value
+    boolean transactional = "true".equalsIgnoreCase(flag);
+    if (transactional) {
+      System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+    }
+    return transactional;
+  }
   private static void printAndExit(HiveMetaTool metaTool) {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -460,7 +841,16 @@ public class HiveMetaTool {
           } else {
             metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
           }
-        } else {
+      } else if(line.hasOption("prepareAcidUpgrade")) {
+        String[] values = line.getOptionValues("prepareAcidUpgrade");
+        String targetDir = null;
+        if(values != null && values.length > 0) {
+          if(values.length > 1) {
+            System.err.println("HiveMetaTool: prepareAcidUpgrade");
+          }
+        }
+        metaTool.prepareAcidUpgrade(metaTool);
+      } else {
           if (line.hasOption("dryRun")) {
             System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
           } else if (line.hasOption("serdePropKey")) {