You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/26 04:32:13 UTC
hive git commit: HIVE-19233 : Add utility for acid 1.0 to 2.0
migration (Eugene Koifman via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master f30efbebf -> 087ef7b63
HIVE-19233 : Add utility for acid 1.0 to 2.0 migration (Eugene Koifman via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/087ef7b6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/087ef7b6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/087ef7b6
Branch: refs/heads/master
Commit: 087ef7b638f8a2d803287c2a11d29df5c3393f80
Parents: f30efbe
Author: Eugene Koifman <ek...@apache.org>
Authored: Wed Apr 25 21:31:39 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Wed Apr 25 21:31:39 2018 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 37 ++
standalone-metastore/pom.xml | 16 +
.../hive/metastore/tools/HiveMetaTool.java | 392 ++++++++++++++++++-
4 files changed, 445 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 21ce5cb..50f416e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@
<maven.jar.plugin.version>2.4</maven.jar.plugin.version>
<maven.javadoc.plugin.version>2.4</maven.javadoc.plugin.version>
<maven.shade.plugin.version>3.1.0</maven.shade.plugin.version>
- <maven.surefire.plugin.version>2.20.1</maven.surefire.plugin.version>
+ <maven.surefire.plugin.version>2.21.0</maven.surefire.plugin.version>
<maven.war.plugin.version>2.4</maven.war.plugin.version>
<maven.dependency.plugin.version>2.8</maven.dependency.plugin.version>
<maven.eclipse.plugin.version>2.9</maven.eclipse.plugin.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 0e53697..6daac1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.tools.HiveMetaTool;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -535,4 +536,40 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
TestTxnCommands2.stringifyValues(data), rs);
}
+  /**
+   * Creates tables in the states the upgrade tool has to classify and then runs
+   * {@code HiveMetaTool -prepareAcidUpgrade} against the metastore:
+   * TAcid (expected to need major compaction after the update), TAcidPart (only p=10 is
+   * updated), TFlat (non-transactional ORC, expected to be converted to Acid) and TFlatText
+   * (non-transactional text, expected to be converted to MM).
+   * The generated scripts are not asserted on yet.
+   */
+  @Test
+  public void testUpgrade() throws Exception {
+    int[][] data = {{1, 2}, {3, 4}, {5, 6}};
+    int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+    for (String table : new String[] {"TAcid", "TAcidPart", "TFlat", "TFlatText"}) {
+      runStatementOnDriver("drop table if exists " + table);
+    }
+    runStatementOnDriver("create table TAcid (a int, b int) stored as orc");
+    runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) stored" +
+        " as orc");
+    runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+    runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+    //an update produces update/delete events, so this table needs major compaction
+    runStatementOnDriver("insert into TAcid" + TestTxnCommands2.makeValuesClause(data));
+    runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+    //flat ORC table: candidate for conversion to full Acid
+    runStatementOnDriver("insert into TFlat" + TestTxnCommands2.makeValuesClause(data));
+
+    //flat text table: candidate for conversion to MM
+    runStatementOnDriver("insert into TFlatText" + TestTxnCommands2.makeValuesClause(data));
+
+    //only partition p=10 is updated, so only it needs major compaction
+    runStatementOnDriver("insert into TAcidPart" + TestTxnCommands2.makeValuesClause(dataPart));
+    runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+    //todo: add partitioned table that needs conversion to MM/Acid
+    //todo: rename files case
+    HiveMetaTool.main(new String[] {"-prepareAcidUpgrade"});
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/standalone-metastore/pom.xml
----------------------------------------------------------------------
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index c340fe2..10b1bfa 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -80,6 +80,7 @@
<libthrift.version>0.9.3</libthrift.version>
<log4j2.version>2.8.2</log4j2.version>
<mockito-all.version>1.10.19</mockito-all.version>
+ <orc.version>1.4.3</orc.version>
<protobuf.version>2.5.0</protobuf.version>
<sqlline.version>1.3.0</sqlline.version>
<storage-api.version>2.6.0-SNAPSHOT</storage-api.version>
@@ -93,6 +94,21 @@
<dependencies>
<dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>${orc.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/087ef7b6/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index f4eacd5..02e8c69 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,8 +18,13 @@
package org.apache.hadoop.hive.metastore.tools;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,7 +39,25 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -104,6 +127,12 @@ public class HiveMetaTool {
.withDescription("Specify the key for table property to be updated. tablePropKey option " +
"is valid only with updateLocation option.")
.create("tablePropKey");
+ Option prepareAcidUpgrade =
+ OptionBuilder.withArgName("find-compactions")
+ .hasOptionalArg() //directory to output results to
+ .withDescription("Generates a set Compaction commands to run to prepare for Hive 2.x" +
+ " to 3.0 upgrade")
+ .create("prepareAcidUpgrade");
cmdLineOptions.addOption(help);
cmdLineOptions.addOption(listFSRoot);
@@ -112,6 +141,7 @@ public class HiveMetaTool {
cmdLineOptions.addOption(dryRun);
cmdLineOptions.addOption(serdePropKey);
cmdLineOptions.addOption(tablePropKey);
+ cmdLineOptions.addOption(prepareAcidUpgrade);
}
private void initObjectStore(Configuration conf) {
@@ -362,7 +392,358 @@ public class HiveMetaTool {
printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
}
}
+  /**
+   * Handler for the {@code -prepareAcidUpgrade} option. Runs the upgrade scan; if it fails with a
+   * metastore (Thrift) or I/O error, prints the stack trace to stderr and hands over to
+   * {@code printAndExit}, which prints the usage text.
+   */
+  private void prepareAcidUpgrade(HiveMetaTool metaTool) {
+    try {
+      prepareAcidUpgradeInternal();
+    } catch (IOException | TException failure) {
+      System.err.println(StringUtils.stringifyException(failure));
+      printAndExit(metaTool);
+    }
+  }
+  /**
+   * Mutable accumulator passed through the compaction scan to gather summary figures that are
+   * printed at the end of the generated compaction script.
+   */
+  private static class CompactionMetaInfo {
+    /** Running total, in bytes, of every directory for which a compaction command was generated. */
+    long numberOfBytes;
+  }
+  /**
+   * Scans every table of every database in the metastore and generates the scripts needed to
+   * prepare the warehouse for the Hive 3.0 Acid upgrade:
+   * <ul>
+   *   <li>major compactions for Acid tables/partitions whose deltas contain update/delete events,</li>
+   *   <li>ALTER TABLE commands converting managed non-Acid tables to full Acid or insert-only (MM),</li>
+   *   <li>a (currently empty) file-renaming script.</li>
+   * </ul>
+   * All scripts are written to the current working directory.
+   *
+   * todo: make sure compaction queue is configured and has ample capacity
+   * todo: what to do on failure? Suppose some table/part is not readable. Should it produce
+   * commands for all other tables?
+   * todo: should probably support dryRun mode where we output scripts but instead of renaming
+   * files we generate a renaming script. Alternatively, always generate a renaming script and
+   * have user execute it - this is probably a better option. If script is not empty on rerun
+   * someone added files to table to be made Acid.
+   * todo: how do we test this? even if we had 2.x data it won't be readable in 3.0. even w/o any
+   * updates, txnids in the data won't make sense in 3.0 w/o actually performing equivalent of
+   * metastore upgrade to init writeid table. Also, can we even create a new table and set location
+   * to existing files to simulate a 2.x table?
+   * todo: generate some instructions in compaction script to include total compactions to perform,
+   * total data volume to handle and anything else that may help users guess at how long it will
+   * take. Also, highlight tuning options to speed this up.
+   * todo: can we make the script blocking so that it waits for cleaner to delete files?
+   * need to poll SHOW COMPACTIONS and make sure that all partitions are in "finished" state
+   * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+   * todo: change script comments to a preamble instead of a footer
+   *
+   * @throws MetaException if the metastore client cannot be created
+   * @throws TException if a metastore call fails
+   * @throws IOException if a table/partition directory cannot be inspected or a script cannot be
+   * written
+   */
+  private void prepareAcidUpgradeInternal() throws MetaException, TException, IOException {
+    Configuration conf = MetastoreConf.newMetastoreConf();
+    System.out.println("Looking for Acid tables that need to be compacted");
+    //todo: check if acid is enabled and bail if not
+    //todo: check that running on 2.x?
+    HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
+    try {
+      List<String> databases = hms.getAllDatabases();//TException
+      System.out.println("Found " + databases.size() + " databases to process");
+      List<String> compactions = new ArrayList<>();
+      List<String> convertToAcid = new ArrayList<>();
+      List<String> convertToMM = new ArrayList<>();
+      final String scriptLocation = ".";//todo: get this from input
+      final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+      for(String dbName : databases) {
+        List<String> tables = hms.getAllTables(dbName);
+        System.out.println("found " + tables.size() + " tables in " + dbName);
+        for(String tableName : tables) {
+          Table t = hms.getTable(dbName, tableName);
+
+          //ql depends on metastore and is not accessible here... and if it was, I would not be
+          //using 2.6 exec jar, but 3.0.... which is not what we want
+          List<String> compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo);
+          compactions.addAll(compactionCommands);
+          processConversion(t, convertToAcid, convertToMM, hms);
+          //todo: handle renaming files somewhere
+        }
+      }
+      makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
+      makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+      makeRenameFileScript(scriptLocation);
+    } finally {
+      //release the metastore client (and its connection) even if the scan fails part way through
+      hms.close();
+    }
+  }
+  /**
+   * Decides how a non-Acid managed table should be converted. The matching ALTER TABLE command
+   * is appended to exactly one of the two output lists. Full Acid tables and tables that are not
+   * MANAGED_TABLE are ignored.
+   *
+   * A table is made full Acid only if its own storage descriptor AND the storage descriptor of
+   * every partition pass {@link #canBeMadeAcid}. Otherwise it is made insert-only (MM).
+   *
+   * todo: handle exclusion list
+   *
+   * @param t table to examine
+   * @param convertToAcid receives {@code 'transactional'='true'} commands
+   * @param convertToMM receives insert-only conversion commands
+   * @param hms client used to list and fetch partitions
+   * @throws TException if partitions cannot be fetched from the metastore
+   */
+  private static void processConversion(Table t, List<String> convertToAcid,
+      List<String> convertToMM, HiveMetaStoreClient hms) throws TException {
+    if(isFullAcidTable(t)) {
+      return;
+    }
+    if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+      return;
+    }
+    String fullTableName = Warehouse.getQualifiedName(t);
+    String makeAcid = "ALTER TABLE " + fullTableName + " SET TBLPROPERTIES (" +
+        "'transactional'='true')";
+    String makeMM = "ALTER TABLE " + fullTableName + " SET TBLPROPERTIES (" +
+        "'transactional'='true', 'transactional_properties'='insert_only')";
+    //Check the table-level descriptor for partitioned tables too. It carries the table's sort
+    //columns and default format, and for a partitioned table with no partitions yet it is the only
+    //descriptor there is. Previously such tables were unconditionally marked for full Acid.
+    if(!canBeMadeAcid(fullTableName, t.getSd())) {
+      convertToMM.add(makeMM);
+      return;
+    }
+    if(t.getPartitionKeysSize() > 0) {
+      List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+      int batchSize = 10000;//todo: right size?
+      //fetch partitions in batches to bound the size of each metastore call
+      for(int start = 0; start < partNames.size(); start += batchSize) {
+        List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+            partNames.subList(start, Math.min(start + batchSize, partNames.size())));
+        for(Partition p : partitionList) {
+          if(!canBeMadeAcid(fullTableName, p.getSd())) {
+            convertToMM.add(makeMM);
+            return;
+          }
+        }
+      }
+    }
+    //if here the table and all of its partitions are Acid compatible - make it acid
+    convertToAcid.add(makeAcid);
+  }
+  /**
+   * @return true if {@code sd} names Acid-capable input/output formats and declares no sort
+   * columns; false otherwise
+   */
+  private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+    if(!isAcidInputOutputFormat(fullTableName, sd)) {
+      return false;
+    }
+    //tables with SORTED BY columns cannot be made Acid
+    return sd.getSortColsSize() <= 0;
+  }
+  /**
+   * Uses reflection to check whether the input and output format classes named by {@code sd}
+   * implement org.apache.hadoop.hive.ql.io.AcidInputFormat and AcidOutputFormat respectively.
+   * Names are resolved reflectively because ql classes cannot be referenced from this module.
+   *
+   * @return true only if both formats are set and both are Acid formats. Returns false otherwise,
+   * including when any class cannot be loaded; in that case a message is printed to stderr.
+   */
+  private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+    String inputFormatName = sd.getInputFormat();
+    String outputFormatName = sd.getOutputFormat();
+    try {
+      Class<?> inputFormat = inputFormatName == null ? null : Class.forName(inputFormatName);
+      Class<?> outputFormat = outputFormatName == null ? null : Class.forName(outputFormatName);
+      if(inputFormat == null || outputFormat == null) {
+        return false;
+      }
+      Class<?> acidInputFormat = Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat");
+      if(!acidInputFormat.isAssignableFrom(inputFormat)) {
+        return false;
+      }
+      Class<?> acidOutputFormat = Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat");
+      return acidOutputFormat.isAssignableFrom(outputFormat);
+    } catch(ClassNotFoundException e) {
+      //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+      //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+      //Acid format
+      System.err.println("Could not determine if " + fullTableName +
+          " can be made Acid due to: " + e.getMessage());
+      return false;
+    }
+  }
+ /**
+ * currently writes to current dir (whatever that is).
+ * If there is nothing to compact, outputs empty file so as not to confuse the output with a
+ * failed run.
+ * todo: add some config to tell it where to put the script
+ */
+ private static void makeCompactionScript(List<String> commands, String scriptLocation,
+ CompactionMetaInfo compactionMetaInfo) throws IOException {
+ try(PrintWriter pw = createScript(commands, "compacts_" + System.currentTimeMillis() +
+ ".sql", scriptLocation)) {
+ //add post script
+ pw.println("-- Generated total of " + commands.size() + " compaction commands");
+ if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+ //to see it working in UTs
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+ }
+ else {
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+ }
+ pw.println();
+ pw.println(
+ "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
+ "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
+ "-- which they will be picked up by compactor Workers. The max number of\n" +
+ "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
+ "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
+ "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
+ "-- property if defined or 'default' if not defined. It's advisable to set the\n" +
+ "-- capacity of this queue appropriately");
+ }
+ }
+  /**
+   * Writes the conversion commands into two timestamped scripts under {@code scriptLocation}:
+   * {@code convertToAcid_<ts>.sql} for full Acid and {@code convertToMM_<ts>.sql} for insert-only.
+   * Each script ends with a comment saying which Hive version may run it.
+   * NOTE(review): "Hive 1.x later" probably means "Hive 1.x or later" - confirm the intended text.
+   */
+  private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
+      String scriptLocation) throws IOException {
+    String acidScriptName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+    try(PrintWriter acidScript = createScript(alterTableAcid, acidScriptName, scriptLocation)) {
+      acidScript.println("-- These commands may be executed by Hive 1.x later");
+    }
+    String mmScriptName = "convertToMM_" + System.currentTimeMillis() + ".sql";
+    try(PrintWriter mmScript = createScript(alterTableMm, mmScriptName, scriptLocation)) {
+      mmScript.println("-- These commands must be executed by Hive 3.0 or later");
+    }
+  }
+
+  /**
+   * Creates (or overwrites) {@code scriptLocation + "/" + fileName} and writes every command
+   * on its own line, terminated by ';'.
+   *
+   * @return the still-open writer so callers can append trailing lines; the caller owns it and
+   * must close it
+   */
+  private static PrintWriter createScript(List<String> commands, String fileName,
+      String scriptLocation) throws IOException {
+    //todo: make sure to create the file in 'scriptLocation' dir
+    PrintWriter writer = new PrintWriter(new FileWriter(scriptLocation + "/" + fileName));
+    commands.forEach(command -> writer.println(command + ";"));
+    return writer;
+  }
+  /**
+   * Writes an empty, timestamped {@code normalizeFileNames_<ts>.sh} placeholder into
+   * {@code scriptLocation}; file renaming is not implemented yet.
+   */
+  private static void makeRenameFileScript(String scriptLocation) throws IOException {
+    //createScript() returns an open writer; close it right away so the file handle is not leaked
+    createScript(Collections.emptyList(), "normalizeFileNames_" +
+        System.currentTimeMillis() + ".sh", scriptLocation).close();
+  }
+  /**
+   * Collects the major-compaction commands needed for {@code t}.
+   *
+   * <ul>
+   *   <li>Non-Acid tables yield nothing.</li>
+   *   <li>An unpartitioned Acid table yields at most one command.</li>
+   *   <li>A partitioned Acid table has its partitions fetched from the metastore in batches, and
+   *   yields one command per partition whose directory needs compaction.</li>
+   * </ul>
+   *
+   * @return compaction commands to run for {@code t}; possibly empty, never null
+   */
+  private static List<String> getCompactionCommands(Table t, Configuration conf,
+      HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo)
+      throws IOException, TException {
+    if(!isFullAcidTable(t)) {
+      return Collections.emptyList();
+    }
+    if(t.getPartitionKeysSize() <= 0) {
+      //not partitioned: the table directory itself holds the deltas
+      return needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo)
+          ? new ArrayList<String>(Collections.singletonList(getCompactionCommand(t, null)))
+          : Collections.<String>emptyList();
+    }
+    List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+    int batchSize = 10000;//todo: right size?
+    List<String> commands = new ArrayList<>();
+    for(int from = 0; from < partNames.size(); from += batchSize) {
+      int to = Math.min(from + batchSize, partNames.size());
+      for(Partition p : hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+          partNames.subList(from, to))) {
+        if(needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
+          commands.add(getCompactionCommand(t, p));
+        }
+      }
+    }
+    return commands;
+  }
+  /**
+   * Decides whether a table/partition directory must be major-compacted before the upgrade,
+   * i.e. whether any of its delta directories contains update or delete events.
+   *
+   * Only {@code delta_*} and {@code delete_delta_*} subdirectories are examined. A delta that
+   * contains a {@code *_flush_length} side file (streaming ingest) is skipped without opening
+   * any of its bucket files. When compaction is needed, the size of the whole directory is
+   * added to {@code compactionMetaInfo}.
+   *
+   * @param location - path to a partition (or table if not partitioned) dir
+   * @return true if at least one bucket file of a non-streaming delta records updates or deletes
+   * @throws IOException if a directory cannot be listed or an ORC footer cannot be read
+   */
+  private static boolean needsCompaction(Path location, Configuration conf,
+      CompactionMetaInfo compactionMetaInfo) throws IOException {
+    FileSystem fs = location.getFileSystem(conf);
+    FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        //checking for delete_delta is only so that this functionality can be exercised by code 3.0
+        //which cannot produce any deltas with mix of update/insert events
+        return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+      }
+    });
+    if(deltas == null || deltas.length == 0) {
+      //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
+      //only if there are update/delete events.
+      return false;
+    }
+    for(FileStatus delta : deltas) {
+      if(!delta.isDirectory()) {
+        //should never happen - just in case
+        continue;
+      }
+      FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
+          //bucket_x or bucket_x_flush_length
+          return path.getName().startsWith("bucket_");
+        }
+      });
+      //Decide whether this is a streaming ingest delta BEFORE opening any bucket. The listing can
+      //return bucket_x ahead of its bucket_x_flush_length side file (HDFS lists names in sorted
+      //order), and the ORC file of a delta still being streamed to may not have a readable footer.
+      boolean streamingDelta = false;
+      for(FileStatus bucket : buckets) {
+        if(bucket.getPath().getName().endsWith("_flush_length")) {
+          streamingDelta = true;
+          break;
+        }
+      }
+      if(streamingDelta) {
+        //streaming ingest dir - cannot have update/delete events
+        continue;
+      }
+      for(FileStatus bucket : buckets) {
+        if(needsCompaction(bucket, fs)) {
+          //found update/delete events - this 'location' needs compacting
+          compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  /**
+   * Returns the total length, in bytes, of everything under {@code location}, as reported by the
+   * file system's content summary.
+   *
+   * @param location - path to a partition (or table if not partitioned) dir
+   * @throws IOException if the content summary cannot be obtained
+   */
+  private static long getDataSize(Path location, Configuration conf) throws IOException {
+    /*
+     * todo: Figure out the size of the partition. The
+     * best way is to getAcidState() and look at each file - this way it takes care of
+     * original files vs base and any other obsolete files. For now just brute force it,
+     * it's likely close enough for a rough estimate.*/
+    ContentSummary summary = location.getFileSystem(conf).getContentSummary(location);
+    return summary.getLength();
+  }
+  /**
+   * Reads the Acid statistics stored in the footer of a single ORC bucket file of a delta.
+   * The flush-length side file is not consulted, since it can only exist in a streaming ingest
+   * delta.
+   *
+   * @return true if the file recorded any update or delete events
+   * @throws IllegalStateException if the footer carries no Acid statistics
+   * @throws IOException if the ORC file cannot be opened
+   */
+  private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+    Path bucketPath = bucket.getPath();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(fs.getConf()).filesystem(fs);
+    Reader footerReader = OrcFile.createReader(bucketPath, options);
+    AcidStats stats = OrcAcidUtils.parseAcidStats(footerReader);
+    if(stats == null) {
+      //should never happen since we are reading bucket_x written by acid write
+      throw new IllegalStateException("AcidStats missing in " + bucketPath);
+    }
+    return stats.deletes > 0 || stats.updates > 0;
+  }
+  /**
+   * Builds {@code ALTER TABLE db.tbl [PARTITION(k1=v1,...)] COMPACT 'major'}.
+   *
+   * Values of string, char, varchar, date and timestamp partition columns are emitted as
+   * single-quoted literals, with backslashes and quotes escaped. Unquoted, such values (e.g.
+   * {@code abc} or {@code 2018-01-01}) are not valid in a partition spec. Other values (numeric
+   * types) are emitted as stored in the metastore.
+   *
+   * @param p partition to compact; must be non-null when {@code t} is partitioned, unused otherwise
+   */
+  private static String getCompactionCommand(Table t, Partition p) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+    if(t.getPartitionKeysSize() > 0) {
+      assert p != null : "must supply partition for partitioned table " +
+          Warehouse.getQualifiedName(t);
+      sb.append(" PARTITION(");
+      for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+        //todo: should column names be quoted? HiveUtils.unparseIdentifier()
+        String columnType = t.getPartitionKeys().get(i).getType();
+        sb.append(t.getPartitionKeys().get(i).getName()).append('=')
+            .append(partitionValueLiteral(columnType, p.getValues().get(i))).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+    }
+    return sb.append(" COMPACT 'major'").toString();
+  }
+  /**
+   * Renders one partition value for use in a partition spec.
+   * Character and date/time typed values are quoted as HiveQL string literals; all other values
+   * are returned unchanged.
+   */
+  private static String partitionValueLiteral(String columnType, String value) {
+    String type = columnType == null ? "" : columnType.trim();
+    boolean needsQuotes = type.equalsIgnoreCase("string")
+        || type.equalsIgnoreCase("date")
+        || type.equalsIgnoreCase("timestamp")
+        || type.regionMatches(true, 0, "varchar", 0, 7) //varchar(n)
+        || type.regionMatches(true, 0, "char", 0, 4);   //char(n)
+    if(!needsQuotes || value == null) {
+      return value;
+    }
+    //HiveQL string literals use backslash escapes
+    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'";
+  }
+  /**
+   * @return true if the table parameter {@code hive_metastoreConstants.TABLE_IS_TRANSACTIONAL}
+   * equals "true" (ignoring case); in that case the table name is also printed to stdout.
+   * NOTE(review): only the transactional flag is inspected, so insert-only tables that set it
+   * are reported here as well.
+   */
+  private static boolean isFullAcidTable(Table t) {
+    //a table without parameters cannot be acid
+    boolean transactional = t.getParametersSize() > 0 && "true".equalsIgnoreCase(
+        t.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL));
+    if(transactional) {
+      System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+    }
+    return transactional;
+  }
private static void printAndExit(HiveMetaTool metaTool) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -460,7 +841,16 @@ public class HiveMetaTool {
} else {
metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
}
- } else {
+ } else if(line.hasOption("prepareAcidUpgrade")) {
+ String[] values = line.getOptionValues("prepareAcidUpgrade");
+ String targetDir = null;
+ if(values != null && values.length > 0) {
+ if(values.length > 1) {
+ System.err.println("HiveMetaTool: prepareAcidUpgrade");
+ }
+ }
+ metaTool.prepareAcidUpgrade(metaTool);
+ } else {
if (line.hasOption("dryRun")) {
System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
} else if (line.hasOption("serdePropKey")) {