You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2018/05/07 19:34:24 UTC
[2/3] hive git commit: HIVE-18288 - merge/concat not supported on
Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
HIVE-18288 - merge/concat not supported on Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb61ed63
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb61ed63
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb61ed63
Branch: refs/heads/branch-3
Commit: cb61ed631c28811fc169e3a33280edd1f41f2186
Parents: a7d2fb8
Author: Eugene Koifman <ek...@apache.org>
Authored: Mon May 7 12:06:59 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Mon May 7 12:06:59 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 4 +-
.../hive/ql/parse/DDLSemanticAnalyzer.java | 21 ++-
.../hadoop/hive/ql/TestTxnConcatenate.java | 159 +++++++++++++++++++
4 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cb61ed63/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4b8dc19..090c255 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2565,6 +2565,9 @@ public class HiveConf extends Configuration {
COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
"Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+ TRANSACTIONAL_CONCATENATE_NOBLOCK("hive.transactional.concatenate.noblock", false,
+ "Will cause 'alter table T concatenate' to be non-blocking"),
+
HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
"Whether the compactor should compact insert-only tables. A safety switch."),
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/cb61ed63/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5ab86b4..3218f96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4405,9 +4405,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
throws HiveException {
try {
CompactionType cr = null;
- if ("major".equals(compactType)) {
+ if ("major".equalsIgnoreCase(compactType)) {
cr = CompactionType.MAJOR;
- } else if ("minor".equals(compactType)) {
+ } else if ("minor".equalsIgnoreCase(compactType)) {
cr = CompactionType.MINOR;
} else {
throw new RuntimeException("Unknown compaction type " + compactType);
http://git-wip-us.apache.org/repos/asf/hive/blob/cb61ed63/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index defb8be..f0b9eda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1964,9 +1964,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
try {
tblObj = getTable(tableName);
- // TODO: we should probably block all ACID tables here.
- if (AcidUtils.isInsertOnlyTable(tblObj.getParameters())) {
- throw new SemanticException("Merge is not supported for MM tables");
+ if(AcidUtils.isTransactionalTable(tblObj)) {
+ LinkedHashMap<String, String> newPartSpec = null;
+ if (partSpec != null) {
+ newPartSpec = new LinkedHashMap<>(partSpec);
+ }
+
+ boolean isBlocking = !HiveConf.getBoolVar(conf,
+ ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
+ AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+ tableName, newPartSpec, "MAJOR", isBlocking);
+
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+ return;
}
mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
@@ -2039,11 +2049,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg());
}
- // transactional tables are compacted and no longer needs to be bucketed, so not safe for merge/concatenation
- boolean isAcid = AcidUtils.isTransactionalTable(tblObj);
- if (isAcid) {
- throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL.getMsg());
- }
inputDir.add(oldTblPartLoc);
mergeDesc.setInputDir(inputDir);
http://git-wip-us.apache.org/repos/asf/hive/blob/cb61ed63/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
new file mode 100644
index 0000000..92bcefe
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestTxnConcatenate extends TxnCommandsBaseForTests {
+ static final private Logger LOG = LoggerFactory.getLogger(TestTxnConcatenate.class);
+ private static final String TEST_DATA_DIR =
+ new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestTxnLoadData.class.getCanonicalName()
+ + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ @Test
+ public void testConcatenate() throws Exception {
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(4,5)");
+ runStatementOnDriver("update " + Table.ACIDTBL + " set b = 4");
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6),(8,8)");
+ String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " order by a, b";
+ String[][] expected = new String[][] {
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+ "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+ "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+ "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+ "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+ checkResult(expected, testQuery, false, "check data", LOG);
+
+ /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+ but in normal usage 'concatenate' is blocking. */
+ hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+ runStatementOnDriver("alter table " + Table.ACIDTBL + " concatenate");
+
+ TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+ runWorker(hiveConf);
+ rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ String[][] expected2 = new String[][] {
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+ "acidtbl/base_0000003/bucket_00001"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+ "acidtbl/base_0000003/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+ "acidtbl/base_0000003/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+ "acidtbl/base_0000003/bucket_00001"}};
+ checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+ }
+ @Test
+ public void testConcatenatePart() throws Exception {
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(1,2,'p1'),(4,5,'p2')");
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 4 where p='p1'");
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(5,6,'p1'),(8,8,'p2')");
+ String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBLPART + " order by a, b";
+ String[][] expected = new String[][] {
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+ "acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+ "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+ "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+ "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+ checkResult(expected, testQuery, false, "check data", LOG);
+
+ /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+ but in normal usage 'concatenate' is blocking. */
+ hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+ runStatementOnDriver("alter table " + Table.ACIDTBLPART + " PARTITION(p='p1') concatenate");
+
+ TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+ runWorker(hiveConf);
+ rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ String[][] expected2 = new String[][] {
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+ "acidtblpart/p=p1/base_0000003/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+ "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+ "acidtblpart/p=p1/base_0000003/bucket_00001"},
+ {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+ "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+
+ checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+ }
+
+ @Test
+ public void testConcatenateMM() throws Exception {
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("create table T(a int, b int)");
+ runStatementOnDriver("insert into T values(1,2),(4,5)");
+ runStatementOnDriver("insert into T values(5,6),(8,8)");
+ String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
+ String[][] expected = new String[][] {
+ {"1\t2",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"4\t5",
+ "t/delta_0000001_0000001_0000/000000_0"},
+ {"5\t6",
+ "t/delta_0000002_0000002_0000/000000_0"},
+ {"8\t8",
+ "t/delta_0000002_0000002_0000/000000_0"}};
+ checkResult(expected, testQuery, false, "check data", LOG);
+
+ /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+ but in normal usage 'concatenate' is blocking. */
+ hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+ runStatementOnDriver("alter table T concatenate");
+
+ TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+ runWorker(hiveConf);
+ rsp = txnStore.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ String[][] expected2 = new String[][] {
+ {"1\t2",
+ "t/base_0000002/000000_0"},
+ {"4\t5",
+ "t/base_0000002/000000_0"},
+ {"5\t6",
+ "t/base_0000002/000000_0"},
+ {"8\t8",
+ "t/base_0000002/000000_0"}};
+ checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+ }
+}