Posted to commits@hive.apache.org by ek...@apache.org on 2017/12/01 02:40:05 UTC
[1/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/master c03001e98 -> 508d7e6f2
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
new file mode 100644
index 0000000..b98c74a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests related to support of LOAD DATA with Acid tables.
+ * Most tests run in both vectorized and non-vectorized mode since we currently have a vectorized
+ * and a non-vectorized acid reader, and it's critical that ROW_IDs are generated the same way.
+ */
+public class TestTxnLoadData extends TxnCommandsBaseForTests {
+ static final private Logger LOG = LoggerFactory.getLogger(TestTxnLoadData.class);
+ private static final String TEST_DATA_DIR =
+ new File(System.getProperty("java.io.tmpdir") +
+ File.separator + TestTxnLoadData.class.getCanonicalName()
+ + "-" + System.currentTimeMillis()
+ ).getPath().replaceAll("\\\\", "/");
+ @Rule
+ public TemporaryFolder folder= new TemporaryFolder();
+
+ @Override
+ String getTestDataDir() {
+ return TEST_DATA_DIR;
+ }
+
+ @Test
+ public void loadData() throws Exception {
+ loadData(false);
+ }
+ @Test
+ public void loadDataVectorized() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ loadData(true);
+ }
+ @Test
+ public void loadDataUpdate() throws Exception {
+ loadDataUpdate(false);
+ }
+ @Test
+ public void loadDataUpdateVectorized() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ loadDataUpdate(true);
+ }
+ @Test
+ public void loadDataNonAcid2AcidConversion() throws Exception {
+ loadDataNonAcid2AcidConversion(false);
+ }
+ @Test
+ public void loadDataNonAcid2AcidConversionVectorized() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ loadDataNonAcid2AcidConversion(true);
+ }
+ @Test
+ public void testMultiStatement() throws Exception {
+ testMultiStatement(false);
+ }
+ @Test
+ public void testMultiStatementVectorized() throws Exception {
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ testMultiStatement(true);
+ }
+ private void loadDataUpdate(boolean isVectorized) throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver(
+ "create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+ //"load data local inpath" doesn't delete source files so clean it here
+ runStatementOnDriver("truncate table Tstage");
+ //and do a Load Data into the same table, which should now land in a delta_x_x.
+ // 'data' is created by the export command.
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][]{
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"}};
+ checkResult(expected, testQuery, isVectorized, "load data inpath");
+ runStatementOnDriver("update T set b = 17 where a = 1");
+ String[][] expected2 = new String[][]{
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000023_0000023_0000/bucket_00000"}
+ };
+ checkResult(expected2, testQuery, isVectorized, "update");
+
+ runStatementOnDriver("insert into T values(2,2)");
+ runStatementOnDriver("delete from T where a = 3");
+ //test minor compaction
+ runStatementOnDriver("alter table T compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected3 = new String[][] {
+ {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000020_0000027/bucket_00000"},
+ {"{\"transactionid\":26,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000020_0000027/bucket_00000"}
+ };
+ checkResult(expected3, testQuery, isVectorized, "delete compact minor");
+
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T");
+ String[][] expected4 = new String[][]{
+ {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000031/000000_0"},
+ {"{\"transactionid\":31,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000031/000000_0"}};
+ checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite");
+
+ //load same data again (additive)
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+ runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows
+ runStatementOnDriver("delete from T where a = 3");//matches 2 rows
+ runStatementOnDriver("insert into T values(2,2)");
+ String[][] expected5 = new String[][]{
+ {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
+ {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/delta_0000035_0000035_0000/bucket_00000"},
+ {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000037_0000037_0000/bucket_00000"}
+ };
+ checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update");
+
+ //test major compaction
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected6 = new String[][]{
+ {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000037/bucket_00000"},
+ {"{\"transactionid\":35,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000037/bucket_00000"},
+ {"{\"transactionid\":37,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000037/bucket_00000"}
+ };
+ checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
+ }
+ private void loadData(boolean isVectorized) throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+ runStatementOnDriver("insert into T values(0,2),(0,4)");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+ //"load data local inpath" doesn't delete source files so clean it here
+ runStatementOnDriver("truncate table Tstage");
+ //and do a Load Data into the same table, which should now land in a delta_x_x.
+ // 'data' is created by the export command.
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][] {
+ //normal insert
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000016_0000/bucket_00000"},
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000016_0000/bucket_00000"},
+ //Load Data
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000021_0000021_0000/000000_0"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000021_0000021_0000/000000_0"}};
+ checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+ //test minor compaction
+ runStatementOnDriver("alter table T compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected1 = new String[][] {
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000016_0000021/bucket_00000"},
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000016_0000021/bucket_00000"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000016_0000021/bucket_00000"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000016_0000021/bucket_00000"}
+ };
+ checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
+
+ //test major compaction
+ runStatementOnDriver("insert into T values(2,2)");
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected2 = new String[][] {
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000027/bucket_00000"},
+ {"{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000027/bucket_00000"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000027/bucket_00000"},
+ {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000027/bucket_00000"},
+ {"{\"transactionid\":27,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000027/bucket_00000"}
+ };
+ checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
+
+ //create more staging data and test Load Data Overwrite
+ runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+ runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
+ String[][] expected3 = new String[][] {
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/000000_0"},
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/000000_0"}};
+ checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite");
+
+ //one more major compaction
+ runStatementOnDriver("insert into T values(6,6)");
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected4 = new String[][] {
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000036/bucket_00000"},
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000036/bucket_00000"},
+ {"{\"transactionid\":36,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000036/bucket_00000"}};
+ checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
+ }
+ /**
+ * Load Data [overwrite] into an un-partitioned, acid-converted table
+ */
+ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) stored as orc");
+ //pre-acid write to test nonAcid2acid conversion mixed with load data
+ runStatementOnDriver("insert into T values(0,2),(0,4)");
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+ //make 2 more inserts so that we have 000000_0_copy_1, 000000_0_copy_2 files in export
+ //export works at file level so if you have copy_N in the table dir, you'll have those in output
+ runStatementOnDriver("insert into Tstage values(2,2),(3,3)");
+ runStatementOnDriver("insert into Tstage values(4,4),(5,5)");
+ //create a file we'll import later
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+ runStatementOnDriver("truncate table Tstage");//clean the staging table
+
+ //now convert T to acid
+ runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional' = 'true')");
+ //and do a Load Data into the same table, which should now land in a delta/
+ // (with 000000_0, 000000_0_copy_1, 000000_0_copy_2)
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+
+ String[][] expected = new String[][] {
+ //from pre-acid insert
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/000000_0"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/000000_0"},
+ //from Load Data into acid converted table
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000024_0000024_0000/000000_0"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000024_0000024_0000/000000_0"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":2}\t2\t2", "t/delta_0000024_0000024_0000/000000_0_copy_1"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":3}\t3\t3", "t/delta_0000024_0000024_0000/000000_0_copy_1"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":4}\t4\t4", "t/delta_0000024_0000024_0000/000000_0_copy_2"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":5}\t5\t5", "t/delta_0000024_0000024_0000/000000_0_copy_2"},
+ };
+ checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+
+ //create more staging data with copy_N files and do LD+Overwrite
+ runStatementOnDriver("insert into Tstage values(5,6),(7,8)");
+ runStatementOnDriver("insert into Tstage values(8,8)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' overwrite into table T");
+
+ String[][] expected2 = new String[][] {
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000030/000000_0"},
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000030/000000_0"},
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000030/000000_0_copy_1"}
+ };
+ checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite");
+
+ //create 1 more delta_x_x so that the compactor has more than one dir/file to compact
+ runStatementOnDriver("insert into T values(9,9)");
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+
+ String[][] expected3 = new String[][] {
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000033/bucket_00000"},
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000033/bucket_00000"},
+ {"{\"transactionid\":30,\"bucketid\":536870912,\"rowid\":2}\t8\t8", "t/base_0000033/bucket_00000"},
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_0000033/bucket_00000"}
+
+ };
+ checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
+ }
+ /**
+ * Load Data [overwrite] into a partitioned transactional table
+ */
+ @Test
+ public void loadDataPartitioned() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc tblproperties('transactional'='true')");
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+
+ runStatementOnDriver("insert into Tstage values(0,2),(0,4)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/1'");
+ runStatementOnDriver("truncate table Tstage");//because 'local' inpath doesn't delete source files
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T partition(p=0)");
+
+ runStatementOnDriver("insert into Tstage values(1,2),(1,4)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/2'");
+ runStatementOnDriver("truncate table Tstage");
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/2/data' into table T partition(p=1)");
+
+ runStatementOnDriver("insert into Tstage values(2,2),(2,4)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/3'");
+ runStatementOnDriver("truncate table Tstage");
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/3/data' into table T partition(p=1)");
+
+ List<String> rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
+ String[][] expected = new String[][] {
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t2", "t/p=1/delta_0000024_0000024_0000/000000_0"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4", "t/p=1/delta_0000024_0000024_0000/000000_0"},
+ {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t2", "t/p=1/delta_0000028_0000028_0000/000000_0"},
+ {"{\"transactionid\":28,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4", "t/p=1/delta_0000028_0000028_0000/000000_0"}};
+ checkExpected(rs, expected, "load data inpath partitioned");
+
+
+ runStatementOnDriver("insert into Tstage values(5,2),(5,4)");
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() +"/4'");
+ runStatementOnDriver("truncate table Tstage");
+ runStatementOnDriver("load data inpath '" + getWarehouseDir() + "/4/data' overwrite into table T partition(p=1)");
+ String[][] expected2 = new String[][] {
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":1}\t0\t0\t4", "t/p=0/delta_0000020_0000020_0000/000000_0"},
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":0}\t1\t5\t2", "t/p=1/base_0000033/000000_0"},
+ {"{\"transactionid\":33,\"bucketid\":536870912,\"rowid\":1}\t1\t5\t4", "t/p=1/base_0000033/000000_0"}};
+ rs = runStatementOnDriver("select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
+ checkExpected(rs, expected2, "load data inpath partitioned overwrite");
+ }
+
+ /**
+ * By default you can't load into bucketed tables. Things will break badly in acid (data loss, etc.)
+ * if the loaded data is not bucketed properly. This test captures that this is still the default.
+ * If the default is changed, Load Data should probably do more validation to ensure data is
+ * properly distributed into files and that files are named correctly.
+ */
+ @Test
+ public void testValidations() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as orc tblproperties('transactional'='true')");
+ File createdFile= folder.newFile("myfile.txt");
+ FileUtils.writeStringToFile(createdFile, "hello world");
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("insert into Tstage values(1,2),(3,4)");
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("load data local inpath '" + getWarehouseDir() + "' into table T");
+ Assert.assertTrue(cpr.getErrorMessage().contains("Load into bucketed tables are disabled"));
+ }
+ private void checkExpected(List<String> rs, String[][] expected, String msg) {
+ super.checkExpected(rs, expected, msg, LOG, true);
+ }
+ @Test
+ public void testMMOrcTable() throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true', 'transactional_properties'='insert_only')");
+ int[][] values = {{1,2},{3,4}};
+ runStatementOnDriver("insert into T " + makeValuesClause(values));
+ List<String> rs = runStatementOnDriver("select a, b from T order by b");
+ Assert.assertEquals("", stringifyValues(values), rs);
+ }
+
+ /**
+ * Make sure Load Data assigns ROW_IDs correctly when there is a statementId suffix on the delta
+ * dir, for example delta_x_x_0001.
+ */
+ private void testMultiStatement(boolean isVectorized) throws Exception {
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+ //and do a Load Data into the same table, which should now land in a delta_x_x.
+ // 'data' is created by export command/
+ runStatementOnDriver("START TRANSACTION");
+ //statementId = 0
+ runStatementOnDriver("insert into T values(1,2),(3,4)");
+ //statementId = 1
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+ runStatementOnDriver("COMMIT");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][] {
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000019_0000019_0001/000000_0"},
+ {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000019_0000019_0001/000000_0"}
+ };
+ checkResult(expected, testQuery, isVectorized, "load data inpath");
+
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+ String[][] expected2 = new String[][] {
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000019/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000019/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000019/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000019/bucket_00000"}
+ };
+ checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
+ //at least for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
+ }
+ @Test
+ public void testAbort() throws Exception {
+ boolean isVectorized = false;
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("drop table if exists Tstage");
+ runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')");
+ //Tstage is just a simple way to generate test data
+ runStatementOnDriver("create table Tstage (a int, b int) stored as orc");
+ runStatementOnDriver("insert into Tstage values(5,5),(6,6)");
+ //this creates an ORC data file with correct schema under table root
+ runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
+ //and do a Load Data into the same table, which should now land in a delta_x_x.
+ // 'data' is created by export command/
+ runStatementOnDriver("insert into T values(1,2),(3,4)");
+ runStatementOnDriver("START TRANSACTION");
+ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T");
+ runStatementOnDriver("ROLLBACK");
+
+ String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" :
+ "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID";
+ String[][] expected = new String[][] {
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000019_0000019_0000/bucket_00000"},
+ {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000019_0000019_0000/bucket_00000"}
+ };
+ checkResult(expected, testQuery, isVectorized, "load data inpath");
+ }
+ /**
+ * We have to use a different query to check results for Vectorized tests because to get the
+ * file name info we need to use {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#FILENAME}
+ * which will currently make the query non-vectorizable. This means we can't check the file name
+ * for the vectorized version of the test.
+ */
+ private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{
+ List<String> rs = runStatementOnDriver(query);
+ checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized);
+ assertVectorized(isVectorized, query);
+ }
+}
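
For reference, the staging pattern every test above relies on boils down to the following HiveQL sequence; this is a sketch distilled from the test statements (the path under /tmp is illustrative, the tests use getWarehouseDir()):

    -- Tstage is only a convenient way to produce an ORC file with the right schema
    create table Tstage (a int, b int) stored as orc;
    insert into Tstage values (1,2), (3,4);
    -- EXPORT writes the ORC file(s) under <dir>/data
    export table Tstage to '/tmp/stage/1';
    -- LOAD DATA into a transactional table now lands the file in a new delta_x_x/ ...
    create table T (a int, b int) stored as orc tblproperties ('transactional'='true');
    load data local inpath '/tmp/stage/1/data' into table T;
    -- ... while OVERWRITE produces a new base_x/ instead
    load data local inpath '/tmp/stage/1/data' overwrite into table T;
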
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 7f5e091..f5f8cc8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -264,18 +264,6 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
};
checkExpected(rs, expected, "Unexpected row count after ctas");
}
- private void checkExpected(List<String> rs, String[][] expected, String msg) {
- LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
- for(String s : rs) {
- LOG.warn(s);
- }
- Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
- //verify data and layout
- for(int i = 0; i < expected.length; i++) {
- Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
- Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
- }
- }
/**
* The idea here is to create a non acid table that was written by multiple writers, i.e.
* unbucketed table that has 000000_0 & 000001_0, for example.
@@ -363,7 +351,9 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984));
Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448));
+ assertVectorized(true, "update T set b = 88 where b = 80");
runStatementOnDriver("update T set b = 88 where b = 80");
+ assertVectorized(true, "delete from T where b = 8");
runStatementOnDriver("delete from T where b = 8");
String expected3[][] = {
{"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"},
@@ -374,7 +364,7 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"},
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
- {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000023_0000023_0000/bucket_00000"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000024_0000024_0000/bucket_00000"},
};
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
@@ -386,15 +376,15 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
/*Compaction preserves location of rows wrt buckets/tranches (for now)*/
String expected4[][] = {
- {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000024/bucket_00002"},
- {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000024/bucket_00002"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000024/bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000024/bucket_00001"},
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000024/bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000024/bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000024/bucket_00000"},
- {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000024/bucket_00000"},
- {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000024/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000026/bucket_00002"},
+ {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000026/bucket_00002"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000026/bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000026/bucket_00001"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000026/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000026/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000026/bucket_00000"},
+ {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000026/bucket_00000"},
+ {"{\"transactionid\":24,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000026/bucket_00000"},
};
checkExpected(rs, expected4,"after major compact");
}
@@ -635,15 +625,8 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
//vectorized because there is INPUT__FILE__NAME
assertVectorized(false, query);
}
- private void assertVectorized(boolean vectorized, String query) throws Exception {
- List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
- for(String line : rs) {
- if(line != null && line.contains("Execution mode: vectorized")) {
- Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
- return;
- }
- }
- Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+ private void checkExpected(List<String> rs, String[][] expected, String msg) {
+ super.checkExpected(rs, expected, msg, LOG, true);
}
/**
* HIVE-17900
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 9f31eb1..6a2164f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
import java.io.File;
import java.util.ArrayList;
@@ -74,7 +75,6 @@ public abstract class TxnCommandsBaseForTests {
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, getWarehouseDir());
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -151,6 +151,21 @@ public abstract class TxnCommandsBaseForTests {
}
throw new RuntimeException("Didn't get expected failure!");
}
+
+ /**
+ * Runs Vectorized Explain on the query and checks if the plan is vectorized as expected
+ * @param vectorized {@code true} - assert that it's vectorized
+ */
+ void assertVectorized(boolean vectorized, String query) throws Exception {
+ List<String> rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query);
+ for(String line : rs) {
+ if(line != null && line.contains("Execution mode: vectorized")) {
+ Assert.assertTrue("Was vectorized when it wasn't expected", vectorized);
+ return;
+ }
+ }
+ Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized);
+ }
/**
* Will assert that actual files match expected.
* @param expectedFiles - suffixes of expected Paths. Must be the same length
@@ -176,4 +191,18 @@ public abstract class TxnCommandsBaseForTests {
}
Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles);
}
+ void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
+ LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
+ for(String s : rs) {
+ LOG.warn(s);
+ }
+ Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
+ //verify data and layout
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+ if(checkFileName) {
+ Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index d5ab079..afccf64 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -142,7 +142,7 @@ public class TestExecDriver extends TestCase {
db.createTable(src, cols, null, TextInputFormat.class,
HiveIgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
- true, false, false, false, null, 0, false);
+ true, false, false, false, null, 0);
i++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index ccd7d8e..5d26524 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -836,16 +836,22 @@ public class TestInputOutputFormat {
public void testEtlCombinedStrategy() throws Exception {
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_DIRECTORY_BATCH_MS.varname, "1000000");
+ AcidUtils.setTransactionalTableScan(conf, true);
+ conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+ conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/a/1/part-00", 1000, new byte[1]),
new MockFile("mock:/a/1/part-01", 1000, new byte[1]),
new MockFile("mock:/a/2/part-00", 1000, new byte[1]),
new MockFile("mock:/a/2/part-01", 1000, new byte[1]),
- new MockFile("mock:/a/3/base_0/1", 1000, new byte[1]),
- new MockFile("mock:/a/4/base_0/1", 1000, new byte[1]),
- new MockFile("mock:/a/5/base_0/1", 1000, new byte[1]),
- new MockFile("mock:/a/5/delta_0_25/1", 1000, new byte[1])
+ new MockFile("mock:/a/3/base_0/bucket_00001", 1000, new byte[1]),
+ new MockFile("mock:/a/4/base_0/bucket_00001", 1000, new byte[1]),
+ new MockFile("mock:/a/5/base_0/bucket_00001", 1000, new byte[1]),
+ new MockFile("mock:/a/5/delta_0_25/bucket_00001", 1000, new byte[1]),
+ new MockFile("mock:/a/6/delta_27_29/bucket_00001", 1000, new byte[1]),
+ new MockFile("mock:/a/6/delete_delta_27_29/bucket_00001", 1000, new byte[1])
);
OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
@@ -891,20 +897,27 @@ public class TestInputOutputFormat {
assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
assertEquals(2, etlSs.files.size());
assertEquals(2, etlSs.dirs.size());
- // The fifth will not be combined because of delta files.
+ // The fifth could be combined again.
ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx);
+ assertTrue(ss.isEmpty());
+ assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
+ assertEquals(4, etlSs.files.size());
+ assertEquals(3, etlSs.dirs.size());
+
+ // The sixth will not be combined because of delete delta files. Is that desired? HIVE-18110
+ ss = createOrCombineStrategies(context, fs, "mock:/a/6", combineCtx);
assertEquals(1, ss.size());
assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
assertNotSame(etlSs, ss);
- assertEquals(2, etlSs.files.size());
- assertEquals(2, etlSs.dirs.size());
+ assertEquals(4, etlSs.files.size());
+ assertEquals(3, etlSs.dirs.size());
}
public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context,
MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
return OrcInputFormat.determineSplitStrategies(combineCtx, context,
- adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+ adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
null, null, true);
}
@@ -918,7 +931,7 @@ public class TestInputOutputFormat {
OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
OrcInputFormat.AcidDirInfo adi = gen.call();
return OrcInputFormat.determineSplitStrategies(
- null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+ null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
null, null, true);
}
@@ -3586,10 +3599,14 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: open to read data - split 1 => mock:/mocktable8/0_0
- // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
- // call-3: split 2 - read delta_x_y/bucket_00001
- assertEquals(5, readOpsDelta);
+ // call-1: open(mock:/mocktable7/0_0)
+ // call-2: open(mock:/mocktable7/0_0)
+ // call-3: listLocatedFileStatuses(mock:/mocktable7)
+ // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
+ // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
+ // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
+ // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
+ assertEquals(7, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3662,9 +3679,11 @@ public class TestInputOutputFormat {
}
}
// call-1: open to read data - split 1 => mock:/mocktable8/0_0
- // call-2: split 2 - find hive.acid.key.index in footer of delta_x_y/bucket_00001
- // call-3: split 2 - read delta_x_y/bucket_00001
- assertEquals(3, readOpsDelta);
+ // call-2: listLocatedFileStatus(mock:/mocktable8)
+ // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
+ // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
+ // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001)
+ assertEquals(5, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 9628a40..030f012 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -298,7 +298,7 @@ public class TestOrcRawRecordMerger {
int BUCKET = 10;
ReaderKey key = new ReaderKey();
Configuration conf = new Configuration();
- int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
+ int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0);
Reader reader = createMockOriginalReader();
RecordIdentifier minKey = new RecordIdentifier(0, bucketProperty, 1);
RecordIdentifier maxKey = new RecordIdentifier(0, bucketProperty, 3);
@@ -308,7 +308,7 @@ public class TestOrcRawRecordMerger {
fs.makeQualified(root);
fs.create(root);
ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, minKey, maxKey,
- new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+ new Reader.Options().include(includes), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0);
RecordReader recordReader = pair.getRecordReader();
assertEquals(0, key.getTransactionId());
assertEquals(bucketProperty, key.getBucketProperty());
@@ -338,13 +338,13 @@ public class TestOrcRawRecordMerger {
ReaderKey key = new ReaderKey();
Reader reader = createMockOriginalReader();
Configuration conf = new Configuration();
- int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET);
+ int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0);
FileSystem fs = FileSystem.getLocal(conf);
Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
fs.makeQualified(root);
fs.create(root);
ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key, reader, BUCKET, null, null,
- new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList());
+ new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root), conf, new ValidReadTxnList(), 0);
assertEquals("first", value(pair.nextRecord()));
assertEquals(0, key.getTransactionId());
assertEquals(bucketProperty, key.getBucketProperty());
@@ -835,6 +835,8 @@ public class TestOrcRawRecordMerger {
assertEquals(null, merger.getMaxKey());
assertEquals(true, merger.next(id, event));
+ //minor compaction, so we ignore 'base_0000100' files; all Deletes end up first since
+ // they all modify primordial rows
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
@@ -891,10 +893,10 @@ public class TestOrcRawRecordMerger {
baseReader = OrcFile.createReader(basePath,
OrcFile.readerOptions(conf));
merger =
- new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
+ new OrcRawRecordMerger(conf, true, null, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
AcidUtils.getPaths(directory.getCurrentDirectories()), new OrcRawRecordMerger.Options()
- .isCompacting(true).isMajorCompaction(true));
+ .isCompacting(true).isMajorCompaction(true).baseDir(new Path(root, "base_0000100")));
assertEquals(null, merger.getMinKey());
assertEquals(null, merger.getMaxKey());
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index b2ac687..95e3463 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
+
/**
* This class tests the VectorizedOrcAcidRowBatchReader by creating an actual split and a set
* of delete delta files. The split is on an insert delta and there are multiple delete deltas
@@ -186,7 +186,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, root, false, null);
OrcInputFormat.AcidDirInfo adi = gen.call();
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = OrcInputFormat.determineSplitStrategies(
- null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.parsedDeltas,
+ null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,
null, null, true);
assertEquals(1, splitStrategies.size());
List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/queries/clientnegative/load_data_into_acid.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/load_data_into_acid.q b/ql/src/test/queries/clientnegative/load_data_into_acid.q
index fba1496..2ac5b56 100644
--- a/ql/src/test/queries/clientnegative/load_data_into_acid.q
+++ b/ql/src/test/queries/clientnegative/load_data_into_acid.q
@@ -1,7 +1,5 @@
-set hive.strict.checks.bucketing=false;
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
create table acid_ivot(
ctinyint TINYINT,
@@ -15,7 +13,7 @@ create table acid_ivot(
ctimestamp1 TIMESTAMP,
ctimestamp2 TIMESTAMP,
cboolean1 BOOLEAN,
- cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+ cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true');
LOAD DATA LOCAL INPATH "../../data/files/alltypesorc" into table acid_ivot;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientnegative/load_data_into_acid.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/load_data_into_acid.q.out b/ql/src/test/results/clientnegative/load_data_into_acid.q.out
index cd829ba..46b5cdd 100644
--- a/ql/src/test/results/clientnegative/load_data_into_acid.q.out
+++ b/ql/src/test/results/clientnegative/load_data_into_acid.q.out
@@ -10,7 +10,7 @@ PREHOOK: query: create table acid_ivot(
ctimestamp1 TIMESTAMP,
ctimestamp2 TIMESTAMP,
cboolean1 BOOLEAN,
- cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+ cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@acid_ivot
@@ -26,8 +26,8 @@ POSTHOOK: query: create table acid_ivot(
ctimestamp1 TIMESTAMP,
ctimestamp2 TIMESTAMP,
cboolean1 BOOLEAN,
- cboolean2 BOOLEAN) clustered by (cint) into 1 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+ cboolean2 BOOLEAN) stored as orc TBLPROPERTIES ('transactional'='true')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@acid_ivot
-FAILED: SemanticException [Error 10266]: LOAD DATA... statement is not supported on transactional table default@acid_ivot.
+FAILED: SemanticException [Error 30023]: alltypesorc file name is not valid in Load Data into Acid table default.acid_ivot. Examples of valid names are: 00000_0, 00000_0_copy_1
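
With this change the negative test fails on file-name validation rather than on a blanket ban of LOAD DATA into transactional tables. A sketch of what the check rejects versus what it would let through (staging path is illustrative):

    -- rejected: 'alltypesorc' does not look like a Hive-written bucket file
    LOAD DATA LOCAL INPATH '../../data/files/alltypesorc' INTO TABLE acid_ivot;

    -- passes the name check: files named the way Hive writers name them,
    -- e.g. 000000_0 or 000000_0_copy_1 (such as the files produced by EXPORT of an ORC table)
    LOAD DATA LOCAL INPATH '/tmp/staging/000000_0' INTO TABLE acid_ivot;
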
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/acid_table_stats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_table_stats.q.out b/ql/src/test/results/clientpositive/acid_table_stats.q.out
index d0fbcac..4c8297e 100644
--- a/ql/src/test/results/clientpositive/acid_table_stats.q.out
+++ b/ql/src/test/results/clientpositive/acid_table_stats.q.out
@@ -38,6 +38,7 @@ Table Parameters:
rawDataSize 0
totalSize 0
transactional true
+ transactional_properties default
#### A masked pattern was here ####
# Storage Information
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index 2bc1789..b3df04f 100644
--- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -29,6 +29,7 @@ Table Parameters:
rawDataSize 0
totalSize 0
transactional true
+ transactional_properties default
#### A masked pattern was here ####
# Storage Information
@@ -198,6 +199,7 @@ Table Parameters:
rawDataSize 0
totalSize 1798
transactional true
+ transactional_properties default
#### A masked pattern was here ####
# Storage Information
@@ -241,6 +243,7 @@ Table Parameters:
rawDataSize 0
totalSize 2909
transactional true
+ transactional_properties default
#### A masked pattern was here ####
# Storage Information
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/results/clientpositive/mm_default.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_default.q.out b/ql/src/test/results/clientpositive/mm_default.q.out
index ebbcb9d..1345efd 100644
--- a/ql/src/test/results/clientpositive/mm_default.q.out
+++ b/ql/src/test/results/clientpositive/mm_default.q.out
@@ -324,6 +324,7 @@ Table Parameters:
rawDataSize 0
totalSize 0
transactional true
+ transactional_properties default
#### A masked pattern was here ####
# Storage Information
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 25caf29..da10313 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -128,7 +128,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
}
if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
- //only need to check conformance if alter table enabled aicd
+ if(!isTransactionalPropertiesPresent) {
+ normazlieTransactionalPropertyDefault(newTable);
+ isTransactionalPropertiesPresent = true;
+ transactionalPropertiesValue = DEFAULT_TRANSACTIONAL_PROPERTY;
+ }
+ //only need to check conformance if alter table enabled acid
if (!conformToAcid(newTable)) {
// INSERT_ONLY tables don't have to conform to ACID requirement like ORC or bucketing
if (transactionalPropertiesValue == null || !"insert_only".equalsIgnoreCase(transactionalPropertiesValue)) {
@@ -232,6 +237,9 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
// normalize prop name
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+ if(transactionalProperties == null) {
+ normazlieTransactionalPropertyDefault(newTable);
+ }
initializeTransactionalProperties(newTable);
return;
}
@@ -241,6 +249,16 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
}
/**
+ * When a table is marked transactional=true but transactional_properties is not set, then
+ * transactional_properties should take on the default value. It is easier to make this explicit
+ * in the table definition than to keep checking everywhere whether it's set or not.
+ */
+ private void normazlieTransactionalPropertyDefault(Table table) {
+ table.getParameters().put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+ DEFAULT_TRANSACTIONAL_PROPERTY);
+
+ }
+ /**
* Check that InputFormatClass/OutputFormatClass should implement
* AcidInputFormat/AcidOutputFormat
*/
[3/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
Posted by ek...@apache.org.
HIVE-17361 Support LOAD DATA for transactional tables (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/508d7e6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/508d7e6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/508d7e6f
Branch: refs/heads/master
Commit: 508d7e6f269398a47147a697aecdbe546425679b
Parents: c03001e
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Nov 30 18:39:42 2017 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Nov 30 18:39:42 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hadoop/hive/ql/history/TestHiveHistory.java | 2 +-
.../hive/ql/txn/compactor/TestCompactor.java | 33 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 8 +-
.../repl/bootstrap/load/table/LoadTable.java | 1 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 267 ++++++++---
.../hadoop/hive/ql/io/HiveInputFormat.java | 3 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 128 +++--
.../hive/ql/io/orc/OrcRawRecordMerger.java | 330 +++++++++++--
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 4 +-
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 3 +
.../io/orc/VectorizedOrcAcidRowBatchReader.java | 75 +--
.../ql/io/orc/VectorizedOrcInputFormat.java | 2 -
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 32 ++
.../apache/hadoop/hive/ql/metadata/Hive.java | 78 +++-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 6 +-
.../hive/ql/parse/LoadSemanticAnalyzer.java | 22 +-
.../hadoop/hive/ql/plan/LoadTableDesc.java | 25 +-
.../hive/ql/txn/compactor/CompactorMR.java | 18 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 13 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 7 +-
.../apache/hadoop/hive/ql/TestTxnLoadData.java | 467 +++++++++++++++++++
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 45 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 31 +-
.../hadoop/hive/ql/exec/TestExecDriver.java | 2 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 51 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 14 +-
.../TestVectorizedOrcAcidRowBatchReader.java | 4 +-
.../clientnegative/load_data_into_acid.q | 4 +-
.../clientnegative/load_data_into_acid.q.out | 6 +-
.../clientpositive/acid_table_stats.q.out | 1 +
.../clientpositive/autoColumnStats_4.q.out | 3 +
.../results/clientpositive/mm_default.q.out | 1 +
.../TransactionalValidationListener.java | 20 +-
36 files changed, 1392 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ada2318..3be5a8d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1912,7 +1912,7 @@ public class HiveConf extends Configuration {
"1: Enable split-update feature found in the newer version of Hive ACID subsystem\n" +
"4: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing.\n" +
"This is intended to be used as an internal property for future versions of ACID. (See\n" +
- "HIVE-14035 for details.)"),
+ "HIVE-14035 for details. User sets it tblproperites via transactional_properties.)", true),
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index 2f0efce..d73cd64 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -106,7 +106,7 @@ public class TestHiveHistory extends TestCase {
db.createTable(src, cols, null, TextInputFormat.class,
IgnoreKeyTextOutputFormat.class);
db.loadTable(hadoopDataFile[i], src,
- LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0, false);
+ LoadFileType.KEEP_EXISTING, false, false, false, false, null, 0);
i++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 7103fb9..a1cd9eb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -114,8 +115,6 @@ public class TestCompactor {
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
- //"org.apache.hadoop.hive.ql.io.HiveInputFormat"
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.cleanDb(hiveConf);
@@ -669,7 +668,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
@@ -697,7 +696,8 @@ public class TestCompactor {
writeBatch(connection, writer, false);
}
- // Start a third batch, but don't close it.
+ // Start a third batch, but don't close it. This delta will be ignored by compaction since
+ // it has an open txn in it.
writeBatch(connection, writer, true);
// Now, compact
@@ -722,7 +722,7 @@ public class TestCompactor {
}
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -788,7 +788,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -850,7 +850,7 @@ public class TestCompactor {
if (!name.equals("base_0000006")) {
Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
}
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
} finally {
connection.close();
}
@@ -903,7 +903,7 @@ public class TestCompactor {
}
String name = stat[0].getPath().getName();
Assert.assertEquals(name, "base_0000006");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2);
} finally {
connection.close();
}
@@ -966,7 +966,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -984,7 +984,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1);
}
@Test
@@ -1043,7 +1043,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -1062,7 +1062,7 @@ public class TestCompactor {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
// There should be no rows in the delete_delta because there have been no delete events.
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
}
@Test
@@ -1121,7 +1121,7 @@ public class TestCompactor {
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1);
// Verify that we have got correct set of delete_deltas also
FileStatus[] deleteDeltaStat =
@@ -1140,7 +1140,7 @@ public class TestCompactor {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
// There should be no rows in the delete_delta because there have been no delete events.
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L, 1);
} finally {
connection.close();
@@ -1295,7 +1295,7 @@ public class TestCompactor {
}
private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty,
- String columnTypesProperty, int bucket, long min, long max)
+ String columnTypesProperty, int bucket, long min, long max, int numBuckets)
throws IOException {
ValidTxnList txnList = new ValidTxnList() {
@Override
@@ -1351,9 +1351,10 @@ public class TestCompactor {
Configuration conf = new Configuration();
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesProperty);
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypesProperty);
+ conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets));
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true);
AcidInputFormat.RawReader<OrcStruct> reader =
- aif.getRawReader(conf, false, bucket, txnList, base, deltas);
+ aif.getRawReader(conf, true, bucket, txnList, base, deltas);
RecordIdentifier identifier = reader.createKey();
OrcStruct value = reader.createValue();
long currentTxn = min;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 186d580..2f7284f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -376,7 +376,6 @@ public enum ErrorMsg {
DBTXNMGR_REQUIRES_CONCURRENCY(10264,
"To use DbTxnManager you must set hive.support.concurrency=true"),
TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
- LOAD_DATA_ON_ACID_TABLE(10266, "LOAD DATA... statement is not supported on transactional table {0}.", true),
LOCK_NO_SUCH_LOCK(10270, "No record of lock {0} could be found, " +
"may have timed out", true),
LOCK_REQUEST_UNSUPPORTED(10271, "Current transaction manager does not " +
@@ -550,6 +549,8 @@ public enum ErrorMsg {
ACID_TABLES_MUST_BE_READ_WITH_ACID_READER(30021, "An ORC ACID reader required to read ACID tables"),
ACID_TABLES_MUST_BE_READ_WITH_HIVEINPUTFORMAT(30022, "Must use HiveInputFormat to read ACID tables " +
"(set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat)"),
+ ACID_LOAD_DATA_INVALID_FILE_NAME(30023, "{0} file name is not valid in Load Data into Acid " +
+ "table {1}. Examples of valid names are: 00000_0, 00000_0_copy_1", true),
CONCATENATE_UNSUPPORTED_FILE_FORMAT(30030, "Concatenate/Merge only supported for RCFile and ORCFile formats"),
CONCATENATE_UNSUPPORTED_TABLE_BUCKETED(30031, "Concatenate/Merge can not be performed on bucketed tables"),
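
A rough sketch of the file-name check the new 30023 message implies. ORIGINAL_PATTERN ("[0-9]+_[0-9]+") appears in the AcidUtils changes below; the _copy_ regex here is an assumption for illustration.

import java.util.regex.Pattern;

// Sketch only: mirrors the idea of AcidUtils.ORIGINAL_PATTERN / ORIGINAL_PATTERN_COPY.
public class LoadDataNameCheck {
  static final Pattern ORIGINAL = Pattern.compile("[0-9]+_[0-9]+");
  static final Pattern ORIGINAL_COPY = Pattern.compile("[0-9]+_[0-9]+_copy_[0-9]+"); // assumed

  static boolean isValidLoadDataFileName(String name) {
    return ORIGINAL.matcher(name).matches() || ORIGINAL_COPY.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidLoadDataFileName("00000_0"));        // true
    System.out.println(isValidLoadDataFileName("00000_0_copy_1")); // true
    System.out.println(isValidLoadDataFileName("data.orc"));       // false -> would raise 30023
  }
}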
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 4076a9f..9184844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4488,7 +4488,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
part.getTPartition().getParameters().putAll(alterTbl.getProps());
} else {
boolean isFromMmTable = AcidUtils.isInsertOnlyTable(tbl.getParameters());
- Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(alterTbl.getProps());
+ Boolean isToMmTable = AcidUtils.isToInsertOnlyTable(tbl, alterTbl.getProps());
if (isToMmTable != null) {
if (!isFromMmTable && isToMmTable) {
result = generateAddMmTasks(tbl);
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index e2f8c1f..6d13773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -215,7 +215,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
Context ctx = driverContext.getCtx();
if(ctx.getHiveTxnManager().supportsAcid()) {
- //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+ //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes logic more explicit
return;
}
HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
@@ -290,7 +290,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
} else {
Utilities.FILE_OP_LOGGER.debug("MoveTask moving " + sourcePath + " to " + targetPath);
if(lfd.getWriteType() == AcidUtils.Operation.INSERT) {
- //'targetPath' is table root of un-partitioned table/partition
+ //'targetPath' is table root of un-partitioned table or partition
//'sourcePath' result of 'select ...' part of CTAS statement
assert lfd.getIsDfsDir();
FileSystem srcFs = sourcePath.getFileSystem(conf);
@@ -367,7 +367,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
checkFileFormats(db, tbd, table);
boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID
- && !tbd.isMmTable();
+ && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS...
// Create a data container
DataContainer dc = null;
@@ -379,7 +379,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(),
- tbd.getTxnId(), tbd.getStmtId(), tbd.isMmTable());
+ tbd.getTxnId(), tbd.getStmtId());
if (work.getOutputs() != null) {
DDLTask.addIfAbsentByName(new WriteEntity(table,
getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index bb1f4e5..545b7a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -229,6 +229,7 @@ public class LoadTable {
LoadTableDesc loadTableWork = new LoadTableDesc(
tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+ //todo: what is the point of this? If this is for replication, who would have opened a txn?
SessionState.get().getTxnMgr().getCurrentTxnId()
);
MoveWork moveWork =
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c0b71f..9ab028d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -30,6 +30,8 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,18 +41,17 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.Ref;
import org.apache.orc.impl.OrcAcidUtils;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,13 +123,14 @@ public class AcidUtils {
public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
/**
- * This does not need to use ORIGINAL_PATTERN_COPY because it's used to read
- * a "delta" dir written by a real Acid write - cannot have any copies
+ * A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
+ * (unless via a Load Data statement).
*/
public static final PathFilter originalBucketFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
- return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+ return ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
+ ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
}
};
@@ -137,6 +139,7 @@ public class AcidUtils {
}
private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
+ public static final Pattern BUCKET_PATTERN = Pattern.compile(BUCKET_PREFIX + "_[0-9]{5}$");
public static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
/**
@@ -156,14 +159,30 @@ public class AcidUtils {
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
/**
- * Create the bucket filename.
+ * Create the bucket filename in Acid format
* @param subdir the subdirectory for the bucket.
* @param bucket the bucket number
* @return the filename
*/
public static Path createBucketFile(Path subdir, int bucket) {
- return new Path(subdir,
+ return createBucketFile(subdir, bucket, true);
+ }
+
+ /**
+ * Create acid or original bucket name
+ * @param subdir the subdirectory for the bucket.
+ * @param bucket the bucket number
+ * @return the filename
+ */
+ private static Path createBucketFile(Path subdir, int bucket, boolean isAcidSchema) {
+ if(isAcidSchema) {
+ return new Path(subdir,
BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket));
+ }
+ else {
+ return new Path(subdir,
+ String.format(BUCKET_DIGITS, bucket));
+ }
}
/**
@@ -244,7 +263,7 @@ public class AcidUtils {
* @param path the base directory name
* @return the maximum transaction id that is included
*/
- static long parseBase(Path path) {
+ public static long parseBase(Path path) {
String filename = path.getName();
if (filename.startsWith(BASE_PREFIX)) {
return Long.parseLong(filename.substring(BASE_PREFIX.length()));
@@ -262,7 +281,7 @@ public class AcidUtils {
*/
public static AcidOutputFormat.Options
parseBaseOrDeltaBucketFilename(Path bucketFile,
- Configuration conf) {
+ Configuration conf) throws IOException {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
if (ORIGINAL_PATTERN.matcher(filename).matches()) {
@@ -273,7 +292,7 @@ public class AcidUtils {
.minimumTransactionId(0)
.maximumTransactionId(0)
.bucket(bucket)
- .writingBase(true);
+ .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
else if(ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
//todo: define groups in regex and use parseInt(Matcher.group(2))....
@@ -286,7 +305,7 @@ public class AcidUtils {
.maximumTransactionId(0)
.bucket(bucket)
.copyNumber(copyNumber)
- .writingBase(true);
+ .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
else if (filename.startsWith(BUCKET_PREFIX)) {
int bucket =
@@ -299,14 +318,16 @@ public class AcidUtils {
.bucket(bucket)
.writingBase(true);
} else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
- ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
+ bucketFile.getFileSystem(conf));
result
.setOldStyle(false)
.minimumTransactionId(parsedDelta.minTransaction)
.maximumTransactionId(parsedDelta.maxTransaction)
.bucket(bucket);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
- ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
+ bucketFile.getFileSystem(conf));
result
.setOldStyle(false)
.minimumTransactionId(parsedDelta.minTransaction)
@@ -344,11 +365,17 @@ public class AcidUtils {
throw new IllegalArgumentException("Unexpected Operation: " + op);
}
}
-
public enum AcidBaseFileType {
- COMPACTED_BASE, // a regular base file generated through major compaction
- ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
- INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+ /**
+ * File w/o Acid meta columns. This would be the case for files that were added to the table
+ * before it was converted to Acid but not yet major compacted. May also be the result of a
+ * Load Data statement on an acid table.
+ */
+ ORIGINAL_BASE,
+ /**
+ * File that has Acid metadata columns embedded in it. Found in base_x/ or delta_x_y/.
+ */
+ ACID_SCHEMA,
}
/**
@@ -366,16 +393,12 @@ public class AcidUtils {
this.acidBaseFileType = acidBaseFileType;
}
- public boolean isCompactedBase() {
- return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
- }
-
public boolean isOriginal() {
return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
}
- public boolean isInsertDelta() {
- return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+ public boolean isAcidSchema() {
+ return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA;
}
public HdfsFileStatusWithId getHdfsFileStatusWithId() {
@@ -545,6 +568,7 @@ public class AcidUtils {
* @return the base directory to read
*/
Path getBaseDirectory();
+ boolean isBaseInRawFormat();
/**
* Get the list of original files. Not {@code null}. Must be sorted.
@@ -576,7 +600,10 @@ public class AcidUtils {
List<FileStatus> getAbortedDirectories();
}
- public static class ParsedDelta implements Comparable<ParsedDelta> {
+ /**
+ * Immutable
+ */
+ public static final class ParsedDelta implements Comparable<ParsedDelta> {
private final long minTransaction;
private final long maxTransaction;
private final FileStatus path;
@@ -584,19 +611,24 @@ public class AcidUtils {
//had no statement ID
private final int statementId;
private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
+ private final boolean isRawFormat;
/**
* for pre 1.3.x delta files
*/
- ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
- this(min, max, path, -1, isDeleteDelta);
+ private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
+ boolean isRawFormat) {
+ this(min, max, path, -1, isDeleteDelta, isRawFormat);
}
- ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
+ private ParsedDelta(long min, long max, FileStatus path, int statementId,
+ boolean isDeleteDelta, boolean isRawFormat) {
this.minTransaction = min;
this.maxTransaction = max;
this.path = path;
this.statementId = statementId;
this.isDeleteDelta = isDeleteDelta;
+ this.isRawFormat = isRawFormat;
+ assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
}
public long getMinTransaction() {
@@ -618,7 +650,12 @@ public class AcidUtils {
public boolean isDeleteDelta() {
return isDeleteDelta;
}
-
+ /**
+ * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+ */
+ public boolean isRawFormat() {
+ return isRawFormat;
+ }
/**
* Compactions (Major/Minor) merge deltas/bases but delete of old files
* happens in a different process; thus it's possible to have bases/deltas with
@@ -698,29 +735,6 @@ public class AcidUtils {
}
/**
- * Convert the list of begin/end transaction id pairs to a list of delta
- * directories. Note that there may be multiple delta files for the exact same txn range starting
- * with 1.3.x;
- * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
- * @param root the root directory
- * @param deltas list of begin/end transaction id pairs
- * @return the list of delta paths
- */
- public static Path[] deserializeDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deltas) throws IOException {
- List<Path> results = new ArrayList<Path>(deltas.size());
- for(AcidInputFormat.DeltaMetaData dmd : deltas) {
- if(dmd.getStmtIds().isEmpty()) {
- results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
- continue;
- }
- for(Integer stmtId : dmd.getStmtIds()) {
- results.add(new Path(root, deltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
- }
- }
- return results.toArray(new Path[results.size()]);
- }
-
- /**
* Convert the list of begin/end transaction id pairs to a list of delete delta
* directories. Note that there may be multiple delete_delta files for the exact same txn range starting
* with 2.2.x;
@@ -743,25 +757,29 @@ public class AcidUtils {
return results.toArray(new Path[results.size()]);
}
- public static ParsedDelta parsedDelta(Path deltaDir) {
+ public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
String deltaDirName = deltaDir.getName();
if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
- return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+ return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
}
- return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is delta_prefix
+ return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
}
- private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
- ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+ private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs)
+ throws IOException {
+ ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
return new ParsedDelta(p.getMinTransaction(),
- p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+ p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat());
}
- public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
+ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+ throws IOException {
String filename = deltaDir.getName();
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
if (filename.startsWith(deltaPrefix)) {
+ //small optimization - delete delta can't be in raw format
+ boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
String rest = filename.substring(deltaPrefix.length());
int split = rest.indexOf('_');
int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
@@ -770,10 +788,10 @@ public class AcidUtils {
Long.parseLong(rest.substring(split + 1)) :
Long.parseLong(rest.substring(split + 1, split2));
if(split2 == -1) {
- return new ParsedDelta(min, max, null, isDeleteDelta);
+ return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
}
int statementId = Integer.parseInt(rest.substring(split2 + 1));
- return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
+ return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat);
}
throw new IllegalArgumentException(deltaDir + " does not start with " +
deltaPrefix);
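
For a concrete sense of the directory names being parsed above, a standalone sketch follows. The digit padding in the example name is assumed for illustration, and the real code walks the string with indexOf rather than split.

// Sketch only: decomposes a delta dir name into min/max txn and optional statement id.
public class DeltaNameSketch {
  static long[] parse(String dirName, String prefix) {
    String rest = dirName.substring(prefix.length());      // e.g. "0000003_0000003_0001"
    String[] parts = rest.split("_");
    long min = Long.parseLong(parts[0]);
    long max = Long.parseLong(parts[1]);
    long stmtId = parts.length > 2 ? Long.parseLong(parts[2]) : -1; // -1: pre-1.3.x, no stmt id
    return new long[]{min, max, stmtId};
  }

  public static void main(String[] args) {
    long[] p = parse("delta_0000003_0000003_0001", "delta_");
    System.out.println(p[0] + ".." + p[1] + " stmt=" + p[2]); // 3..3 stmt=1
  }
}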
@@ -871,13 +889,13 @@ public class AcidUtils {
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
- obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+ obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
}
} else {
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
for (FileStatus child : children) {
getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
- bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
+ bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
}
}
@@ -976,12 +994,18 @@ public class AcidUtils {
//this does "Path.uri.compareTo(that.uri)"
return o1.getFileStatus().compareTo(o2.getFileStatus());
});
- return new Directory(){
+
+ final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
+ return new Directory() {
@Override
public Path getBaseDirectory() {
return base;
}
+ @Override
+ public boolean isBaseInRawFormat() {
+ return isBaseInRawFormat;
+ }
@Override
public List<HdfsFileStatusWithId> getOriginalFiles() {
@@ -1022,7 +1046,8 @@ public class AcidUtils {
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
- boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
+ boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
+ FileSystem fs) throws IOException {
Path p = child.getPath();
String fn = p.getName();
if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -1050,7 +1075,7 @@ public class AcidUtils {
&& child.isDir()) {
String deltaPrefix =
(fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
- ParsedDelta delta = parseDelta(child, deltaPrefix);
+ ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) &&
ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
aborted.add(child);
@@ -1171,8 +1196,11 @@ public class AcidUtils {
parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isAcidTable));
}
- public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
- HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
+ /**
+ * Means it's a full acid table
+ */
+ public static void setTransactionalTableScan(Configuration conf, boolean isFullAcidTable) {
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isFullAcidTable);
}
/**
* @param p - not null
@@ -1185,6 +1213,8 @@ public class AcidUtils {
* SessionState.get().getTxnMgr().supportsAcid() here
* @param table table
* @return true if table is a legit ACID table, false otherwise
+ * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+ * both Acid and MM tables. HIVE-18124
*/
public static boolean isAcidTable(Table table) {
if (table == null) {
@@ -1197,6 +1227,10 @@ public class AcidUtils {
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
+ /**
+ * ToDo: this should be renamed isTransactionalTable() since that is what it's checking and covers
+ * both Acid and MM tables. HIVE-18124
+ */
public static boolean isAcidTable(CreateTableDesc table) {
if (table == null || table.getTblProps() == null) {
return false;
@@ -1208,8 +1242,11 @@ public class AcidUtils {
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
+ /**
+ * After the rename to isTransactionalTable(), make this isAcid(). HIVE-18124
+ */
public static boolean isFullAcidTable(Table table) {
- return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table.getParameters());
+ return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
}
/**
@@ -1336,6 +1373,9 @@ public class AcidUtils {
public static boolean isInsertOnlyTable(Map<String, String> params) {
return isInsertOnlyTable(params, false);
}
+ public static boolean isInsertOnlyTable(Table table) {
+ return isAcidTable(table) && getAcidOperationalProperties(table).isInsertOnly();
+ }
// TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
@@ -1349,13 +1389,21 @@ public class AcidUtils {
return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
}
- /** The method for altering table props; may set the table to MM, non-MM, or not affect MM. */
- public static Boolean isToInsertOnlyTable(Map<String, String> props) {
+ /**
+ * The method for altering table props; may set the table to MM, non-MM, or not affect MM.
+ * todo: All such validation logic should be in TransactionalValidationListener
+ * @param tbl object image before alter table command
+ * @param props prop values set in this alter table command
+ */
+ public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
// Note: Setting these separately is a very hairy issue in certain combinations, since we
// cannot decide what type of table this becomes without taking both into account, and
// in many cases the conversion might be illegal.
// The only thing we allow is tx = true w/o tx-props, for backward compat.
String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ if(transactional == null) {
+ transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+ }
String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactional == null && transactionalProp == null) return null; // Not affected.
boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
@@ -1378,4 +1426,81 @@ public class AcidUtils {
hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return hasTxn || hasProps;
}
+
+ /**
+ * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
+ * into delta_x_x/ (or base_x in case there is an Overwrite clause). {@link MetaDataFile} is a
+ * small JSON file in this directory that indicates that these files don't have Acid metadata
+ * columns and so the values for these columns need to be assigned at read time/compaction.
+ */
+ public static class MetaDataFile {
+ //export command uses _metadata....
+ private static final String METADATA_FILE = "_metadata_acid";
+ private static final String CURRENT_VERSION = "0";
+ //todo: enums? that have both field name and value list
+ private interface Field {
+ String VERSION = "thisFileVersion";
+ String DATA_FORMAT = "dataFormat";
+ }
+ private interface Value {
+ //plain ORC file
+ String RAW = "raw";
+ //result of acid write, i.e. decorated with ROW__ID info
+ String NATIVE = "native";
+ }
+
+ /**
+ * @param baseOrDeltaDir delta or base dir, must exist
+ */
+ public static void createMetaFile(Path baseOrDeltaDir, FileSystem fs, boolean isRawFormat)
+ throws IOException {
+ /**
+ * create the _metadata_acid json file in baseOrDeltaDir and
+ * write thisFileVersion, dataFormat
+ *
+ * on read, if the file is not there, assume version 0 and dataFormat=native (i.e. a normal acid write)
+ */
+ Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+ Map<String, String> metaData = new HashMap<>();
+ metaData.put(Field.VERSION, CURRENT_VERSION);
+ metaData.put(Field.DATA_FORMAT, isRawFormat ? Value.RAW : Value.NATIVE);
+ try (FSDataOutputStream strm = fs.create(formatFile, false)) {
+ new ObjectMapper().writeValue(strm, metaData);
+ } catch (IOException ioe) {
+ String msg = "Failed to create " + baseOrDeltaDir + "/" + METADATA_FILE
+ + ": " + ioe.getMessage();
+ LOG.error(msg, ioe);
+ throw ioe;
+ }
+ }
+ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+ Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
+ if(!fs.exists(formatFile)) {
+ return false;
+ }
+ try (FSDataInputStream strm = fs.open(formatFile)) {
+ Map<String, String> metaData = new ObjectMapper().readValue(strm, Map.class);
+ if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) {
+ throw new IllegalStateException("Unexpected Meta Data version: "
+ + metaData.get(Field.VERSION));
+ }
+ String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null");
+ switch (dataFormat) {
+ case Value.NATIVE:
+ return false;
+ case Value.RAW:
+ return true;
+ default:
+ throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT
+ + ": " + dataFormat);
+ }
+ }
+ catch(IOException e) {
+ String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE
+ + ": " + e.getMessage();
+ LOG.error(msg, e);
+ throw e;
+ }
+ }
+ }
}
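
For reference, a small standalone sketch of the _metadata_acid content that createMetaFile() writes and isRawFormat() reads back. Field names and values are taken from the hunk above; the filesystem streams are simplified here to a String round trip through the same Jackson 1.x ObjectMapper the patch imports.

import java.util.HashMap;
import java.util.Map;
import org.codehaus.jackson.map.ObjectMapper;

// Sketch only: shows the JSON a raw-format (LOAD DATA) delta/base directory would carry.
public class AcidMetaFileSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> metaData = new HashMap<>();
    metaData.put("thisFileVersion", "0");
    metaData.put("dataFormat", "raw");          // "native" for normal acid writes
    String json = new ObjectMapper().writeValueAsString(metaData);
    System.out.println(json);                   // e.g. {"dataFormat":"raw","thisFileVersion":"0"}
    Map<?, ?> readBack = new ObjectMapper().readValue(json, Map.class);
    System.out.println("raw".equals(readBack.get("dataFormat"))); // true -> isRawFormat() == true
  }
}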
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 6a1dc72..819c2a2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -468,6 +468,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
try {
Utilities.copyTablePropertiesToConf(table, conf);
+ if(tableScan != null) {
+ AcidUtils.setTransactionalTableScan(conf, tableScan.getConf().isAcidTable());
+ }
} catch (HiveException e) {
throw new IOException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index dda9f93..becdc71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import java.io.IOException;
@@ -409,7 +410,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* @param readerSchema the types for the reader
* @param conf the configuration
*/
- public static boolean[] genIncludedColumns(TypeDescription readerSchema,
+ static boolean[] genIncludedColumns(TypeDescription readerSchema,
Configuration conf) {
if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
@@ -419,7 +420,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- public static String[] getSargColumnNames(String[] originalColumnNames,
+ private static String[] getSargColumnNames(String[] originalColumnNames,
List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
int rootColumn = getRootColumn(isOriginal);
String[] columnNames = new String[types.size() - rootColumn];
@@ -695,21 +696,29 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
*/
@VisibleForTesting
static final class AcidDirInfo {
- public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
+ AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
List<AcidBaseFileInfo> baseFiles,
- List<ParsedDelta> parsedDeltas) {
+ List<ParsedDelta> deleteEvents) {
this.splitPath = splitPath;
this.acidInfo = acidInfo;
this.baseFiles = baseFiles;
this.fs = fs;
- this.parsedDeltas = parsedDeltas;
+ this.deleteEvents = deleteEvents;
}
final FileSystem fs;
final Path splitPath;
final AcidUtils.Directory acidInfo;
final List<AcidBaseFileInfo> baseFiles;
- final List<ParsedDelta> parsedDeltas;
+ final List<ParsedDelta> deleteEvents;
+
+ /**
+ * No (qualifying) data files found in {@link #splitPath}.
+ * @return true if there are no base/original files
+ */
+ boolean isEmpty() {
+ return (baseFiles == null || baseFiles.isEmpty());
+ }
}
@VisibleForTesting
@@ -884,7 +893,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public CombineResult combineWith(FileSystem fs, Path dir,
List<HdfsFileStatusWithId> otherFiles, boolean isOriginal) {
if ((files.size() + otherFiles.size()) > ETL_COMBINE_FILE_LIMIT
- || this.isOriginal != isOriginal) {
+ || this.isOriginal != isOriginal) {//todo: what is this checking????
return (files.size() > otherFiles.size())
? CombineResult.NO_AND_SWAP : CombineResult.NO_AND_CONTINUE;
}
@@ -1083,6 +1092,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class FileGenerator implements Callable<AcidDirInfo> {
private final Context context;
private final FileSystem fs;
+ /**
+ * For plain or acid tables this is the root of the partition (or table if not partitioned).
+ * For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that
+ * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already
+ * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}.
+ */
private final Path dir;
private final Ref<Boolean> useFileIds;
private final UserGroupInformation ugi;
@@ -1119,25 +1134,27 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private AcidDirInfo callInternal() throws IOException {
+ //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
context.transactionList, useFileIds, true, null);
- Path base = dirInfo.getBaseDirectory();
// find the base files (original or new style)
- List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
- if (base == null) {
+ List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
+ if (dirInfo.getBaseDirectory() == null) {
+ //for non-acid tables, all data files are in getOriginalFiles() list
for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
}
} else {
- List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+ List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+ baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
}
}
// Find the parsed deltas- some of them containing only the insert delta events
// may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
- List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+ List<ParsedDelta> parsedDeltas = new ArrayList<>();
if (context.acidOperationalProperties != null &&
context.acidOperationalProperties.isSplitUpdate()) {
@@ -1154,15 +1171,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (parsedDelta.isDeleteDelta()) {
parsedDeltas.add(parsedDelta);
} else {
+ AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ?
+ AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
+ PathFilter bucketFilter = parsedDelta.isRawFormat() ?
+ AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
+ if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() !=
+ parsedDelta.getMaxTransaction()) {
+ //a delta/ with files in raw format is a result of Load Data (as opposed to compaction
+ //or streaming ingest) so it must have interval length == 1.
+ throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
+ + " format but txnIds are out of range: " + parsedDelta.getPath());
+ }
// This is a normal insert delta, which only has insert events and hence all the files
// in this delta directory can be considered as a base.
Boolean val = useFileIds.value;
if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
- SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+ SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
}
if (val == null) {
useFileIds.value = true; // The call succeeded, so presumably the API is there.
@@ -1176,15 +1204,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
// Fall back to regular API and create statuses without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
for (FileStatus child : children) {
HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
- baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
}
}
}
} else {
+ /*
+ We already handled all delete deltas above and there should not be any other deltas for
+ any table type. (this was acid 1.0 code path).
+ */
+ assert dirInfo.getCurrentDirectories().isEmpty() : "Non empty curDir list?!: " + dir;
// When split-update is not enabled, then all the deltas in the current directories
// should be considered as usual.
parsedDeltas.addAll(dirInfo.getCurrentDirectories());
@@ -1658,7 +1691,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
pathFutures.add(ecs.submit(fileGenerator));
}
- boolean isTransactionalTableScan =//this never seems to be set correctly
+ boolean isTransactionalTableScan =
HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
TypeDescription readerSchema =
@@ -1700,13 +1733,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// We have received a new directory information, make split strategies.
--resultsLeft;
-
+ if(adi.isEmpty()) {
+ //no files found, for example empty table/partition
+ continue;
+ }
// The reason why we can get a list of split strategies here is because for ACID split-update
// case when we have a mix of original base files & insert deltas, we will produce two
// independent split strategies for them. There is a global flag 'isOriginal' that is set
// on a per split strategy basis and it has to be same for all the files in that strategy.
List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
- adi.splitPath, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
+ adi.splitPath, adi.baseFiles, adi.deleteEvents, readerTypes, ugi,
allowSyntheticFileIds);
for (SplitStrategy<?> splitStrategy : splitStrategies) {
@@ -1790,6 +1826,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean isOriginal, UserGroupInformation ugi, boolean allowSyntheticFileIds,
boolean isDefaultFs) {
if (!deltas.isEmpty() || combinedCtx == null) {
+ //why is this checking for deltas.isEmpty() - HIVE-18110
return new ETLSplitStrategy(
context, fs, dir, files, readerTypes, isOriginal, deltas, covered, ugi,
allowSyntheticFileIds, isDefaultFs);
@@ -1955,6 +1992,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, split);
OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
mergerOptions.rootPath(split.getRootDir());
+ mergerOptions.bucketPath(split.getPath());
final int bucket;
if (split.hasBase()) {
AcidOutputFormat.Options acidIOOptions =
@@ -1968,8 +2006,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
} else {
bucket = (int) split.getStart();
+ assert false : "We should never have a split w/o base in acid 2.0 for full acid: " + split.getPath();
}
-
+ //todo: createOptionsForReader() assumes it's !isOriginal.... why?
final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf);
readOptions.range(split.getStart(), split.getLength());
@@ -2041,6 +2080,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
final List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
readerOptions.include(OrcInputFormat.genIncludedColumns(schema, conf));
+ //todo: last param is bogus. why is this hardcoded?
OrcInputFormat.setSearchArgument(readerOptions, schemaTypes, conf, true);
return readerOptions;
}
@@ -2144,7 +2184,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<AcidBaseFileInfo> baseFiles,
List<ParsedDelta> parsedDeltas,
List<OrcProto.Type> readerTypes,
- UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+ UserGroupInformation ugi, boolean allowSyntheticFileIds) throws IOException {
List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
SplitStrategy<?> splitStrategy;
@@ -2153,23 +2193,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean isDefaultFs = (!checkDefaultFs) || ((fs instanceof DistributedFileSystem)
&& HdfsUtils.isDefaultFs((DistributedFileSystem) fs));
- // When no baseFiles, we will just generate a single split strategy and return.
- List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
if (baseFiles.isEmpty()) {
- splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, acidSchemaFiles,
+ assert false : "acid 2.0 no base?!: " + dir;
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, Collections.emptyList(),
false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds, isDefaultFs);
if (splitStrategy != null) {
splitStrategies.add(splitStrategy);
}
- return splitStrategies; // return here
+ return splitStrategies;
}
+ List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<>();
List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
// Separate the base files into acid schema and non-acid(original) schema files.
for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
if (acidBaseFileInfo.isOriginal()) {
originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
} else {
+ assert acidBaseFileInfo.isAcidSchema();
acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
}
}
@@ -2195,14 +2236,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return splitStrategies;
}
- @VisibleForTesting
- static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
+ private static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
FileSystem fs, Path dir,
List<HdfsFileStatusWithId> baseFiles,
boolean isOriginal,
List<ParsedDelta> parsedDeltas,
List<OrcProto.Type> readerTypes,
- UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs) {
+ UserGroupInformation ugi, boolean allowSyntheticFileIds, boolean isDefaultFs)
+ throws IOException {
List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
boolean[] covered = new boolean[context.numBuckets];
@@ -2250,6 +2291,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ /**
+ *
+ * @param bucket bucket/writer ID for this split of the compaction job
+ */
@Override
public RawReader<OrcStruct> getRawReader(Configuration conf,
boolean collapseEvents,
@@ -2258,25 +2303,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Path baseDirectory,
Path[] deltaDirectory
) throws IOException {
- Reader reader = null;
boolean isOriginal = false;
+ OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(true)
+ .isMajorCompaction(collapseEvents);
if (baseDirectory != null) {//this is NULL for minor compaction
- Path bucketFile = null;
+ //it may also be null if there is no base - only deltas
+ mergerOptions.baseDir(baseDirectory);
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
- bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+ isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+ mergerOptions.rootPath(baseDirectory.getParent());
} else {
- /**we don't know which file to start reading -
- * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
isOriginal = true;
+ mergerOptions.rootPath(baseDirectory);
}
- if(bucketFile != null) {
- reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
- }
}
- OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
- .isCompacting(true)
- .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
- return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+ else {
+ //since we have no base, there must be at least 1 delta which must be a result of an acid write
+ //so it must be an immediate child of the partition
+ mergerOptions.rootPath(deltaDirectory[0].getParent());
+ }
+ return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal,
bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
}
[2/3] hive git commit: HIVE-17361 Support LOAD DATA for transactional
tables (Eugene Koifman, reviewed by Alan Gates)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 95a60dc..73f27e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -88,11 +88,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
private int statementId;//sort on this descending, like currentTransactionId
- public ReaderKey() {
+ ReaderKey() {
this(-1, -1, -1, -1, 0);
}
- public ReaderKey(long originalTransaction, int bucket, long rowId,
+ ReaderKey(long originalTransaction, int bucket, long rowId,
long currentTransactionId) {
this(originalTransaction, bucket, rowId, currentTransactionId, 0);
}
@@ -196,6 +196,34 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
void next(OrcStruct next) throws IOException;
}
/**
+ * Used when base_x/bucket_N is missing - makes control flow a bit easier
+ */
+ private class EmptyReaderPair implements ReaderPair {
+ @Override public OrcStruct nextRecord() {
+ return null;
+ }
+ @Override public int getColumns() {
+ return 0;
+ }
+ @Override public RecordReader getRecordReader() {
+ return null;
+ }
+ @Override public Reader getReader() {
+ return null;
+ }
+ @Override public RecordIdentifier getMinKey() {
+ return null;
+ }
+ @Override public RecordIdentifier getMaxKey() {
+ return null;
+ }
+ @Override public ReaderKey getKey() {
+ return null;
+ }
+ @Override public void next(OrcStruct next) throws IOException {
+ }
+ }
+ /**
* A reader and the next record from that reader. The code reads ahead so that
* we can return the lowest ReaderKey from each of the readers. Thus, the
* next available row is nextRecord and only following records are still in
@@ -209,6 +237,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final ReaderKey key;
private final RecordIdentifier minKey;
private final RecordIdentifier maxKey;
+ @Deprecated//HIVE-18158
private final int statementId;
/**
@@ -320,12 +349,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final ReaderKey key;
final int bucketId;
final int bucketProperty;
+ /**
+ * TransactionId to use when generating synthetic ROW_IDs
+ */
+ final long transactionId;
- OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
+ OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf, Options mergeOptions,
+ int statementId) throws IOException {
this.key = key;
this.bucketId = bucketId;
assert bucketId >= 0 : "don't support non-bucketed tables yet";
- this.bucketProperty = encodeBucketId(conf, bucketId);
+ this.bucketProperty = encodeBucketId(conf, bucketId, statementId);
+ transactionId = mergeOptions.getTransactionId();
}
@Override public final OrcStruct nextRecord() {
return nextRecord;
@@ -337,7 +372,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public final ReaderKey getKey() { return key; }
/**
- * The cumulative number of row in all files of the logical bucket that precede the file
+ * The cumulative number of rows in all files of the logical bucket that precede the file
* represented by {@link #getRecordReader()}
*/
abstract long getRowIdOffset();
@@ -355,9 +390,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation);
nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
- new LongWritable(0));
+ new LongWritable(transactionId));
nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
- new LongWritable(0));
+ new LongWritable(transactionId));
nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
new IntWritable(bucketProperty));
nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
@@ -369,17 +404,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
.set(OrcRecordUpdater.INSERT_OPERATION);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
- .set(0);
+ .set(transactionId);
((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
.set(bucketProperty);
((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
- .set(0);
+ .set(transactionId);
((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
.set(nextRowId);
nextRecord().setFieldValue(OrcRecordUpdater.ROW,
getRecordReader().next(OrcRecordUpdater.getRow(next)));
}
- key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
+ key.setValues(transactionId, bucketProperty, nextRowId, transactionId, 0);
if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -391,9 +426,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
return false;//reached EndOfFile
}
}
- static int encodeBucketId(Configuration conf, int bucketId) {
- return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+ static int encodeBucketId(Configuration conf, int bucketId, int statementId) {
+ return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId)
+ .statementId(statementId));
}
+ /**
+ * This handles normal read (as opposed to Compaction) of a {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * file. Such a file may be the result of Load Data or it may have been written to the table
+ * before it was converted to acid.
+ */
@VisibleForTesting
final static class OriginalReaderPairToRead extends OriginalReaderPair {
private final long rowIdOffset;
@@ -401,12 +442,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final RecordReader recordReader;
private final RecordIdentifier minKey;
private final RecordIdentifier maxKey;
-
OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId,
final RecordIdentifier minKey, final RecordIdentifier maxKey,
Reader.Options options, Options mergerOptions, Configuration conf,
- ValidTxnList validTxnList) throws IOException {
- super(key, bucketId, conf);
+ ValidTxnList validTxnList, int statementId) throws IOException {
+ super(key, bucketId, conf, mergerOptions, statementId);
this.reader = reader;
assert !mergerOptions.isCompacting();
assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -426,6 +466,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean haveSeenCurrentFile = false;
long rowIdOffsetTmp = 0;
{
+ /**
+ * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+ * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all of its
+ * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+ */
//the split is from something other than the 1st file of the logical bucket - compute offset
AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
conf, validTxnList, false, true);
@@ -458,7 +503,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
if (rowIdOffset > 0) {
//rowIdOffset could be 0 if all files before current one are empty
/**
- * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+ * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration, Options)} we
* need to fix min/max key since these are used by
* {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
* the key. Clear? */
@@ -469,7 +514,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* If this is not the 1st file, set minKey 1 less than the start of current file
* (Would not need to set minKey if we knew that there are no delta files)
* {@link #advanceToMinKey()} needs this */
- newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
+ newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1);
}
if (maxKey != null) {
maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -482,7 +527,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* of the file so we want to leave it blank to make sure any insert events in delta
* files are included; Conversely, if it's not the last file, set the maxKey so that
* events from deltas that don't modify anything in the current split are excluded*/
- newMaxKey = new RecordIdentifier(0, bucketProperty,
+ newMaxKey = new RecordIdentifier(transactionId, bucketProperty,
rowIdOffset + reader.getNumberOfRows() - 1);
}
this.minKey = newMinKey;
@@ -532,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
OriginalReaderPairToCompact(ReaderKey key, int bucketId,
Reader.Options options, Options mergerOptions, Configuration conf,
- ValidTxnList validTxnList) throws IOException {
- super(key, bucketId, conf);
+ ValidTxnList validTxnList, int statementId) throws IOException {
+ super(key, bucketId, conf, mergerOptions, statementId);
assert mergerOptions.isCompacting() : "Should only be used for Compaction";
this.conf = conf;
this.options = options;
@@ -544,9 +589,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
assert options.getMaxOffset() == Long.MAX_VALUE;
AcidUtils.Directory directoryState = AcidUtils.getAcidState(
mergerOptions.getRootPath(), conf, validTxnList, false, true);
+ /**
+ * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
+ * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all of its
+ * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
+ */
originalFiles = directoryState.getOriginalFiles();
assert originalFiles.size() > 0;
- this.reader = advanceToNextFile();//in case of Compaction, this is the 1st file of the current bucket
+ //in case of Compaction, this is the 1st file of the current bucket
+ this.reader = advanceToNextFile();
if (reader == null) {
//Compactor generated a split for a bucket that has no data?
throw new IllegalStateException("No 'original' files found for bucketId=" + this.bucketId +
@@ -655,7 +706,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
Reader.Options options,
- Configuration conf) throws IOException {
+ Configuration conf, Options mergerOptions) throws IOException {
long rowLength = 0;
long rowOffset = 0;
long offset = options.getOffset();//this would usually be at block boundary
@@ -663,7 +714,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isTail = true;
RecordIdentifier minKey = null;
RecordIdentifier maxKey = null;
- int bucketProperty = encodeBucketId(conf, bucket);
+ TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+ mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+ int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId);
/**
* options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
* necessarily match stripe boundary. So we want to come up with minKey to be one before the 1st
@@ -755,13 +808,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
* This makes the "context" explicit.
*/
- static class Options {
+ static class Options implements Cloneable {
private int copyIndex = 0;
private boolean isCompacting = false;
private Path bucketPath;
private Path rootPath;
+ private Path baseDir;
private boolean isMajorCompaction = false;
private boolean isDeleteReader = false;
+ private long transactionId = 0;
Options copyIndex(int copyIndex) {
assert copyIndex >= 0;
this.copyIndex = copyIndex;
@@ -790,6 +845,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
assert !isCompacting;
return this;
}
+ Options transactionId(long transactionId) {
+ this.transactionId = transactionId;
+ return this;
+ }
+ Options baseDir(Path baseDir) {
+ this.baseDir = baseDir;
+ return this;
+ }
/**
* 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
*/
@@ -825,13 +888,48 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
boolean isDeleteReader() {
return isDeleteReader;
}
+ /**
+ * for reading "original" files - i.e. not native acid schema. Default value of 0 is
+ * appropriate for files that existed in a table before it was made transactional. 0 is the
+ * primordial transaction. Non-native files resulting from the Load Data command
+ * are located in base_x/ or delta_x_x/ and then transactionId == x.
+ */
+ long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * In case of isMajorCompaction(), this is the base dir from the Compactor, i.e. either a base_x/
+ * or {@link #rootPath} if it's the 1st major compaction after a non-acid to acid conversion
+ */
+ Path getBaseDir() {
+ return baseDir;
+ }
+ /**
+ * shallow clone
+ */
+ public Options clone() {
+ try {
+ return (Options) super.clone();
+ }
+ catch(CloneNotSupportedException ex) {
+ throw new AssertionError();
+ }
+ }
}
/**
- * Create a reader that merge sorts the ACID events together.
+ * Create a reader that merge sorts the ACID events together. This handles
+ * 1. 'normal' reads on behalf of a query (non vectorized)
+ * 2. Compaction reads (major/minor)
+ * 3. Delete event reads - to create a sorted view of all delete events for vectorized read
+ *
+ * Supporting all of these makes the constructor logic confusing; it needs to be refactored. Liberal use of
+ * asserts below is primarily for documentation purposes.
+ *
* @param conf the configuration
* @param collapseEvents should the events on the same row be collapsed
- * @param isOriginal is the base file a pre-acid file
- * @param bucket the bucket we are reading
+ * @param isOriginal if reading files w/o acid schema - {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * @param bucket the bucket/writer id of the file we are reading
* @param options the options to read with
* @param deltaDirectory the list of delta directories to include
* @throws IOException
@@ -887,11 +985,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
objectInspector = OrcRecordUpdater.createEventSchema
(OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+ assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction";
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
+ //suppose it's the first Major compaction so we only have deltas
+ boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction()
+ && mergerOptions.getBaseDir() == null;
if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
- mergerOptions.isDeleteReader()) {
+ mergerOptions.isDeleteReader() || isMajorNoBase) {
//for minor compaction, there is no progress report and we don't filter deltas
baseReader = null;
minKey = maxKey = null;
@@ -906,27 +1008,68 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
} else {
// find the min/max based on the offset and length (and more for 'original')
if (isOriginal) {
- keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+ //note that this KeyInterval may be adjusted later due to copy_N files
+ keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf, mergerOptions);
} else {
keyInterval = discoverKeyBounds(reader, options);
}
}
LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
// use the min/max instead of the byte range
- ReaderPair pair;
+ ReaderPair pair = null;
ReaderKey key = new ReaderKey();
if (isOriginal) {
options = options.clone();
if(mergerOptions.isCompacting()) {
- pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
- conf, validTxnList);
+ assert mergerOptions.isMajorCompaction();
+ Options readerPairOptions = mergerOptions;
+ if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
+ }
+ pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions,
+ conf, validTxnList,
+ 0);//0 since base_x doesn't have a suffix (neither does a pre-acid write)
} else {
+ assert mergerOptions.getBucketPath() != null : " since this is not compaction: "
+ + mergerOptions.getRootPath();
+ //if here it's a non-acid schema file - check if it's from before the table was marked transactional
+ //or is in base_x/delta_x_x from Load Data
+ Options readerPairOptions = mergerOptions;
+ TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs(
+ mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf);
+ if(tfp.syntheticTransactionId > 0) {
+ readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ tfp.syntheticTransactionId, tfp.folder);
+ }
pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
- keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
+ keyInterval.getMaxKey(), options, readerPairOptions, conf, validTxnList, tfp.statementId);
}
} else {
- pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
- eventOptions, 0);
+ if(mergerOptions.isCompacting()) {
+ assert mergerOptions.isMajorCompaction() : "expected major compaction: "
+ + mergerOptions.getBaseDir() + ":" + bucket;
+ assert mergerOptions.getBaseDir() != null : "no baseDir?: " + mergerOptions.getRootPath();
+ //we are compacting and it's acid schema so create a reader for the 1st bucket file that is not empty
+ FileSystem fs = mergerOptions.getBaseDir().getFileSystem(conf);
+ Path bucketPath = AcidUtils.createBucketFile(mergerOptions.getBaseDir(), bucket);
+ if(fs.exists(bucketPath) && fs.getFileStatus(bucketPath).getLen() > 0) {
+ //doing major compaction - where a full complement of bucket files is not
+ //required (on Tez), it's possible that base_x/ doesn't have a file for 'bucket'
+ reader = OrcFile.createReader(bucketPath, OrcFile.readerOptions(conf));
+ pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ eventOptions, 0);
+ }
+ else {
+ pair = new EmptyReaderPair();
+ LOG.info("No non-empty " + bucketPath + " was found for Major compaction");
+ }
+ }
+ else {
+ assert reader != null : "no reader? " + mergerOptions.getRootPath();
+ pair = new ReaderPairAcid(key, reader, keyInterval.getMinKey(), keyInterval.getMaxKey(),
+ eventOptions, 0);
+ }
}
minKey = pair.getMinKey();
maxKey = pair.getMaxKey();
@@ -937,11 +1080,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
baseReader = pair.getRecordReader();
}
-
- if (deltaDirectory != null) {
- /*whatever SARG maybe applicable to base it's not applicable to delete_delta since it has no
- * user columns
- * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+ /*now process the delta files. For normal read these should only be delete deltas. For
+ * Compaction these may be any delta_x_y/. The files inside any delta_x_y/ may be in Acid
+ * format (i.e. with Acid metadata columns) or 'original'.*/
+ if (deltaDirectory != null && deltaDirectory.length > 0) {
+ /*For reads, whatever SARG may be applicable to the base, it's not applicable to delete_delta since it has no
+ * user columns. For Compaction there is never a SARG.
+ * */
Reader.Options deltaEventOptions = eventOptions.clone()
.searchArgument(null, null).range(0, Long.MAX_VALUE);
for(Path delta: deltaDirectory) {
@@ -950,17 +1095,50 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
}
ReaderKey key = new ReaderKey();
- AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
+ AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
+ if(deltaDir.isRawFormat()) {
+ assert !deltaDir.isDeleteDelta() : delta.toString();
+ assert mergerOptions.isCompacting() : "during regular read anything which is not a" +
+ " delete_delta is treated like base: " + delta;
+ Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions,
+ deltaDir.getMinTransaction(), delta);
+ //this will also handle copy_N files if any
+ ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options,
+ rawCompactOptions, conf, validTxnList, deltaDir.getStatementId());
+ if (deltaPair.nextRecord() != null) {
+ readers.put(key, deltaPair);
+ }
+ continue;
+ }
for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
FileSystem fs = deltaFile.getFileSystem(conf);
if(!fs.exists(deltaFile)) {
+ /**
+ * it's possible that the file for a specific {@link bucket} doesn't exist in any given
+ * delta since no rows hashed to it (and the writer is not configured to create empty buckets)
+ */
continue;
}
+ if(deltaDir.isDeleteDelta()) {
+ //if here it may be compaction or regular read or the Delete event sorter
+ //in the latter 2 cases we should do:
+ //HIVE-17320: we should compute a SARG to push down min/max key to delete_delta
+ Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf));
+ ReaderPair deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
+ deltaEventOptions, deltaDir.getStatementId());
+ if (deltaPair.nextRecord() != null) {
+ readers.put(key, deltaPair);
+ }
+ continue;
+ }
+ //if here then we must be compacting
+ assert mergerOptions.isCompacting() : "not compacting and not delete delta : " + delta;
/* side files are only created by streaming ingest. If this is a compaction, we may
* have an insert delta/ here with side files there because the original writer died.*/
long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
assert length >= 0;
Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
+ //must get statementId from file name since Acid 1.0 doesn't write it into bucketProperty
ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
deltaEventOptions, deltaDir.getStatementId());
if (deltaPair.nextRecord() != null) {
@@ -988,6 +1166,76 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
+ * For use with Load Data statement which places {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ * type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read
+ * time and made permanent at compaction time. This is identical to how 'original' files (i.e.
+ * those that existed in the table before it was converted to an Acid table) are handled, except that the
+ * transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data.
+ */
+ static final class TransactionMetaData {
+ final long syntheticTransactionId;
+ /**
+ * folder which determines the transaction id to use in synthetic ROW_IDs
+ */
+ final Path folder;
+ final int statementId;
+ TransactionMetaData(long syntheticTransactionId, Path folder) {
+ this(syntheticTransactionId, folder, 0);
+ }
+ TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) {
+ this.syntheticTransactionId = syntheticTransactionId;
+ this.folder = folder;
+ this.statementId = statementId;
+ }
+ static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath,
+ Configuration conf) throws IOException {
+ Path parent = splitPath.getParent();
+ if(rootPath.equals(parent)) {
+ //the 'isOriginal' file is at the root of the partition (or table) thus it is
+ //from a pre-acid conversion write and belongs to primordial txnid:0.
+ return new TransactionMetaData(0, parent);
+ }
+ while(parent != null && !rootPath.equals(parent)) {
+ boolean isBase = parent.getName().startsWith(AcidUtils.BASE_PREFIX);
+ boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX);
+ if(isBase || isDelta) {
+ if(isBase) {
+ return new TransactionMetaData(AcidUtils.parseBase(parent), parent);
+ }
+ else {
+ AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
+ parent.getFileSystem(conf));
+ assert pd.getMinTransaction() == pd.getMaxTransaction() :
+ "This a delta with raw non acid schema, must be result of single write, no compaction: "
+ + splitPath;
+ return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId());
+ }
+ }
+ parent = parent.getParent();
+ }
+ if(parent == null) {
+ //split is marked isOriginal but it's not an immediate child of a partition nor is it in a
+ //base/ or delta/ - this should never happen
+ throw new IllegalStateException("Cannot determine transaction id for original file "
+ + splitPath + " in " + rootPath);
+ }
+ //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid
+ // converted table
+ return new TransactionMetaData(0, rootPath);
+ }
+ }
+ /**
+ * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which
+ * happens as a result of the Load Data statement. Setting {@code rootPath} to base_x/ or delta_x_x/
+ * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in the subsequent
+ * {@link OriginalReaderPair} object to return the files in this dir
+ * in {@link AcidUtils.Directory#getOriginalFiles()}
+ * @return modified clone of {@code baseOptions}
+ */
+ private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) {
+ return baseOptions.clone().transactionId(transactionId).rootPath(rootPath);
+ }
+ /**
* This determines the set of {@link ReaderPairAcid} to create for a given delta/.
* For unbucketed tables {@code bucket} can be thought of as a write tranche.
*/
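
To make the TransactionMetaData logic above concrete, here is a small sketch, not part of this commit, of how the synthetic transaction id for ROW__IDs is derived from where an 'original' file sits. It assumes the sketch lives in the org.apache.hadoop.hive.ql.io.orc package (TransactionMetaData and encodeBucketId are package-private) and uses simplified directory names.

package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SyntheticTxnIdSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path root = new Path("/warehouse/t/p=1"); // hypothetical partition root
    // case 1: file sits directly under the partition - written before the table became acid,
    // so it belongs to the primordial txnid:0
    OrcRawRecordMerger.TransactionMetaData preAcid =
        OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(
            new Path(root, "000000_0"), root, conf);
    System.out.println(preAcid.syntheticTransactionId); // 0
    // case 2: file sits under base_9/ created by LOAD DATA ... OVERWRITE in txn 9,
    // so synthetic ROW__IDs use transaction 9 (directory name simplified here)
    OrcRawRecordMerger.TransactionMetaData loaded =
        OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(
            new Path(root, "base_9/000000_0"), root, conf);
    System.out.println(loaded.syntheticTransactionId); // 9
    // the statement id found the same way is folded into the synthetic bucketProperty
    // (0 here, since base_x carries no statement suffix)
    int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, 0, loaded.statementId);
    System.out.println(bucketProperty);
  }
}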
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 315cc1d..8af38b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -196,7 +196,9 @@ public class OrcRecordUpdater implements RecordUpdater {
fields.add(new OrcStruct.Field("row", rowInspector, ROW));
return new OrcStruct.OrcStructInspector(fields);
}
-
+ /**
+ * @param path - partition root
+ */
OrcRecordUpdater(Path path,
AcidOutputFormat.Options options) throws IOException {
this.options = options;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 58638b5..edffa5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -51,6 +51,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
private OrcTail orcTail;
private boolean hasFooter;
+ /**
+ * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+ */
private boolean isOriginal;
private boolean hasBase;
//partition root
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index bcde4fc..d571bd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.impl.AcidStats;
@@ -156,7 +155,7 @@ public class VectorizedOrcAcidRowBatchReader
this.vectorizedRowBatchBase = baseReader.createValue();
}
- private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit inputSplit, Reporter reporter,
+ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
VectorizedRowBatchCtx rowBatchCtx) throws IOException {
this.rbCtx = rowBatchCtx;
final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
@@ -165,12 +164,10 @@ public class VectorizedOrcAcidRowBatchReader
// This type of VectorizedOrcAcidRowBatchReader can only be created when split-update is
// enabled for an ACID case and the file format is ORC.
- boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate()
- || !(inputSplit instanceof OrcSplit);
+ boolean isReadNotAllowed = !isAcidRead || !acidOperationalProperties.isSplitUpdate();
if (isReadNotAllowed) {
OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
}
- final OrcSplit orcSplit = (OrcSplit) inputSplit;
reporter.setStatus(orcSplit.toString());
readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
@@ -226,9 +223,11 @@ public class VectorizedOrcAcidRowBatchReader
private static final class OffsetAndBucketProperty {
private final long rowIdOffset;
private final int bucketProperty;
- private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty) {
+ private final long syntheticTxnId;
+ private OffsetAndBucketProperty(long rowIdOffset, int bucketProperty, long syntheticTxnId) {
this.rowIdOffset = rowIdOffset;
this.bucketProperty = bucketProperty;
+ this.syntheticTxnId = syntheticTxnId;
}
}
/**
@@ -240,17 +239,34 @@ public class VectorizedOrcAcidRowBatchReader
*
* todo: This logic is executed per split of every "original" file. The computed result is the
* same for every split from the same file so this could be optimized by moving it to
- * before/during splt computation and passing the info in the split. (HIVE-17917)
+ * before/during split computation and passing the info in the split. (HIVE-17917)
*/
private OffsetAndBucketProperty computeOffsetAndBucket(
OrcSplit split, JobConf conf,ValidTxnList validTxnList) throws IOException {
- if(!needSyntheticRowIds(split, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
- return new OffsetAndBucketProperty(0,0);
+ if(!needSyntheticRowIds(split.isOriginal(), !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+ if(split.isOriginal()) {
+ /**
+ * Even if we don't need to project ROW_IDs, we still need to check the transaction ID that
+ * created the file to see if it's committed. See more in
+ * {@link #next(NullWritable, VectorizedRowBatch)}. (In practice getAcidState() should
+ * filter out such base/delta files, but doing the check here creates fewer dependencies)
+ */
+ OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+ OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+ split.getRootDir(), conf);
+ return new OffsetAndBucketProperty(-1,-1,
+ syntheticTxnInfo.syntheticTransactionId);
+ }
+ return null;
}
long rowIdOffset = 0;
+ OrcRawRecordMerger.TransactionMetaData syntheticTxnInfo =
+ OrcRawRecordMerger.TransactionMetaData.findTransactionIDForSynthetcRowIDs(split.getPath(),
+ split.getRootDir(), conf);
int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
- int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf,
+ int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
+ .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState( syntheticTxnInfo.folder, conf,
validTxnList, false, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
AcidOutputFormat.Options bucketOptions =
@@ -266,7 +282,8 @@ public class VectorizedOrcAcidRowBatchReader
OrcFile.readerOptions(conf));
rowIdOffset += reader.getNumberOfRows();
}
- return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+ return new OffsetAndBucketProperty(rowIdOffset, bucketProperty,
+ syntheticTxnInfo.syntheticTransactionId);
}
/**
* {@link VectorizedOrcAcidRowBatchReader} is always used for vectorized reads of acid tables.
@@ -284,7 +301,7 @@ public class VectorizedOrcAcidRowBatchReader
if(rbCtx == null) {
throw new IllegalStateException("Could not create VectorizedRowBatchCtx for " + split.getPath());
}
- return !needSyntheticRowIds(split, hasDeletes, areRowIdsProjected(rbCtx));
+ return !needSyntheticRowIds(split.isOriginal(), hasDeletes, areRowIdsProjected(rbCtx));
}
/**
@@ -292,8 +309,8 @@ public class VectorizedOrcAcidRowBatchReader
* Even if ROW__ID is not projected you still need to decorate the rows with them to see if
* any of the delete events apply.
*/
- private static boolean needSyntheticRowIds(OrcSplit split, boolean hasDeletes, boolean rowIdProjected) {
- return split.isOriginal() && (hasDeletes || rowIdProjected);
+ private static boolean needSyntheticRowIds(boolean isOriginal, boolean hasDeletes, boolean rowIdProjected) {
+ return isOriginal && (hasDeletes || rowIdProjected);
}
private static boolean areRowIdsProjected(VectorizedRowBatchCtx rbCtx) {
if(rbCtx.getVirtualColumnCount() == 0) {
@@ -316,7 +333,7 @@ public class VectorizedOrcAcidRowBatchReader
if (orcSplit.isOriginal()) {
root = orcSplit.getRootDir();
} else {
- root = path.getParent().getParent();
+ root = path.getParent().getParent();//todo: why not just use getRootDir()?
assert root.equals(orcSplit.getRootDir()) : "root mismatch: baseDir=" + orcSplit.getRootDir() +
" path.p.p=" + root;
}
@@ -398,7 +415,9 @@ public class VectorizedOrcAcidRowBatchReader
* If there are deletes and reading original file, we must produce synthetic ROW_IDs in order
* to see if any deletes apply
*/
- if(rowIdProjected || !deleteEventRegistry.isEmpty()) {
+ if(needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected)) {
+ assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps;
+ assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps;
if(innerReader == null) {
throw new IllegalStateException(getClass().getName() + " requires " +
org.apache.orc.RecordReader.class +
@@ -409,8 +428,7 @@ public class VectorizedOrcAcidRowBatchReader
*/
recordIdColumnVector.fields[0].noNulls = true;
recordIdColumnVector.fields[0].isRepeating = true;
- //all "original" is considered written by txnid:0 which committed
- ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = 0;
+ ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId;
/**
* This is {@link RecordIdentifier#getBucketProperty()}
* Also see {@link BucketCodec}
@@ -433,15 +451,21 @@ public class VectorizedOrcAcidRowBatchReader
innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_TRANSACTION] = recordIdColumnVector.fields[0];
innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1];
innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2];
+ //these are insert events so (original txn == current txn) for all rows
+ innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_TRANSACTION] = recordIdColumnVector.fields[0];
+ }
+ if(syntheticProps.syntheticTxnId > 0) {
+ //"originals" (written before table was converted to acid) is considered written by
+ // txnid:0 which is always committed so there is no need to check wrt invalid transactions
+ //But originals written by Load Data for example can be in base_x or delta_x_x so we must
+ //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline.
+ findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector,
+ vectorizedRowBatchBase.size, selectedBitSet);
}
}
else {
// Case 1- find rows which belong to transactions that are not valid.
findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
- /**
- * All "original" data belongs to txnid:0 and is always valid/committed for every reader
- * So only do findRecordsWithInvalidTransactionIds() wrt {@link validTxnList} for !isOriginal
- */
}
// Case 2- find rows which have been deleted.
@@ -473,11 +497,6 @@ public class VectorizedOrcAcidRowBatchReader
}
else {
// Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
- // NOTE: We only link up the user columns and not the ACID metadata columns because this
- // vectorized code path is not being used in cases of update/delete, when the metadata columns
- // would be expected to be passed up the operator pipeline. This is because
- // currently the update/delete specifically disable vectorized code paths.
- // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW];
// Transfer columnVector objects from base batch to outgoing batch.
System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount());
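
A minimal sketch, not from this patch, of the rowIdOffset computation that computeOffsetAndBucket() above performs for 'original' files: sum the row counts of the files of the same logical bucket that precede the split's file, so that synthetic ROW__IDs stay unique across the bucket. It assumes a no-arg ValidReadTxnList (treat every transaction as valid) and that 'folder' already contains the original files.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.shims.HadoopShims;

public class RowIdOffsetSketch {
  static long rowIdOffset(Path folder, Path splitFile, Configuration conf) throws IOException {
    // assumption: the no-arg constructor means every transaction is readable
    ValidTxnList everythingValid = new ValidReadTxnList();
    int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(splitFile, conf).getBucketId();
    AcidUtils.Directory dir = AcidUtils.getAcidState(folder, conf, everythingValid, false, true);
    long offset = 0;
    for (HadoopShims.HdfsFileStatusWithId f : dir.getOriginalFiles()) {
      Path p = f.getFileStatus().getPath();
      if (p.equals(splitFile)) {
        break; // stop once we reach the file this split belongs to
      }
      AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename(p, conf);
      if (opts.getBucketId() != bucketId) {
        continue; // only earlier files of the same logical bucket contribute to the offset
      }
      Reader reader = OrcFile.createReader(p, OrcFile.readerOptions(conf));
      offset += reader.getNumberOfRows();
    }
    return offset;
  }
}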
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index f7388a4..736034d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -27,12 +27,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.io.NullWritable;
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 6fb0c43..fdb3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.api.Query;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
@@ -297,6 +298,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
default:
if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+ if(allowOperationInATransaction(queryPlan)) {
+ break;
+ }
+ //look at queryPlan.outputs(WriteEntity.t - that's the table)
//for example, drop table in an explicit txn is not allowed
//in some cases this requires looking at more than just the operation
//for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
@@ -311,6 +316,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
any non acid and raise an appropriate error
* Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/
}
+
+ /**
+ * This modifies the logic wrt what operations are allowed in a transaction. Multi-statement
+ * transaction support is incomplete but it makes some Acid test cases much easier to write.
+ */
+ private boolean allowOperationInATransaction(QueryPlan queryPlan) {
+ //Acid and MM tables support Load Data with transactional semantics. This will allow Load Data
+ //in a txn assuming we can determine the target is a suitable table type.
+ if(queryPlan.getOperation() == HiveOperation.LOAD && queryPlan.getOutputs() != null && queryPlan.getOutputs().size() == 1) {
+ WriteEntity writeEntity = queryPlan.getOutputs().iterator().next();
+ if(AcidUtils.isFullAcidTable(writeEntity.getTable()) || AcidUtils.isInsertOnlyTable(writeEntity.getTable())) {
+ switch (writeEntity.getWriteType()) {
+ case INSERT:
+ //allow operation in a txn
+ return true;
+ case INSERT_OVERWRITE:
+ //see HIVE-18154
+ return false;
+ default:
+ //not relevant for LOAD
+ return false;
+ }
+ }
+ }
+ //todo: handle Insert Overwrite as well: HIVE-18154
+ return false;
+ }
/**
* Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 1a37bf7..9f2c6d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1705,18 +1706,20 @@ public class Hive {
* location/inputformat/outputformat/serde details from table spec
* @param isSrcLocal
* If the source directory is LOCAL
- * @param isAcid
- * true if this is an ACID operation
+ * @param isAcidIUDoperation
+ * true if this is an ACID Insert/Update/Delete operation
* @param hasFollowingStatsTask
* true if there is a following task which updates the stats, so, this method need not update.
* @return Partition object being loaded with data
*/
public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
LoadFileType loadFileType, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
- boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId)
+ boolean isSrcLocal, boolean isAcidIUDoperation, boolean hasFollowingStatsTask, Long txnId, int stmtId)
throws HiveException {
Path tblDataLocationPath = tbl.getDataLocation();
boolean isMmTableWrite = AcidUtils.isInsertOnlyTable(tbl.getParameters());
+ assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+ boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
try {
// Get the partition object if it already exists
Partition oldPart = getPartition(tbl, partSpec, false);
@@ -1768,7 +1771,7 @@ public class Hive {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
}
- assert !isAcid;
+ assert !isAcidIUDoperation;
if (areEventsForDmlNeeded(tbl, oldPart)) {
newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
}
@@ -1792,16 +1795,22 @@ public class Hive {
filter = (loadFileType == LoadFileType.REPLACE_ALL)
? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
}
+ else if(!isAcidIUDoperation && isFullAcidTable) {
+ destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+ }
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("moving " + loadPath + " to " + destPath);
}
- if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
+ //todo: why is "&& !isAcidIUDoperation" needed here?
+ if (!isFullAcidTable && ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcidIUDoperation))) {
+ //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new
+ // base_x. (there is Insert Overwrite and Load Data Overwrite)
boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite);
} else {
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
- copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid,
+ copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
(loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
}
}
@@ -1891,6 +1900,38 @@ public class Hive {
}
}
+ /**
+ * Load Data commands for fullAcid tables write to base_x (if there is an OVERWRITE clause) or
+ * delta_x_x directory - same as any other Acid write. This method modifies the destPath to add
+ * this path component.
+ * @param txnId - id of current transaction (in which this operation is running)
+ * @param stmtId - see {@link DbTxnManager#getWriteIdAndIncrement()}
+ * @return appropriately modified path
+ */
+ private Path fixFullAcidPathForLoadData(LoadFileType loadFileType, Path destPath, long txnId, int stmtId, Table tbl) throws HiveException {
+ switch (loadFileType) {
+ case REPLACE_ALL:
+ destPath = new Path(destPath, AcidUtils.baseDir(txnId));
+ break;
+ case KEEP_EXISTING:
+ destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+ break;
+ case OVERWRITE_EXISTING:
+ //should not happen here - this is for replication
+ default:
+ throw new IllegalArgumentException("Unexpected " + LoadFileType.class.getName() + " " + loadFileType);
+ }
+ try {
+ FileSystem fs = tbl.getDataLocation().getFileSystem(SessionState.getSessionConf());
+ if(!FileUtils.mkdir(fs, destPath, conf)) {
+ LOG.warn(destPath + " already exists?!?!");
+ }
+ AcidUtils.MetaDataFile.createMetaFile(destPath, fs, true);
+ } catch (IOException e) {
+ throw new HiveException("load: error while creating " + destPath + ";loadFileType=" + loadFileType, e);
+ }
+ return destPath;
+ }
private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
@@ -2125,7 +2166,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
* @param partSpec
* @param loadFileType
* @param numDP number of dynamic partitions
- * @param listBucketingEnabled
* @param isAcid true if this is an ACID operation
* @param txnId txnId, can be 0 unless isAcid == true
* @return partition map details (PartitionSpec and Partition)
@@ -2273,14 +2313,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
* if list bucketing enabled
* @param hasFollowingStatsTask
* if there is any following stats task
- * @param isAcid true if this is an ACID based write
+ * @param isAcidIUDoperation true if this is an ACID based Insert [overwrite]/update/delete
*/
public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
- boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
- Long txnId, int stmtId, boolean isMmTable) throws HiveException {
-
+ boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
+ Long txnId, int stmtId) throws HiveException {
List<Path> newFiles = null;
Table tbl = getTable(tableName);
+ assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
+ boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
+ boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
HiveConf sessionConf = SessionState.getSessionConf();
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -2298,24 +2340,31 @@ private void constructOneLBLocationMap(FileStatus fSta,
newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
} else {
// Either a non-MM query, or a load into MM table from an external source.
- Path tblPath = tbl.getPath(), destPath = tblPath;
+ Path tblPath = tbl.getPath();
+ Path destPath = tblPath;
PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
if (isMmTable) {
+ assert !isAcidIUDoperation;
// We will load into MM directory, and delete from the parent if needed.
destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
filter = loadFileType == LoadFileType.REPLACE_ALL
? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
}
+ else if(!isAcidIUDoperation && isFullAcidTable) {
+ destPath = fixFullAcidPathForLoadData(loadFileType, destPath, txnId, stmtId, tbl);
+ }
Utilities.FILE_OP_LOGGER.debug("moving " + loadPath + " to " + tblPath
+ " (replace = " + loadFileType + ")");
- if (loadFileType == LoadFileType.REPLACE_ALL) {
+ if (loadFileType == LoadFileType.REPLACE_ALL && !isFullAcidTable) {
+ //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361
+ //todo: should probably do the same for MM IOW
boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
replaceFiles(tblPath, loadPath, destPath, tblPath,
sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable);
} else {
try {
FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
- copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid,
+ copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
@@ -2358,7 +2407,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
}
-
/**
* Creates a partition.
*
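
A small sketch, not part of this commit, showing only the destination-path shaping that fixFullAcidPathForLoadData() above performs: LOAD DATA ... OVERWRITE goes to a new base_x/ and LOAD DATA ... INTO goes to a new delta_x_x/ under the partition, with the transaction id and write/statement id taken from the current transaction. The values and the partition location here are assumptions for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class LoadDataDestinationSketch {
  public static void main(String[] args) {
    long txnId = 21;   // hypothetical txn running the LOAD DATA
    int stmtId = 0;    // write id within that txn
    Path partitionRoot = new Path("/warehouse/t/p=1"); // illustrative partition location
    // LOAD DATA ... OVERWRITE: a new base_x is created, existing files are not deleted
    Path overwriteDest = new Path(partitionRoot, AcidUtils.baseDir(txnId));
    // LOAD DATA ... INTO: a new delta_x_x, like any other acid insert
    Path intoDest = new Path(partitionRoot, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
    System.out.println(overwriteDest);
    System.out.println(intoDest);
  }
}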
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cd75130..a1b6cda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
Utilities.getTableDesc(table), new TreeMap<>(),
replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
@@ -400,6 +399,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return loadTableTask;
}
+ /**
+ * todo: this is odd: transactions are opened for all statements. what is this supposed to check?
+ */
+ @Deprecated
private static boolean isAcid(Long txnId) {
return (txnId != null) && (txnId != 0);
}
@@ -490,7 +493,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partSpec.getPartSpec(),
replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
loadTableWork.setInheritTableSpecs(false);
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 238fbd6..cc956da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -136,7 +136,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast,
- boolean isLocal) throws SemanticException {
+ boolean isLocal, Table table) throws SemanticException {
FileStatus[] srcs = null;
@@ -159,6 +159,14 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
"source contains directory: " + oneSrc.getPath().toString()));
}
+ if(AcidUtils.isFullAcidTable(table)) {
+ if(!AcidUtils.originalBucketFilter.accept(oneSrc.getPath())) {
+ //acid files (e.g. bucket_0000) have ROW_ID embedded in them and so can't simply be
+ //copied into a table, so only allow non-acid files for now
+ throw new SemanticException(ErrorMsg.ACID_LOAD_DATA_INVALID_FILE_NAME,
+ oneSrc.getPath().getName(), table.getDbName() + "." + table.getTableName());
+ }
+ }
}
} catch (IOException e) {
// Has to use full name to make sure it does not conflict with
@@ -230,11 +238,8 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- if(AcidUtils.isAcidTable(ts.tableHandle) && !AcidUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) {
- throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName());
- }
// make sure the arguments make sense
- List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
+ List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal, ts.tableHandle);
// for managed tables, make sure the file formats match
if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
@@ -277,17 +282,16 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
}
Long txnId = null;
- int stmtId = 0;
- Table tbl = ts.tableHandle;
- if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) {
+ int stmtId = -1;
+ if (AcidUtils.isAcidTable(ts.tableHandle)) {
txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+ stmtId = SessionState.get().getTxnMgr().getWriteIdAndIncrement();
}
LoadTableDesc loadTableWork;
loadTableWork = new LoadTableDesc(new Path(fromURI),
Utilities.getTableDesc(ts.tableHandle), partSpec,
isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING, txnId);
- loadTableWork.setTxnId(txnId);
loadTableWork.setStmtId(stmtId);
if (preservePartitionSpecs){
// Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
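
To illustrate the file-name constraint added in applyConstraintsAndGetFiles() above, a small sketch that is not part of this commit: files named by non-acid writers (000000_0 style) pass AcidUtils.originalBucketFilter and may be loaded, while acid bucket files (bucket_00000 style) already carry ROW__IDs and are rejected. The example paths and the expected results are assumptions based on the usual naming conventions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class LoadDataFileNameSketch {
  public static void main(String[] args) {
    Path rawFile = new Path("/staging/000000_0");       // written by a plain (non-acid) writer
    Path acidFile = new Path("/staging/bucket_00000");  // written by an acid writer
    System.out.println(AcidUtils.originalBucketFilter.accept(rawFile));  // expected: true
    System.out.println(AcidUtils.originalBucketFilter.accept(acidFile)); // expected: false
    System.out.println(AcidUtils.bucketFileFilter.accept(acidFile));     // expected: true
  }
}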
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1fa7b40..4683c9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -47,9 +47,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
public enum LoadFileType {
- REPLACE_ALL, // Remove all existing data before copy/move
- KEEP_EXISTING, // If any file exist while copy, then just duplicate the file
- OVERWRITE_EXISTING // If any file exist while copy, then just overwrite the file
+ /**
+ * This corresponds to INSERT OVERWRITE and REPL LOAD for INSERT OVERWRITE event.
+ * Remove all existing data before copy/move
+ */
+ REPLACE_ALL,
+ /**
+ * This corresponds to INSERT INTO and LOAD DATA.
+ * If a file already exists during the copy, just duplicate the file
+ */
+ KEEP_EXISTING,
+ /**
+ * This corresponds to REPL LOAD: if we re-apply the same event, we need to overwrite
+ * the file instead of making a duplicate copy.
+ * If a file already exists during the copy, just overwrite the file
+ */
+ OVERWRITE_EXISTING
}
public LoadTableDesc(final LoadTableDesc o) {
super(o.getSourcePath(), o.getWriteType());
@@ -215,14 +228,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
return currentTransactionId == null ? 0 : currentTransactionId;
}
- public void setTxnId(Long txnId) {
- this.currentTransactionId = txnId;
- }
-
public int getStmtId() {
return stmtId;
}
-
+ //todo: should this not be passed in the c'tor?
public void setStmtId(int stmtId) {
this.stmtId = stmtId;
}
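The expanded LoadFileType javadoc distinguishes three move semantics (INSERT OVERWRITE, INSERT INTO / LOAD DATA, and REPL LOAD re-applying an event). A minimal sketch of how a file mover might branch on them is below; it is an illustration only, not the actual Hive.loadTable() logic, and the "_copy_1" renaming scheme is a simplifying assumption.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
class LoadFileTypeSketch {
  static void moveFile(FileSystem fs, Path src, Path destDir, LoadFileType type) throws Exception {
    Path dest = new Path(destDir, src.getName());
    switch (type) {
      case REPLACE_ALL:        // INSERT OVERWRITE: destination was already cleared, plain move
        fs.rename(src, dest);
        break;
      case KEEP_EXISTING:      // INSERT INTO / LOAD DATA: keep both files on a name clash
        if (fs.exists(dest)) {
          dest = new Path(destDir, src.getName() + "_copy_1"); // simplified rename scheme
        }
        fs.rename(src, dest);
        break;
      case OVERWRITE_EXISTING: // REPL LOAD re-applying an event: replace the clashing file
        if (fs.exists(dest)) {
          fs.delete(dest, false);
        }
        fs.rename(src, dest);
        break;
    }
  }
}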
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d4d379..a804527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -577,11 +576,16 @@ public class CompactorMR {
dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+ boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+ && AcidUtils.MetaDataFile.isRawFormat(dir, fs); //deletes can't be raw format
- FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
+ FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
+ : AcidUtils.bucketFileFilter);
for(FileStatus f : files) {
// For each file, figure out which bucket it is.
- Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+ Matcher matcher = isRawFormat ?
+ AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName())
+ : AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
}
} else {
@@ -612,8 +616,12 @@ public class CompactorMR {
private void addFileToMap(Matcher matcher, Path file, boolean sawBase,
Map<Integer, BucketTracker> splitToBucketMap) {
if (!matcher.find()) {
- LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
- file.toString() + " Matcher=" + matcher.toString());
+ String msg = "Found a non-bucket file that we thought matched the bucket pattern! " +
+ file.toString() + " Matcher=" + matcher.toString();
+ LOG.error(msg);
+ //the following matcher.group() would fail anyway, and we don't want to skip files since
+ //that may be a data loss scenario
+ throw new IllegalArgumentException(msg);
}
int bucketNum = Integer.parseInt(matcher.group());
BucketTracker bt = splitToBucketMap.get(bucketNum);
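CompactorMR now picks the bucket-number pattern based on whether the base/delta directory is in raw (original file) format, and fails loudly instead of skipping a file whose name does not match. The sketch below shows the idea; the two regexes are assumptions for illustration, not the literal AcidUtils.BUCKET_DIGIT_PATTERN / LEGACY_BUCKET_DIGIT_PATTERN constants.
import java.util.regex.Matcher;
import java.util.regex.Pattern;
class BucketIdSketch {
  // acid writer output, e.g. bucket_00001
  private static final Pattern ACID_BUCKET = Pattern.compile("bucket_(\\d+)");
  // raw/original files, e.g. 000001_0
  private static final Pattern LEGACY_BUCKET = Pattern.compile("^(\\d+)_\\d+");
  static int bucketOf(String fileName, boolean isRawFormat) {
    Matcher m = (isRawFormat ? LEGACY_BUCKET : ACID_BUCKET).matcher(fileName);
    if (!m.find()) {
      // mirror addFileToMap(): throw rather than silently drop a data file
      throw new IllegalArgumentException("Unexpected file name: " + fileName);
    }
    return Integer.parseInt(m.group(1));
  }
}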
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 52257c4..319e0ee 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -344,7 +344,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
//this should fail because txn aborted due to timeout
CommandProcessorResponse cpr = runStatementOnDriverNegative("delete from " + Table.ACIDTBL + " where a = 5");
Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
-
+
//now test that we don't timeout locks we should not
//heartbeater should be running in the background every 1/2 second
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
@@ -354,9 +354,9 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runStatementOnDriver("start transaction");
runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
pause(750);
-
+
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
-
+
//since there is txn open, we are heartbeating the txn not individual locks
GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
@@ -377,7 +377,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
//these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we
//expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
-
+
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks());
pause(750);
@@ -525,7 +525,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.`o/p\\n`=1 " +
"when matched then update set cur=0 " +
"when not matched then insert values(s.key,s.data,1)";
-
+ //to allow cross join from 'teeCurMatch'
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false);
runStatementOnDriver(stmt);
int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
@@ -569,7 +570,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
Assert.assertEquals(stringifyValues(resultVals), r);
}
-
+
@Test
public void testMergeOnTezEdges() throws Exception {
String query = "merge into " + Table.ACIDTBL +
http://git-wip-us.apache.org/repos/asf/hive/blob/508d7e6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 17d976a..ab5f969 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -77,7 +77,7 @@ public class TestTxnCommands2 {
).getPath().replaceAll("\\\\", "/");
protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
//bucket count for test tables; set it to 1 for easier debugging
- protected static int BUCKET_COUNT = 2;
+ static int BUCKET_COUNT = 2;
@Rule
public TestName testName = new TestName();
@@ -117,12 +117,11 @@ public class TestTxnCommands2 {
setUpWithTableProperties("'transactional'='true'");
}
- protected void setUpWithTableProperties(String tableProperties) throws Exception {
+ void setUpWithTableProperties(String tableProperties) throws Exception {
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
- hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
hiveConf
.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
@@ -406,7 +405,7 @@ public class TestTxnCommands2 {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+ runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'insert_only')");
}
/**
* Test the query correctness and directory layout for ACID table conversion