Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/01 20:26:22 UTC
[1/2] hive git commit: HIVE-19598 : Add Acid V1 to V2 upgrade module
(Eugene Koifman via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/branch-3 6f5d4dd87 -> b9e683714
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
----------------------------------------------------------------------
diff --git a/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java b/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
new file mode 100644
index 0000000..c8964a4
--- /dev/null
+++ b/upgrade-acid/src/test/java/org/apache/hadoop/hive/upgrade/acid/TestUpgradeTool.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestUpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(TestUpgradeTool.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestUpgradeTool.class.getCanonicalName() + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+
+ private String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ /**
+ * preUpgrade: test tables that need to be compacted, waits for compaction
+ * postUpgrade: generates scripts w/o asserts
+ */
+ @Test
+ public void testUpgrade() throws Exception {
+ int[][] data = {{1,2}, {3, 4}, {5, 6}};
+ int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ runStatementOnDriver("drop table if exists TAcid");
+ runStatementOnDriver("drop table if exists TAcidPart");
+ runStatementOnDriver("drop table if exists TFlat");
+ runStatementOnDriver("drop table if exists TFlatText");
+
+ runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p tinyint) clustered by (b) into 2 buckets stored" +
+ " as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+ runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+
+ //this needs major compaction
+ runStatementOnDriver("insert into TAcid" + makeValuesClause(data));
+ runStatementOnDriver("update TAcid set a = 1 where b = 2");
+
+ //this table needs to be converted to CRUD Acid
+ runStatementOnDriver("insert into TFlat" + makeValuesClause(data));
+
+ //this table needs to be converted to MM
+ runStatementOnDriver("insert into TFlatText" + makeValuesClause(data));
+
+ //p=10 needs major compaction
+ runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+ runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
+
+ //todo: add partitioned table that needs conversion to MM/Acid
+
+ //todo: rename files case
+ String[] args = {"-location", getTestDataDir(), "-preUpgrade", "-execute"};
+ UpgradeTool.callback = new UpgradeTool.Callback() {
+ @Override
+ void onWaitForCompaction() throws MetaException {
+ runWorker(hiveConf);
+ }
+ };
+ UpgradeTool.pollIntervalMs = 1;
+ UpgradeTool.hiveConf = hiveConf;
+ UpgradeTool.main(args);
+ /*
+ todo: parse
+ target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286256834/compacts_1527286277624.sql
+ make sure it's the only 'compacts' file and contains
+ ALTER TABLE default.tacid COMPACT 'major';
+ ALTER TABLE default.tacidpart PARTITION(p=10Y) COMPACT 'major';
+ * */
+
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(2, resp.getCompactsSize());
+ for(ShowCompactResponseElement e : resp.getCompacts()) {
+ Assert.assertEquals(e.toString(), TxnStore.CLEANING_RESPONSE, e.getState());
+ }
+
+ String[] args2 = {"-location", getTestDataDir(), "-postUpgrade"};
+ UpgradeTool.main(args2);
+ /*
+ * todo: parse
+ * convertToAcid_1527286288784.sql make sure it has
+ * ALTER TABLE default.tflat SET TBLPROPERTIES ('transactional'='true');
+ * convertToMM_1527286288784.sql make sure it has
+ * ALTER TABLE default.tflattext SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only');
+ * */
+ }
+
+ /**
+ * includes 'execute' for postUpgrade
+ * @throws Exception
+ */
+ @Test
+ public void testPostUpgrade() throws Exception {
+ int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "dynamic");
+ runStatementOnDriver("drop table if exists TAcid");
+ runStatementOnDriver("drop table if exists TAcidPart");
+ runStatementOnDriver("drop table if exists TFlat");
+ runStatementOnDriver("drop table if exists TFlatText");
+
+ runStatementOnDriver("create table TAcid (a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) clustered by (b) into 2 buckets stored" +
+ " as orc TBLPROPERTIES ('transactional'='false')");
+ //to create some partitions
+ runStatementOnDriver("insert into TAcidPart partition(p)" + makeValuesClause(dataPart));
+
+
+ //todo: to test these need to link against 3.x libs - maven profiles?
+ //runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
+ //runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
+
+ Hive db = Hive.get(hiveConf);
+ Table tacid = db.getTable("default", "tacid");
+ Assert.assertEquals("Expected TAcid to become full acid", false, AcidUtils.isAcidTable(tacid));
+ Table tacidpart = db.getTable("default", "tacidpart");
+ Assert.assertEquals("Expected TAcidPart to become full acid", false,
+ AcidUtils.isAcidTable(tacidpart));
+
+
+ String[] args2 = {"-location", getTestDataDir(), "-postUpgrade", "-execute"};
+ UpgradeTool.isTestMode = true;
+ UpgradeTool.hiveConf = hiveConf;
+ UpgradeTool.main(args2);
+
+ tacid = db.getTable("default", "tacid");
+ Assert.assertEquals("Expected TAcid to become full acid", true, AcidUtils.isAcidTable(tacid));
+ tacidpart = db.getTable("default", "tacidpart");
+ Assert.assertEquals("Expected TAcidPart to become full acid", true,
+ AcidUtils.isAcidTable(tacidpart));
+
+ /**
+ todo: parse
+ target/tmp/org.apache.hadoop.hive.upgrade.acid.TestUpgradeTool-1527286026461/convertToAcid_1527286063065.sql
+ make sure it has:
+ ALTER TABLE default.tacid SET TBLPROPERTIES ('transactional'='true');
+ ALTER TABLE default.tacidpart SET TBLPROPERTIES ('transactional'='true');
+ */
+ }
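+ /**
+ * Runs one pass of the compaction Worker in-process; the stop flag is pre-set to true so the
+ * Worker exits after a single loop instead of running as a long-lived daemon thread.
+ */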
+ private static void runWorker(HiveConf hiveConf) throws MetaException {
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
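+ /**
+ * Builds a SQL VALUES clause from the given rows; e.g. {{1,2},{3,4}} yields
+ * " values(1,2),(3,4)". Rows with a single column are emitted without parentheses.
+ */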
+ private static String makeValuesClause(int[][] rows) {
+ assert rows.length > 0;
+ StringBuilder sb = new StringBuilder(" values");
+ for(int[] row : rows) {
+ assert row.length > 0;
+ if(row.length > 1) {
+ sb.append("(");
+ }
+ for(int value : row) {
+ sb.append(value).append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ if(row.length > 1) {
+ sb.append(")");
+ }
+ sb.append(",");
+ }
+ sb.setLength(sb.length() - 1);//remove trailing comma
+ return sb.toString();
+ }
+
+ private List<String> runStatementOnDriver(String stmt) throws Exception {
+ CommandProcessorResponse cpr = d.run(stmt);
+ if(cpr.getResponseCode() != 0) {
+ throw new RuntimeException(stmt + " failed: " + cpr);
+ }
+ List<String> rs = new ArrayList<String>();
+ d.getResults(rs);
+ return rs;
+ }
+ @Before
+ public void setUp() throws Exception {
+ setUpInternal();
+ }
+ private void initHiveConf() {
+ hiveConf = new HiveConf(this.getClass());
+ }
+ @Rule
+ public TestName testName = new TestName();
+ private HiveConf hiveConf;
+ private Driver d;
+ private void setUpInternal() throws Exception {
+ initHiveConf();
+ TxnDbUtil.cleanDb();//todo: api changed in 3.0
+ FileUtils.deleteDirectory(new File(getTestDataDir()));
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ hiveConf.set("mapred.local.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "local");
+ hiveConf.set("mapred.system.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "system");
+ hiveConf.set("mapreduce.jobtracker.staging.root.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "staging");
+ hiveConf.set("mapred.temp.dir", workDir + File.separator + this.getClass().getSimpleName()
+ + File.separator + "mapred" + File.separator + "temp");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf
+ .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setBoolVar(HiveConf.ConfVars.MERGE_CARDINALITY_VIOLATION_CHECK, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.prepDb();//todo: api changed in 3.0
+ File f = new File(getWarehouseDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getWarehouseDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getWarehouseDir());
+ }
+ SessionState ss = SessionState.start(hiveConf);
+ ss.applyAuthorizationPolicy();
+ d = new Driver(new QueryState(hiveConf), null);
+ d.setMaxRows(10000);
+ }
+ private String getWarehouseDir() {
+ return getTestDataDir() + "/warehouse";
+ }
+ @After
+ public void tearDown() throws Exception {
+ if (d != null) {
+ d.close();
+ d.destroy();
+ d = null;
+ }
+ }
+
+}
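A note on the "todo: parse" comments in the tests above: they describe an unimplemented
verification step that would locate the generated script (compacts_*.sql, convertToAcid_*.sql
or convertToMM_*.sql) under the test data directory and check its contents. A minimal sketch of
such a check, as a hypothetical helper that is not part of this commit (java.nio.file.Path is
fully qualified to avoid clashing with the org.apache.hadoop.fs.Path import above):

  private static void assertScriptContains(String scriptDir, String glob, String expected)
      throws java.io.IOException {
    List<java.nio.file.Path> scripts = new ArrayList<>();
    try (java.nio.file.DirectoryStream<java.nio.file.Path> ds =
        java.nio.file.Files.newDirectoryStream(java.nio.file.Paths.get(scriptDir), glob)) {
      for (java.nio.file.Path p : ds) {
        scripts.add(p);
      }
    }
    //exactly one script of each kind is expected per run
    Assert.assertEquals("expected one " + glob + " in " + scriptDir, 1, scripts.size());
    String contents = new String(java.nio.file.Files.readAllBytes(scripts.get(0)), "UTF-8");
    Assert.assertTrue("missing '" + expected + "'", contents.contains(expected));
  }

Example use, with one of the commands quoted in the comments above:

  assertScriptContains(getTestDataDir(), "compacts_*.sql",
      "ALTER TABLE default.tacid COMPACT 'major';");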
[2/2] hive git commit: HIVE-19598 : Add Acid V1 to V2 upgrade module
(Eugene Koifman via Ashutosh Chauhan)
Posted by ha...@apache.org.
HIVE-19598 : Add Acid V1 to V2 upgrade module (Eugene Koifman via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b9e68371
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b9e68371
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b9e68371
Branch: refs/heads/branch-3
Commit: b9e6837144b59e3e1eb431f818644f66c03b2ab1
Parents: 6f5d4dd
Author: Eugene Koifman <ek...@apache.org>
Authored: Fri Jun 1 11:34:46 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Jun 1 11:34:46 2018 -0700
----------------------------------------------------------------------
packaging/pom.xml | 7 +-
packaging/src/main/assembly/src.xml | 1 +
pom.xml | 6 +
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 38 -
.../hive/metastore/tools/HiveMetaTool.java | 409 ----------
upgrade-acid/pom.xml | 296 +++++++
.../hadoop/hive/upgrade/acid/UpgradeTool.java | 808 +++++++++++++++++++
.../hive/upgrade/acid/TestUpgradeTool.java | 291 +++++++
8 files changed, 1408 insertions(+), 448 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/packaging/pom.xml
----------------------------------------------------------------------
diff --git a/packaging/pom.xml b/packaging/pom.xml
index fe1aac8..e3389fc 100644
--- a/packaging/pom.xml
+++ b/packaging/pom.xml
@@ -136,7 +136,7 @@
<dependencies>
- <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+ <!-- dependencies are always listed in sorted order by groupId, artifactId -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
@@ -283,6 +283,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-upgrade-acid</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/packaging/src/main/assembly/src.xml
----------------------------------------------------------------------
diff --git a/packaging/src/main/assembly/src.xml b/packaging/src/main/assembly/src.xml
index c477194..c9aed3b 100644
--- a/packaging/src/main/assembly/src.xml
+++ b/packaging/src/main/assembly/src.xml
@@ -99,6 +99,7 @@
<include>standalone-metastore/**/*</include>
<include>streaming/**/*</include>
<include>testutils/**/*</include>
+ <include>upgrade-acid/**/*</include>
<include>vector-code-gen/**/*</include>
<include>kryo-registrator/**/*</include>
</includes>
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3542adc..5b584d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,6 +62,7 @@
<module>testutils</module>
<module>packaging</module>
<module>standalone-metastore</module>
+ <module>upgrade-acid</module>
</modules>
<properties>
@@ -998,6 +999,11 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-upgrade-acid</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 13d1a5d..91f2d13 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -536,42 +536,4 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
TestTxnCommands2.stringifyValues(data), rs);
}
-
- @Ignore("HIVE-19509: Disable tests that are failing continuously")
- @Test
- public void testUpgrade() throws Exception {
- int[][] data = {{1,2}, {3, 4}, {5, 6}};
- int[][] dataPart = {{1, 2, 10}, {3, 4, 11}, {5, 6, 12}};
- runStatementOnDriver("drop table if exists TAcid");
- runStatementOnDriver("drop table if exists TAcidPart");
- runStatementOnDriver("drop table if exists TFlat");
- runStatementOnDriver("drop table if exists TFlatText");
- runStatementOnDriver("create table TAcid (a int, b int) stored as orc");
- runStatementOnDriver("create table TAcidPart (a int, b int) partitioned by (p int) stored" +
- " as orc");
- runStatementOnDriver("create table TFlat (a int, b int) stored as orc tblproperties('transactional'='false')");
- runStatementOnDriver("create table TFlatText (a int, b int) stored as textfile tblproperties('transactional'='false')");
-
-
- //this needs major compaction
- runStatementOnDriver("insert into TAcid" + TestTxnCommands2.makeValuesClause(data));
- runStatementOnDriver("update TAcid set a = 1 where b = 2");
-
- //this table needs to be converted to Acid
- runStatementOnDriver("insert into TFlat" + TestTxnCommands2.makeValuesClause(data));
-
- //this table needs to be converted to MM
- runStatementOnDriver("insert into TFlatText" + TestTxnCommands2.makeValuesClause(data));
-
- //p=10 needs major compaction
- runStatementOnDriver("insert into TAcidPart" + TestTxnCommands2.makeValuesClause(dataPart));
- runStatementOnDriver("update TAcidPart set a = 1 where b = 2 and p = 10");
-
- //todo: add partitioned table that needs conversion to MM/Acid
-
- //todo: rename files case
- String[] args = new String[1];
- args[0] = new String("-prepareAcidUpgrade");
- HiveMetaTool.main(args);
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
index 875eba3..a50c0a3 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/HiveMetaTool.java
@@ -18,13 +18,8 @@
package org.apache.hadoop.hive.metastore.tools;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,25 +34,7 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.StringUtils;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.impl.AcidStats;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
@@ -392,380 +369,6 @@ public class HiveMetaTool {
printSerdePropURIUpdateSummary(updateSerdeURIretVal, serdePropKey, isDryRun);
}
}
- private void prepareAcidUpgrade(String scriptLocation) {
- try {
- prepareAcidUpgradeInternal(scriptLocation);
- }
- catch(TException|IOException ex) {
- System.err.println(StringUtils.stringifyException(ex));
- printAndExit(this);
- }
- }
- private static class CompactionMetaInfo {
- /**
- * total number of bytes to be compacted across all compaction commands
- */
- long numberOfBytes;
- }
- /**
- * todo: make sure compaction queue is configured and has ample capacity
- * todo: what to do on failure? Suppose some table/part is not readable. should it produce
- * todo: should probably support dryRun mode where we output scripts but instead of renaming files
- * we generate a renaming script. Alternatively, always generate a renaming script and have
- * user execute it - this is probably a better option. If script is not empty on rerun someone
- * added files to table to be made Acid.
- * commands for all other tables?
- * todo: how do we test this? even if we had 2.x data it won't be readable in 3.0. even w/o any
- * updates, txnids in the data won't make sense in 3.0 w/o actually performing equivalent of
- * metastore upgrade to init writeid table. Also, can we even create a new table and set location
- * to existing files to simulate a 2.x table?
- * todo: generate some instructions in compaction script to include total compactions to perform,
- * total data volume to handle and anything else that may help users guess at how long it will
- * take. Also, highlight tuning options to speed this up.
- * todo: can we make the script blocking so that it waits for cleaner to delete files?
- * need to poll SHOW COMPACTIONS and make sure that all partitions are in "finished" state
- * todo: this should accept a file of table names to exclude from non-acid to acid conversion
- * todo: change script comments to a preamble instead of a footer
- *
- * @throws MetaException
- * @throws TException
- */
- private void prepareAcidUpgradeInternal(String scriptLocation) throws MetaException, TException, IOException {
- Configuration conf = MetastoreConf.newMetastoreConf();
- System.out.println("Looking for Acid tables that need to be compacted");
- //todo: check if acid is enabled and bail if not
- //todo: check that running on 2.x?
- HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
- List<String> databases = hms.getAllDatabases();//TException
- System.out.println("Found " + databases.size() + " databases to process");
- List<String> compactions = new ArrayList<>();
- List<String> convertToAcid = new ArrayList<>();
- List<String> convertToMM = new ArrayList<>();
- final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
- for(String dbName : databases) {
- List<String> tables = hms.getAllTables(dbName);
- System.out.println("found " + tables.size() + " tables in " + dbName);
- for(String tableName : tables) {
- Table t = hms.getTable(dbName, tableName);
-
- //ql depends on metastore and is not accessible here... and if it was, I would not be using
- //2.6 exec jar, but 3.0.... which is not what we want
- List<String> compactionCommands = getCompactionCommands(t, conf, hms, compactionMetaInfo);
- compactions.addAll(compactionCommands);
- processConversion(t, convertToAcid, convertToMM, hms);
- /*todo: handle renaming files somewhere
- * */
- }
- }
- makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
- makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
- makeRenameFileScript(scriptLocation);
- }
- //todo: handle exclusion list
- private static void processConversion(Table t, List<String> convertToAcid,
- List<String> convertToMM, HiveMetaStoreClient hms) throws TException {
- if(isFullAcidTable(t)) {
- return;
- }
- if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
- return;
- }
- String fullTableName = Warehouse.getQualifiedName(t);
- if(t.getPartitionKeysSize() <= 0) {
- if(canBeMadeAcid(fullTableName, t.getSd())) {
- convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true')");
- }
- else {
- convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true', 'transactional_properties'='insert_only')");
- }
- }
- else {
- List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
- int batchSize = 10000;//todo: right size?
- int numWholeBatches = partNames.size()/batchSize;
- for(int i = 0; i < numWholeBatches; i++) {
- List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
- partNames.subList(i * batchSize, (i + 1) * batchSize));
- for(Partition p : partitionList) {
- if(!canBeMadeAcid(fullTableName, p.getSd())) {
- convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true', 'transactional_properties'='insert_only')");
- return;
- }
- }
- }
- if(numWholeBatches * batchSize < partNames.size()) {
- //last partial batch
- List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
- partNames.subList(numWholeBatches * batchSize, partNames.size()));
- for (Partition p : partitionList) {
- if (!canBeMadeAcid(fullTableName, p.getSd())) {
- convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true', 'transactional_properties'='insert_only')");
- return;
- }
- }
- }
- //if here checked all parts and they are Acid compatible - make it acid
- convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
- "'transactional'='true')");
- }
- }
- private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
- return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
- }
- private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
- try {
- Class inputFormatClass = sd.getInputFormat() == null ? null :
- Class.forName(sd.getInputFormat());
- Class outputFormatClass = sd.getOutputFormat() == null ? null :
- Class.forName(sd.getOutputFormat());
-
- if (inputFormatClass != null && outputFormatClass != null &&
- Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
- .isAssignableFrom(inputFormatClass) &&
- Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
- .isAssignableFrom(outputFormatClass)) {
- return true;
- }
- } catch (ClassNotFoundException e) {
- //if a table is using some custom I/O format and it's not in the classpath, we won't mark
- //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
- //Acid format
- System.err.println("Could not determine if " + fullTableName +
- " can be made Acid due to: " + e.getMessage());
- return false;
- }
- return false;
- }
- /**
- * currently writes to current dir (whatever that is).
- * If there is nothing to compact, outputs empty file so as not to confuse the output with a
- * failed run.
- * todo: add some config to tell it where to put the script
- */
- private static void makeCompactionScript(List<String> commands, String scriptLocation,
- CompactionMetaInfo compactionMetaInfo) throws IOException {
- if (commands.isEmpty()) {
- System.out.println("No compaction is necessary");
- return;
- }
- String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
- System.out.println("Writing compaction commands to " + fileName);
- try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
- //add post script
- pw.println("-- Generated total of " + commands.size() + " compaction commands");
- if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
- //to see it working in UTs
- pw.println("-- The total volume of data to be compacted is " +
- String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
- }
- else {
- pw.println("-- The total volume of data to be compacted is " +
- String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
- }
- pw.println();
- pw.println(
- "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
- "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
- "-- which they will be picked up by compactor Workers. The max number of\n" +
- "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
- "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
- "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
- "-- property if defined or 'default' if not defined. It's advisable to set the\n" +
- "-- capacity of this queue appropriately");
- }
- }
- private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
- String scriptLocation) throws IOException {
- if (alterTableAcid.isEmpty()) {
- System.out.println("No acid conversion is necessary");
- } else {
- String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
- System.out.println("Writing acid conversion commands to " + fileName);
- try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
- pw.println("-- These commands may be executed by Hive 1.x later");
- }
- }
-
- if (alterTableMm.isEmpty()) {
- System.out.println("No managed table conversion is necessary");
- } else {
- String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
- System.out.println("Writing managed table conversion commands to " + fileName);
- try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
- pw.println("-- These commands must be executed by Hive 3.0 or later");
- }
- }
- }
-
- private static PrintWriter createScript(List<String> commands, String fileName,
- String scriptLocation) throws IOException {
- //todo: make sure to create the file in 'scriptLocation' dir
- FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
- PrintWriter pw = new PrintWriter(fw);
- for(String cmd : commands) {
- pw.println(cmd + ";");
- }
- return pw;
- }
- private static void makeRenameFileScript(String scriptLocation) throws IOException {
- List<String> commands = Collections.emptyList();
- if (commands.isEmpty()) {
- System.out.println("No file renaming is necessary");
- } else {
- String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
- System.out.println("Writing file renaming commands to " + fileName);
- PrintWriter pw = createScript(commands, fileName, scriptLocation);
- pw.close();
- }
- }
- /**
- * @return any compaction commands to run for {@code Table t}
- */
- private static List<String> getCompactionCommands(Table t, Configuration conf,
- HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo)
- throws IOException, TException {
- if(!isFullAcidTable(t)) {
- return Collections.emptyList();
- }
- if(t.getPartitionKeysSize() <= 0) {
- //not partitioned
- if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo)) {
- return Collections.emptyList();
- }
-
- List<String> cmds = new ArrayList<>();
- cmds.add(getCompactionCommand(t, null));
- return cmds;
- }
- List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
- int batchSize = 10000;//todo: right size?
- int numWholeBatches = partNames.size()/batchSize;
- List<String> compactionCommands = new ArrayList<>();
- for(int i = 0; i < numWholeBatches; i++) {
- List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(i * batchSize, (i + 1) * batchSize));
- for(Partition p : partitionList) {
- if(needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
- compactionCommands.add(getCompactionCommand(t, p));
- }
- }
- }
- if(numWholeBatches * batchSize < partNames.size()) {
- //last partial batch
- List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(), partNames.subList(numWholeBatches * batchSize, partNames.size()));
- for (Partition p : partitionList) {
- if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo)) {
- compactionCommands.add(getCompactionCommand(t, p));
- }
- }
- }
- return compactionCommands;
- }
- /**
- *
- * @param location - path to a partition (or table if not partitioned) dir
- */
- private static boolean needsCompaction(Path location, Configuration conf,
- CompactionMetaInfo compactionMetaInfo) throws IOException {
- FileSystem fs = location.getFileSystem(conf);
- FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- //checking for delete_delta is only so that this functionality can be exercised by code 3.0
- //which cannot produce any deltas with mix of update/insert events
- return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
- }
- });
- if(deltas == null || deltas.length == 0) {
- //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
- //only if there are update/delete events.
- return false;
- }
- deltaLoop: for(FileStatus delta : deltas) {
- if(!delta.isDirectory()) {
- //should never happen - just in case
- continue;
- }
- FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
- @Override
- public boolean accept(Path path) {
- //since this is inside a delta dir created by Hive 2.x or earlier it can only contain
- //bucket_x or bucket_x__flush_length
- return path.getName().startsWith("bucket_");
- }
- });
- for(FileStatus bucket : buckets) {
- if(bucket.getPath().getName().endsWith("_flush_length")) {
- //streaming ingest dir - cannot have update/delete events
- continue deltaLoop;
- }
- if(needsCompaction(bucket, fs)) {
- //found delete events - this 'location' needs compacting
- compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * @param location - path to a partition (or table if not partitioned) dir
- * @throws IOException
- */
- private static long getDataSize(Path location, Configuration conf) throws IOException {
- /*
- * todo: Figure out the size of the partition. The
- * best way is to getAcidState() and look at each file - this way it takes care of
- * original files vs base and any other obsolete files. For now just brute force it,
- * it's likely close enough for a rough estimate.*/
- FileSystem fs = location.getFileSystem(conf);
- ContentSummary cs = fs.getContentSummary(location);
- return cs.getLength();
- }
- private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
- //create reader, look at footer
- //no need to check side file since it can only be in a streaming ingest delta
- Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf())
- .filesystem(fs));
- AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
- if(as == null) {
- //should never happen since we are reading bucket_x written by acid write
- throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
- }
- return as.deletes > 0 || as.updates > 0;
- }
- private static String getCompactionCommand(Table t, Partition p) {
- StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
- if(t.getPartitionKeysSize() > 0) {
- assert p != null : "must supply partition for partitioned table " +
- Warehouse.getQualifiedName(t);
- sb.append(" PARTITION(");
- for (int i = 0; i < t.getPartitionKeysSize(); i++) {
- //todo: should these be quoted? HiveUtils.unparseIdentifier() - if value is String should definitely quote
- sb.append(t.getPartitionKeys().get(i).getName()).append('=')
- .append(p.getValues().get(i)).append(",");
- }
- sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
- }
- return sb.append(" COMPACT 'major'").toString();
- }
- private static boolean isFullAcidTable(Table t) {
- if (t.getParametersSize() <= 0) {
- //cannot be acid
- return false;
- }
- String transacationalValue = t.getParameters()
- .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
- if (transacationalValue != null && "true".equalsIgnoreCase(transacationalValue)) {
- System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
- return true;
- }
- return false;
- }
private static void printAndExit(HiveMetaTool metaTool) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("metatool", metaTool.cmdLineOptions);
@@ -863,18 +466,6 @@ public class HiveMetaTool {
} else {
metaTool.updateFSRootLocation(oldURI, newURI, serdepropKey, tablePropKey, isDryRun);
}
- } else if(line.hasOption("prepareAcidUpgrade")) {
- String[] values = line.getOptionValues("prepareAcidUpgrade");
- String targetDir = ".";
- if(values != null && values.length > 0) {
- if(values.length > 1) {
- System.err.println("HiveMetaTool: prepareAcidUpgrade");
- printAndExit(metaTool);
- } else {
- targetDir = values[0];
- }
- }
- metaTool.prepareAcidUpgrade(targetDir);
} else {
if (line.hasOption("dryRun")) {
System.err.println("HiveMetaTool: dryRun is not a valid standalone option");
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/upgrade-acid/pom.xml
----------------------------------------------------------------------
diff --git a/upgrade-acid/pom.xml b/upgrade-acid/pom.xml
new file mode 100644
index 0000000..f53d096
--- /dev/null
+++ b/upgrade-acid/pom.xml
@@ -0,0 +1,296 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <!--disconnected from hive root pom since this module needs 2.x jars -->
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>18</version>
+ <relativePath></relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <!--this module is added to parent pom so that it builds and releases with the reset of Hive-->
+ <groupId>org.apache.hive</groupId>
+ <version>3.1.0-SNAPSHOT</version>
+ <artifactId>hive-upgrade-acid</artifactId>
+ <name>Hive Upgrade Acid</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <!-- Build properties -->
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <maven.compiler.useIncrementalCompilation>false</maven.compiler.useIncrementalCompilation>
+ <maven.repo.local>${settings.localRepository}</maven.repo.local>
+ <maven.assembly.plugin.version>2.3</maven.assembly.plugin.version>
+ <maven.exec.plugin.version>1.6.0</maven.exec.plugin.version>
+ <hive.path.to.root>..</hive.path.to.root>
+ <!-- Plugin versions -->
+ <ant.contrib.version>1.0b3</ant.contrib.version>
+ <maven.antrun.plugin.version>1.7</maven.antrun.plugin.version>
+ <checkstyle.conf.dir>${basedir}/checkstyle/</checkstyle.conf.dir>
+ <maven.checkstyle.plugin.version>2.17</maven.checkstyle.plugin.version>
+ <maven.surefire.version>2.20.1</maven.surefire.version>
+
+ <!-- Test Properties -->
+ <test.conf.dir>${project.build.directory}/testconf</test.conf.dir>
+ <test.log4j.scheme>file://</test.log4j.scheme>
+ <log4j.conf.dir>${project.basedir}/src/test/resources</log4j.conf.dir>
+ <test.tmp.dir>${project.build.directory}/tmp</test.tmp.dir>
+ <test.warehouse.dir>${project.build.directory}/warehouse</test.warehouse.dir>
+ <test.warehouse.scheme>file://</test.warehouse.scheme>
+ <test.forkcount>1</test.forkcount>
+ <skipITests>true</skipITests>
+ </properties>
+ <dependencies>
+ <!--scope is 'provided' for all. The UpgradeTool is provided as part of Hive 3.x and
+ supports 2 modes - preUpgrade which runs with 2.x jars on the classpath and postUpgrade
+ which runs with 3.x jars. 'provided' should pull these jars for compile/test but not
+ for packaging.-->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>2.3.3</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>2.3.3</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <!-- w/o this we get this, even though mapreduce.framework.name=mapred.job.tracker=local
+ https://stackoverflow.com/questions/24096834/org-apache-hadoop-mapred-localclientprotocolprovider-not-found
+
+ 2018-05-23T13:01:50,122 ERROR [main] exec.Task: Job Submission failed with exception 'java.io.IOException(Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.)'
+java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
+ at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
+ at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
+ at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
+ at org.apache.hadoop.mapred.JobClient.init(JobClient.java:470)
+ at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:449)
+ at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:369)
+ at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:151)
+ at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199)
+ at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100)
+ at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183)
+ at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839)
+ at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526)
+ at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
+
+ -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>2.7.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>1.3.3</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <includes>
+ <include>package.jdo</include>
+ </includes>
+ </resource>
+ </resources>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.antrun.plugin.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>ant-contrib</groupId>
+ <artifactId>ant-contrib</artifactId>
+ <version>${ant.contrib.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${maven.checkstyle.plugin.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven.exec.plugin.version}</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <!-- plugins are always listed in sorted order by groupId, artifectId -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>setup-test-dirs</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <delete dir="${test.conf.dir}" />
+ <delete dir="${test.tmp.dir}" />
+ <delete dir="${test.warehouse.dir}" />
+ <mkdir dir="${test.tmp.dir}" />
+ <mkdir dir="${test.warehouse.dir}" />
+ <mkdir dir="${test.conf.dir}" />
+ <!-- copies hive-site.xml so it can be modified -->
+ <copy todir="${test.conf.dir}">
+ <fileset dir="${basedir}/${hive.path.to.root}/data/conf/"/>
+ </copy>
+ </target>
+ </configuration>
+ </execution>
+ <execution>
+ <id>setup-metastore-scripts</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <mkdir dir="${test.tmp.dir}/scripts/metastore" />
+ <copy todir="${test.tmp.dir}/scripts/metastore">
+ <fileset dir="${basedir}/${hive.path.to.root}/metastore/scripts/"/>
+ </copy>
+ <mkdir dir="${test.tmp.dir}/scripts/metastore/upgrade" />
+ <copy todir="${test.tmp.dir}/scripts/metastore/upgrade">
+ <fileset dir="${basedir}/${hive.path.to.root}/standalone-metastore/src/main/sql/"/>
+ </copy>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.20.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <reuseForks>false</reuseForks>
+ <argLine>-Xmx2048m</argLine>
+ <failIfNoTests>false</failIfNoTests>
+ <systemPropertyVariables>
+ <log4j.debug>true</log4j.debug>
+ <java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir>
+ <test.tmp.dir>${test.tmp.dir}</test.tmp.dir>
+ <hive.in.test>true</hive.in.test>
+ </systemPropertyVariables>
+ <additionalClasspathElements>
+ <additionalClasspathElement>${log4j.conf.dir}</additionalClasspathElement>
+ </additionalClasspathElements>
+ <skipITs>${skipITests}</skipITs> <!-- set this to false to run these tests -->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven.surefire.version}</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <reuseForks>false</reuseForks>
+ <forkCount>${test.forkcount}</forkCount>
+ <argLine>-Xmx2048m</argLine>
+ <failIfNoTests>false</failIfNoTests>
+ <systemPropertyVariables>
+ <build.dir>${project.build.directory}</build.dir>
+ <datanucleus.schema.autoCreateAll>true</datanucleus.schema.autoCreateAll>
+ <derby.version>${derby.version}</derby.version>
+ <derby.stream.error.file>${test.tmp.dir}/derby.log</derby.stream.error.file>
+ <!--next line needed to get hive.log-->
+ <log4j.configurationFile>${test.log4j.scheme}${test.conf.dir}/hive-log4j2.properties</log4j.configurationFile>
+ <log4j.debug>true</log4j.debug>
+ <java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir>
+ <!--
+ use 'memory' to make it run faster
+ <javax.jdo.option.ConnectionURL>jdbc:derby:memory:${test.tmp.dir}/junit_metastore_db;create=true</javax.jdo.option.ConnectionURL>-->
+ <javax.jdo.option.ConnectionURL>jdbc:derby:${test.tmp.dir}/junit_metastore_db;create=true</javax.jdo.option.ConnectionURL>
+ <metastore.schema.verification>false</metastore.schema.verification>
+ <test.tmp.dir>${test.tmp.dir}</test.tmp.dir>
+ <metastore.warehouse.dir>${test.warehouse.scheme}${test.warehouse.dir}</metastore.warehouse.dir>
+ <!-- both default to 'local'
+ <mapred.job.tracker>local</mapred.job.tracker>
+ <mapreduce.framework.name>local</mapreduce.framework.name>-->
+ </systemPropertyVariables>
+ <additionalClasspathElements>
+ <additionalClasspathElement>${log4j.conf.dir}</additionalClasspathElement>
+ <additionalClasspathElement>${test.conf.dir}</additionalClasspathElement>
+ <!--puts hive-site.xml on classpath - w/o HMS tables are not created-->
+ <additionalClasspathElement>${test.conf.dir}/conf</additionalClasspathElement>
+ </additionalClasspathElements>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/b9e68371/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
----------------------------------------------------------------------
diff --git a/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java b/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
new file mode 100644
index 0000000..78c0843
--- /dev/null
+++ b/upgrade-acid/src/main/java/org/apache/hadoop/hive/upgrade/acid/UpgradeTool.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.upgrade.acid;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hive.common.util.HiveVersionInfo;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString;
+
+/**
+ * This utility is designed to help with upgrading to Hive 3.0. On-disk layout for transactional
+ * tables has changed in 3.0 and requires pre-processing before upgrade to ensure they are readable
+ * by Hive 3.0. Some transactional tables (identified by this utility) require Major compaction
+ * to be run on them before upgrading to 3.0. Once this compaction starts, no more
+ * update/delete/merge statements may be executed on these tables until upgrade is finished.
+ *
+ * Additionally, a new type of transactional tables was added in 3.0 - insert-only tables. These
+ * tables support ACID semantics and work with any Input/OutputFormat. Any managed table may
+ * be made an insert-only transactional table. These tables don't support Update/Delete/Merge commands.
+ *
+ * This utility works in 2 modes: preUpgrade and postUpgrade.
+ * In preUpgrade mode it has to have 2.x Hive jars on the classpath. It will perform analysis on
+ * existing transactional tables, determine which require compaction and generate a set of SQL
+ * commands to launch all of these compactions.
+ *
+ * Note that depending on the number of tables/partitions and amount of data in them, compactions
+ * may take a significant amount of time and resources. The script output by this utility includes
+ * some heuristics that may help estimate the time required. If no script is produced, no action
+ * is needed. For compactions to run, an instance of standalone Hive Metastore must be running.
+ * Please make sure hive.compactor.worker.threads is sufficiently high - this specifies the limit
+ * of concurrent compactions that may be run. Each compaction job is a Map-Reduce job.
+ * hive.compactor.job.queue may be used to set a Yarn queue name where all compaction jobs will be
+ * submitted.
+ *
+ * In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will
+ * find all the tables that may be made transactional (with full CRUD support) and generate
+ * Alter Table commands to do so. It will also find all tables that may not support full CRUD
+ * but can be made insert-only transactional tables and generate corresponding Alter Table commands.
+ *
+ * TODO: rename files
+ *
+ * "execute" option may be supplied in both modes to have the utility automatically execute the
+ * equivalent of the generated commands
+ *
+ * "location" option may be supplied followed by a path to set the location for the generated
+ * scripts.
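+ *
+ * Example invocation, using the flags exercised by TestUpgradeTool in this commit (scriptDir
+ * here is a placeholder for the chosen output directory):
+ *   UpgradeTool.main(new String[] {"-location", scriptDir, "-preUpgrade", "-execute"});
+ *   UpgradeTool.main(new String[] {"-location", scriptDir, "-postUpgrade"});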
+ */
+public class UpgradeTool {
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
+ private static final int PARTITION_BATCH_SIZE = 10000;
+ private final Options cmdLineOptions = new Options();
+
+ public static void main(String[] args) throws Exception {
+ UpgradeTool tool = new UpgradeTool();
+ tool.init();
+ CommandLineParser parser = new GnuParser();
+ CommandLine line;
+ String outputDir = ".";
+ boolean preUpgrade = false, postUpgrade = false, execute = false, nonBlocking = false;
+ try {
+ line = parser.parse(tool.cmdLineOptions, args);
+ } catch (ParseException e) {
+ System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
+ printAndExit(tool);
+ return;
+ }
+ if (line.hasOption("help")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ return;
+ }
+ if(line.hasOption("location")) {
+ outputDir = line.getOptionValue("location");
+ }
+ if(line.hasOption("execute")) {
+ execute = true;
+ }
+ if(line.hasOption("preUpgrade")) {
+ preUpgrade = true;
+ }
+ if(line.hasOption("postUpgrade")) {
+ postUpgrade = true;
+ }
+ LOG.info("Starting with preUpgrade=" + preUpgrade + ", postUpgrade=" + postUpgrade +
+ ", execute=" + execute + ", location=" + outputDir);
+ if(preUpgrade && postUpgrade) {
+ throw new IllegalArgumentException("Cannot specify both preUpgrade and postUpgrade");
+ }
+
+ try {
+ String hiveVer = HiveVersionInfo.getShortVersion();
+ if(preUpgrade) {
+ if(!hiveVer.startsWith("2.")) {
+ throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
+ }
+ }
+ if(postUpgrade && execute && !isTestMode) {
+ if(!hiveVer.startsWith("3.")) {
+ throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " +
+ hiveVer);
+ }
+ }
+ tool.prepareAcidUpgradeInternal(outputDir, preUpgrade, postUpgrade, execute);
+ }
+ catch(Exception ex) {
+ LOG.error("UpgradeTool failed", ex);
+ throw ex;
+ }
+ }
+ private static void printAndExit(UpgradeTool tool) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
+ System.exit(1);
+ }
+
+ private void init() {
+ try {
+ cmdLineOptions.addOption(new Option("help", "print this message"));
+ cmdLineOptions.addOption(new Option("preUpgrade",
+ "Generates a script to execute on 2.x cluster. This requires 2.x binaries" +
+ " on the classpath and hive-site.xml."));
+ cmdLineOptions.addOption(new Option("postUpgrade",
+ "Generates a script to execute on 3.x cluster. This requires 3.x binaries" +
+ " on the classpath and hive-site.xml."));
+ Option exec = new Option("execute",
+ "Executes commands equivalent to generated scrips");
+ exec.setOptionalArg(true);
+ cmdLineOptions.addOption(exec);
+ cmdLineOptions.addOption(new Option("location", true,
+ "Location to write scripts to. Default is CWD."));
+ }
+ catch(Exception ex) {
+ LOG.error("init()", ex);
+ throw ex;
+ }
+ }
+ /**
+ * todo: this should accept a file of table names to exclude from non-acid to acid conversion
+ * todo: change script comments to a preamble instead of a footer
+ *
+ * how does the rename script work? "hadoop fs -mv oldname newname" - and what about S3?
+ * How does this actually get executed?
+ * All other actions are done via embedded JDBC.
+ *
+ */
+ private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrade,
+ boolean postUpgrade, boolean execute) throws HiveException, TException, IOException {
+ HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
+ boolean isAcidEnabled = isAcidEnabled(conf);
+ HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//can throw MetaException
+ LOG.debug("Looking for databases");
+ List<String> databases = hms.getAllDatabases();//can throw TException
+ LOG.debug("Found " + databases.size() + " databases to process");
+ List<String> compactions = new ArrayList<>();
+ List<String> convertToAcid = new ArrayList<>();
+ List<String> convertToMM = new ArrayList<>();
+ final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
+ ValidTxnList txns = null;
+ Hive db = null;
+ if(execute) {
+ db = Hive.get(conf);
+ }
+
+ for(String dbName : databases) {
+ List<String> tables = hms.getAllTables(dbName);
+ LOG.debug("found " + tables.size() + " tables in " + dbName);
+ for(String tableName : tables) {
+ Table t = hms.getTable(dbName, tableName);
+ LOG.debug("processing table " + Warehouse.getQualifiedName(t));
+ if(preUpgrade && isAcidEnabled) {
+ //if acid is off, there can't be any acid tables - nothing to compact
+ if(execute && txns == null) {
+ /*
+ This API changed from 2.x to 3.0, so this won't even compile with 3.0,
+ but it doesn't need to since we only run this in preUpgrade mode
+ */
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+ }
+ List<String> compactionCommands =
+ getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns);
+ compactions.addAll(compactionCommands);
+ }
+ if(postUpgrade && isAcidEnabled) {
+ //if acid is off post upgrade, you can't make any tables acid - will throw
+ processConversion(t, convertToAcid, convertToMM, hms, db, execute);
+ }
+ /*todo: handle renaming files somewhere*/
+ }
+ }
+ makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
+ makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
+ makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade?
+ //todo: can different tables be in different FileSystems?
+ if(preUpgrade && execute) {
+ while(compactionMetaInfo.compactionIds.size() > 0) {
+ LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() +
+ " compactions to complete");
+ ShowCompactResponse resp = db.showCompactions();
+ for(ShowCompactResponseElement e : resp.getCompacts()) {
+ final String state = e.getState();
+ boolean removed;
+ switch (state) {
+ case TxnStore.CLEANING_RESPONSE:
+ case TxnStore.SUCCEEDED_RESPONSE:
+ removed = compactionMetaInfo.compactionIds.remove(e.getId());
+ if(removed) {
+ LOG.debug("Required compaction succeeded: " + e.toString());
+ }
+ break;
+ case TxnStore.ATTEMPTED_RESPONSE:
+ case TxnStore.FAILED_RESPONSE:
+ removed = compactionMetaInfo.compactionIds.remove(e.getId());
+ if(removed) {
+ LOG.warn("Required compaction failed: " + e.toString());
+ }
+ break;
+ case TxnStore.INITIATED_RESPONSE:
+ //may flood the log
+ //LOG.debug("Still waiting on: " + e.toString());
+ break;
+ case TxnStore.WORKING_RESPONSE:
+ LOG.debug("Still working on: " + e.toString());
+ break;
+ default://shouldn't be any others
+ LOG.error("Unexpected state for : " + e.toString());
+ }
+ }
+ if(compactionMetaInfo.compactionIds.size() > 0) {
+ try {
+ if (callback != null) {
+ callback.onWaitForCompaction();
+ }
+ Thread.sleep(pollIntervalMs);
+ } catch (InterruptedException ex) {
+ //this only responds to ^C
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Actually makes the table transactional
+ */
+ private static void alterTable(Table t, Hive db, boolean isMM)
+ throws HiveException, InvalidOperationException {
+ org.apache.hadoop.hive.ql.metadata.Table metaTable =
+ //clone to make sure new prop doesn't leak
+ new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy());
+ metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ if(isMM) {
+ metaTable.getParameters()
+ .put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only");
+ }
+ db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null);
+ }
+
+ /**
+ * todo: handle exclusion list
+ * Figures out which tables to make Acid or MM, and (optionally) performs the operation
+ */
+ private static void processConversion(Table t, List<String> convertToAcid,
+ List<String> convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute)
+ throws TException, HiveException {
+ if(isFullAcidTable(t)) {
+ return;
+ }
+ if(!TableType.MANAGED_TABLE.name().equalsIgnoreCase(t.getTableType())) {
+ return;
+ }
+ String fullTableName = Warehouse.getQualifiedName(t);
+ if(t.getPartitionKeysSize() <= 0) {
+ if(canBeMadeAcid(fullTableName, t.getSd())) {
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ if(execute) {
+ alterTable(t, db, false);
+ }
+ }
+ else {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ if(execute) {
+ alterTable(t, db, true);
+ }
+ }
+ }
+ else {
+ /*
+ Each Partition may have a different I/O format, so all of them have to be checked before
+ deciding to make this a full CRUD table.
+ Run in batches to prevent OOM.
+ */
+ List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = PARTITION_BATCH_SIZE;
+ int numWholeBatches = partNames.size()/batchSize;
+ for(int i = 0; i < numWholeBatches; i++) {
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(i * batchSize, (i + 1) * batchSize));
+ if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
+ return;
+ }
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
+ return;
+ }
+ }
+ //if we get here, all partitions were checked and are Acid compatible - make it acid
+ convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true')");
+ if(execute) {
+ alterTable(t, db, false);
+ }
+ }
+ }
+ /**
+ * @return true if table was converted/command generated
+ */
+ private static boolean alterTable(String fullTableName, List<Partition> partitionList,
+ List<String> convertToMM, Table t, Hive db, boolean execute)
+ throws InvalidOperationException, HiveException {
+ for(Partition p : partitionList) {
+ if(!canBeMadeAcid(fullTableName, p.getSd())) {
+ convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
+ "'transactional'='true', 'transactional_properties'='insert_only')");
+ if(execute) {
+ alterTable(t, db, true);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+ private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
+ return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
+ }
+ private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
+ try {
+ Class inputFormatClass = sd.getInputFormat() == null ? null :
+ Class.forName(sd.getInputFormat());
+ Class outputFormatClass = sd.getOutputFormat() == null ? null :
+ Class.forName(sd.getOutputFormat());
+
+ if (inputFormatClass != null && outputFormatClass != null &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
+ .isAssignableFrom(inputFormatClass) &&
+ Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
+ .isAssignableFrom(outputFormatClass)) {
+ return true;
+ }
+ } catch (ClassNotFoundException e) {
+ //if a table is using some custom I/O format and it's not in the classpath, we won't mark
+ //the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
+ //Acid format
+ LOG.error("Could not determine if " + fullTableName +
+ " can be made Acid due to: " + e.getMessage(), e);
+ return false;
+ }
+ return false;
+ }
+ /**
+ * Generates a set of compaction commands to run on a pre-Hive 3 cluster
+ */
+ private static void makeCompactionScript(List<String> commands, String scriptLocation,
+ CompactionMetaInfo compactionMetaInfo) throws IOException {
+ if (commands.isEmpty()) {
+ LOG.info("No compaction is necessary");
+ return;
+ }
+ String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
+ LOG.debug("Writing compaction commands to " + fileName);
+ try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
+ //add post script
+ pw.println("-- Generated total of " + commands.size() + " compaction commands");
+ if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
+ //print fine-grained MB so tiny volumes (e.g. in unit tests) show a non-zero value
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
+ }
+ else {
+ pw.println("-- The total volume of data to be compacted is " +
+ String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
+ }
+ pw.println();
+ //todo: should be at the top of the file...
+ pw.println(
+ "-- Please note that compaction may be a heavyweight and time consuming process.\n" +
+ "-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
+ "-- which they will be picked up by compactor Workers. The max number of\n" +
+ "-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
+ "-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
+ "-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
+ "-- property if defined or 'default' if not defined. It's advisable to set the\n" +
+ "-- capacity of this queue appropriately");
+ }
+ }
+ private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
+ String scriptLocation) throws IOException {
+ if (alterTableAcid.isEmpty()) {
+ LOG.info("No acid conversion is necessary");
+ } else {
+ String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
+ LOG.debug("Writing CRUD conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
+ //todo: fix this - it has to run in 3.0 since tables may be unbucketed
+ pw.println("-- These commands may be executed by Hive 1.x later");
+ }
+ }
+
+ if (alterTableMm.isEmpty()) {
+ LOG.info("No managed table conversion is necessary");
+ } else {
+ String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
+ LOG.debug("Writing managed table conversion commands to " + fileName);
+ try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
+ pw.println("-- These commands must be executed by Hive 3.0 or later");
+ }
+ }
+ }
+
+ private static PrintWriter createScript(List<String> commands, String fileName,
+ String scriptLocation) throws IOException {
+ FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
+ PrintWriter pw = new PrintWriter(fw);
+ for(String cmd : commands) {
+ pw.println(cmd + ";");
+ }
+ return pw;
+ }
+ private static void makeRenameFileScript(String scriptLocation) throws IOException {
+ List<String> commands = Collections.emptyList();//todo: populate once file renaming is implemented
+ if (commands.isEmpty()) {
+ LOG.info("No file renaming is necessary");
+ } else {
+ String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
+ LOG.debug("Writing file renaming commands to " + fileName);
+ PrintWriter pw = createScript(commands, fileName, scriptLocation);
+ pw.close();
+ }
+ }
+ /**
+ * @return any compaction commands to run for {@code Table t}
+ */
+ private static List<String> getCompactionCommands(Table t, HiveConf conf,
+ HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db,
+ ValidTxnList txns) throws IOException, TException, HiveException {
+ if(!isFullAcidTable(t)) {
+ return Collections.emptyList();
+ }
+ if(t.getPartitionKeysSize() <= 0) {
+ //not partitioned
+ if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
+ return Collections.emptyList();
+ }
+
+ List<String> cmds = new ArrayList<>();
+ cmds.add(getCompactionCommand(t, null));
+ if(execute) {
+ scheduleCompaction(t, null, db, compactionMetaInfo);
+ }
+ return cmds;
+ }
+ List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
+ int batchSize = PARTITION_BATCH_SIZE;
+ int numWholeBatches = partNames.size()/batchSize;
+ List<String> compactionCommands = new ArrayList<>();
+ for(int i = 0; i < numWholeBatches; i++) {
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(i * batchSize, (i + 1) * batchSize));
+ getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+ compactionMetaInfo, conf, txns);
+ }
+ if(numWholeBatches * batchSize < partNames.size()) {
+ //last partial batch
+ List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
+ partNames.subList(numWholeBatches * batchSize, partNames.size()));
+ getCompactionCommands(t, partitionList, db, execute, compactionCommands,
+ compactionMetaInfo, conf, txns);
+ }
+ return compactionCommands;
+ }
+ private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db,
+ boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo,
+ HiveConf conf, ValidTxnList txns)
+ throws IOException, TException, HiveException {
+ for (Partition p : partitionList) {
+ if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
+ compactionCommands.add(getCompactionCommand(t, p));
+ if (execute) {
+ scheduleCompaction(t, p, db, compactionMetaInfo);
+ }
+ }
+ }
+ }
+ private static void scheduleCompaction(Table t, Partition p, Hive db,
+ CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException {
+ String partName = p == null ? null :
+ Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
+ CompactionResponse resp =
+ //compact2() gives an easy way to get at the compaction ID so that we only wait for
+ //compactions this utility started
+ db.compact2(t.getDbName(), t.getTableName(), partName, "major", null);
+ if(!resp.isAccepted()) {
+ LOG.info(Warehouse.getQualifiedName(t) + (p == null ? "" : "/" + partName) +
+ " is already being compacted with id=" + resp.getId());
+ }
+ else {
+ LOG.info("Scheduled compaction for " + Warehouse.getQualifiedName(t) +
+ (p == null ? "" : "/" + partName) + " with id=" + resp.getId());
+ }
+ compactionMetaInfo.compactionIds.add(resp.getId());
+ }
+ /**
+ *
+ * @param location - path to a partition (or table if not partitioned) dir
+ */
+ private static boolean needsCompaction2(Path location, HiveConf conf,
+ CompactionMetaInfo compactionMetaInfo) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+ //which cannot produce any deltas with a mix of update/insert events
+ return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+ }
+ });
+ if(deltas == null || deltas.length == 0) {
+ //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
+ //only if there are update/delete events.
+ return false;
+ }
+ deltaLoop: for(FileStatus delta : deltas) {
+ if(!delta.isDirectory()) {
+ //should never happen - just in case
+ continue;
+ }
+ FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //since this is inside a delta dir created by Hive 2.x or earlier, it can only contain
+ //bucket_x or bucket_x_flush_length
+ return path.getName().startsWith("bucket_");
+ }
+ });
+ for(FileStatus bucket : buckets) {
+ if(bucket.getPath().getName().endsWith("_flush_length")) {
+ //streaming ingest dir - cannot have update/delete events
+ continue deltaLoop;
+ }
+ if(needsCompaction(bucket, fs)) {
+ //found delete events - this 'location' needs compacting
+ compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+ //todo: this is not remotely accurate if you have many (relevant) original files
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ /**
+ *
+ * @param location - path to a partition (or table if not partitioned) dir
+ */
+ private static boolean needsCompaction(Path location, HiveConf conf,
+ CompactionMetaInfo compactionMetaInfo, ValidTxnList txns) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //checking for delete_delta is only so that this functionality can be exercised by 3.0 code,
+ //which cannot produce any deltas with a mix of update/insert events
+ return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
+ }
+ });
+ if(deltas == null || deltas.length == 0) {
+ //base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
+ //only if there are update/delete events.
+ return false;
+ }
+ /*getAcidState() is smart enough not to return any deltas in the current directory list if
+ * there is a base that covers them, i.e. if they were compacted but not yet cleaned. This
+ * means re-checking whether compaction is needed should be cheap(er)*/
+ AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+ deltaLoop: for(AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+ FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ //since this is inside a delta dir created by Hive 2.x or earlier, it can only contain
+ //bucket_x or bucket_x_flush_length
+ return path.getName().startsWith("bucket_");
+ }
+ });
+ for(FileStatus bucket : buckets) {
+ if(bucket.getPath().getName().endsWith("_flush_length")) {
+ //streaming ingest dir - cannot have update/delete events
+ continue deltaLoop;
+ }
+ if(needsCompaction(bucket, fs)) {
+ //found delete events - this 'location' needs compacting
+ compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
+
+ //if there are un-compacted original files, they will be included in the compaction, so
+ //count their size for the 'cost' estimate later
+ for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) {
+ FileStatus fileStatus = origFile.getFileStatus();
+ if(fileStatus != null) {
+ compactionMetaInfo.numberOfBytes += fileStatus.getLen();
+ }
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param location - path to a partition (or table if not partitioned) dir
+ */
+ private static long getDataSize(Path location, HiveConf conf) throws IOException {
+ FileSystem fs = location.getFileSystem(conf);
+ ContentSummary cs = fs.getContentSummary(location);
+ return cs.getLength();
+ }
+ private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
+ //create reader, look at footer
+ //no need to check side file since it can only be in a streaming ingest delta
+ Reader orcReader = OrcFile.createReader(bucket.getPath(), OrcFile.readerOptions(fs.getConf())
+ .filesystem(fs));
+ AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
+ if(as == null) {
+ //should never happen since we are reading a bucket_x file written by an acid write
+ throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
+ }
+ return as.deletes > 0 || as.updates > 0;
+ }
+ private static String getCompactionCommand(Table t, Partition p) {
+ StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
+ if(t.getPartitionKeysSize() > 0) {
+ assert p != null : "must supply partition for partitioned table " +
+ Warehouse.getQualifiedName(t);
+ sb.append(" PARTITION(");
+ for (int i = 0; i < t.getPartitionKeysSize(); i++) {
+ sb.append(t.getPartitionKeys().get(i).getName()).append('=').append(
+ genPartValueString(t.getPartitionKeys().get(i).getType(), p.getValues().get(i))).
+ append(",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
+ }
+ return sb.append(" COMPACT 'major'").toString();
+ }
+
+ /**
+ * This is copy-pasted from {@link org.apache.hadoop.hive.ql.parse.ColumnStatsSemanticAnalyzer},
+ * which can't be refactored since this is linked against Hive 2.x
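+ * For example (values derived from the branches below): a "date" partition column with value
+ * "2018-01-01" yields {@code date '2018-01-01'}, and a "tinyint" column with value "5"
+ * yields {@code 5Y}.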
+ */
+ private static String genPartValueString(String partColType, String partVal) {
+ String returnVal = partVal;
+ if (partColType.equals(serdeConstants.STRING_TYPE_NAME) ||
+ partColType.contains(serdeConstants.VARCHAR_TYPE_NAME) ||
+ partColType.contains(serdeConstants.CHAR_TYPE_NAME)) {
+ returnVal = "'" + escapeSQLString(partVal) + "'";
+ } else if (partColType.equals(serdeConstants.TINYINT_TYPE_NAME)) {
+ returnVal = partVal + "Y";
+ } else if (partColType.equals(serdeConstants.SMALLINT_TYPE_NAME)) {
+ returnVal = partVal + "S";
+ } else if (partColType.equals(serdeConstants.INT_TYPE_NAME)) {
+ returnVal = partVal;
+ } else if (partColType.equals(serdeConstants.BIGINT_TYPE_NAME)) {
+ returnVal = partVal + "L";
+ } else if (partColType.contains(serdeConstants.DECIMAL_TYPE_NAME)) {
+ returnVal = partVal + "BD";
+ } else if (partColType.equals(serdeConstants.DATE_TYPE_NAME) ||
+ partColType.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) {
+ returnVal = partColType + " '" + escapeSQLString(partVal) + "'";
+ } else {
+ //for other, rarely used types, just quote the value
+ returnVal = "'" + escapeSQLString(partVal) + "'";
+ }
+
+ return returnVal;
+ }
+ private static boolean isFullAcidTable(Table t) {
+ if (t.getParametersSize() <= 0) {
+ //cannot be acid
+ return false;
+ }
+ String transactionalValue = t.getParameters()
+ .get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if (transactionalValue != null && "true".equalsIgnoreCase(transactionalValue)) {
+ System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
+ return true;
+ }
+ return false;
+ }
+ private static boolean isAcidEnabled(HiveConf hiveConf) {
+ String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+ boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+ String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+ return txnMgr.equals(dbTxnMgr) && concurrency;
+ }
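+
+ /*
+ * For reference, a minimal sketch of the hive-site.xml settings that isAcidEnabled() above
+ * checks (property names resolved from HiveConf.ConfVars):
+ * <property><name>hive.txn.manager</name>
+ * <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
+ * <property><name>hive.support.concurrency</name><value>true</value></property>
+ */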
+
+ private static class CompactionMetaInfo {
+ /**
+ * total number of bytes to be compacted across all compaction commands
+ */
+ long numberOfBytes;
+ /**
+ * IDs of compactions launched by this utility
+ */
+ Set<Long> compactionIds = new HashSet<>();
+ }
+
+ @VisibleForTesting
+ static abstract class Callback {
+ /**
+ * This is a hack to enable unit testing. Derby can't handle multiple concurrent threads, but
+ * somehow the Compactor needs to run to test "execute" mode. This callback can be used
+ * to run the Worker. For TESTING ONLY.
+ */
+ void onWaitForCompaction() throws MetaException {}
+ }
+ @VisibleForTesting
+ static Callback callback;
+ @VisibleForTesting
+ static int pollIntervalMs = 1000*30;
+ /**
+ * Also to enable testing, until Maven profiles are set up to be able to run with 3.0 jars
+ */
+ @VisibleForTesting
+ static boolean isTestMode = false;
+ /**
+ * Tests can set this when the config needs something other than the default values
+ */
+ @VisibleForTesting
+ static HiveConf hiveConf = null;
+}