Posted to commits@hive.apache.org by ek...@apache.org on 2016/08/12 17:31:49 UTC
[1/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta
files created by ACID Transactions (Saket Saurabh via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 333fa8763 -> ecab0d072
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
new file mode 100644
index 0000000..becb22a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Same as TestTxnCommands2, but tests ACID tables with 'transactional_properties' set to 'default'.
+ * This verifies that ACID tables with split-update turned on pass the same set of tests that
+ * run with it turned off. It also adds a few tests for behaviors that are specific to ACID
+ * tables with split-update enabled.
+ */
+public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
+
+ public TestTxnCommands2WithSplitUpdate() {
+ super();
+ }
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUpWithTableProperties("'transactional'='true','transactional_properties'='default'");
+ }
+
+ @Override
+ @Test
+ public void testOrcPPD() throws Exception {
+ final String defaultUnset = "unset";
+ String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+ // TODO: Setting the split strategy to 'BI' is a workaround for HIVE-14448 until it is resolved.
+ hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+ super.testOrcPPD();
+
+ // Restore the previous value for split strategy, or unset if not previously set.
+ if (oldSplitStrategyValue.equals(defaultUnset)) {
+ hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+ } else {
+ hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue);
+ }
+ }
+
+ @Override
+ @Test
+ public void testOrcNoPPD() throws Exception {
+ final String defaultUnset = "unset";
+ String oldSplitStrategyValue = hiveConf.get(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, defaultUnset);
+ // TODO: Setting the split strategy to 'BI' is a workaround for HIVE-14448 until it is resolved.
+ hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+ super.testOrcNoPPD();
+
+ // Restore the previous value for split strategy, or unset if not previously set.
+ if (oldSplitStrategyValue.equals(defaultUnset)) {
+ hiveConf.unset(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
+ } else {
+ hiveConf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, oldSplitStrategyValue);
+ }
+ }
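Editor's note (not part of the patch): testOrcPPD and testOrcNoPPD above duplicate the same save/override/restore dance around HIVE_ORC_SPLIT_STRATEGY. Below is a minimal sketch of a helper that could factor this out; the helper name and interface are hypothetical, but the get/set/unset semantics match the code above:

    // Hypothetical helper, not in this patch: run a body with a HiveConf key
    // temporarily overridden, then restore the old value or unset the key.
    private interface ThrowingBody { void run() throws Exception; }

    private void withConfOverride(String key, String value, ThrowingBody body)
        throws Exception {
      final String defaultUnset = "unset";
      String oldValue = hiveConf.get(key, defaultUnset);
      hiveConf.set(key, value);
      try {
        body.run();
      } finally {
        if (oldValue.equals(defaultUnset)) {
          hiveConf.unset(key);          // key was not set before: remove the override
        } else {
          hiveConf.set(key, oldValue);  // restore the previous value
        }
      }
    }

    // Usage (Java 8): withConfOverride(
    //     HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI",
    //     () -> super.testOrcPPD());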
+
+ @Override
+ @Test
+ public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+ // Test with split-update turned on.
+ testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+ }
+
+ @Override
+ @Test
+ public void writeBetweenWorkerAndCleaner() throws Exception {
+ writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+ }
+
+ @Override
+ @Test
+ public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+ testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'");
+ }
+
+ /**
+ * In the current implementation of ACID, altering the value of transactional_properties, or
+ * setting a value for it when it was previously unset, will throw an exception for an ACID table.
+ * @throws Exception
+ */
+ @Test
+ public void testFailureOnAlteringTransactionalProperties() throws Exception {
+ expectedException.expect(RuntimeException.class);
+ expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created");
+ runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
+ }
+
+ /**
+ * Test the query correctness and directory layout for ACID table conversion with split-update
+ * enabled.
+ * 1. Insert a row to Non-ACID table
+ * 2. Convert Non-ACID to ACID table with split-update enabled
+ * 3. Insert a row to ACID table
+ * 4. Perform Major compaction
+ * 5. Clean
+ * @throws Exception
+ */
+ @Test
+ public void testNonAcidToAcidSplitUpdateConversion1() throws Exception {
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status;
+
+ // 1. Insert a row to Non-ACID table
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ int [][] resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ int resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 2. Convert NONACIDORCTBL to ACID table
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+ + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Everything should be same as before
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 3. Insert another row to newly-converted ACID table
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory.
+ // The delta directory should also have only 1 bucket file (bucket_00001)
+ Assert.assertEquals(3, status.length);
+ boolean sawNewDelta = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("delta_.*")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, buckets.length); // only one bucket file
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+ } else {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b");
+ resultData = new int[][] {{1, 2}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 4. Perform a major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new directory: base_xxxxxxx.
+ // Original bucket files and delta directory should stay until Cleaner kicks in.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(4, status.length);
+ boolean sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("base_.*")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 5. Let Cleaner delete obsolete files/dirs
+ // Note: here we create a fake directory along with fake files to simulate original directories/files
+ String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+ "/subdir/000000_0";
+ String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() +
+ "/subdir/000000_1";
+ fs.create(new Path(fakeFile0));
+ fs.create(new Path(fakeFile1));
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Before Cleaner, there should be 5 items:
+ // 2 original files, 1 original directory, 1 base directory and 1 delta directory
+ Assert.assertEquals(5, status.length);
+ runCleaner(hiveConf);
+ // There should be only 1 directory left: base_xxxxxxx.
+ // Original bucket files and delta directory should have been cleaned up.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, status.length);
+ Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+ FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+ }
+
+ /**
+ * Test the query correctness and directory layout for ACID table conversion with split-update
+ * enabled.
+ * 1. Insert a row to Non-ACID table
+ * 2. Convert Non-ACID to ACID table with split update enabled.
+ * 3. Update the existing row in ACID table
+ * 4. Perform Major compaction
+ * 5. Clean
+ * @throws Exception
+ */
+ @Test
+ public void testNonAcidToAcidSplitUpdateConversion2() throws Exception {
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status;
+
+ // 1. Insert a row to Non-ACID table
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ int [][] resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ int resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 2. Convert NONACIDORCTBL to ACID table
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+ + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Everything should be same as before
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 3. Update the existing row in newly-converted ACID table
+ runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory
+ // and one delete_delta directory. When split-update is enabled, an update event is split into
+ // a combination of a delete and an insert, and the delete half is what generates the
+ // delete_delta directory (see the naming sketch after this test).
+ // The delta directory should have a single bucket file, and so should the delete_delta
+ // directory, since only the updated row's bucket is written.
+ Assert.assertEquals(4, status.length);
+ boolean sawNewDelta = false;
+ boolean sawNewDeleteDelta = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("delta_.*")) {
+ sawNewDelta = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+ sawNewDeleteDelta = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+ } else {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ }
+ Assert.assertTrue(sawNewDelta);
+ Assert.assertTrue(sawNewDeleteDelta);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 4. Perform a major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new directory: base_0000001.
+ // Original bucket files and delta directory should stay until Cleaner kicks in.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(5, status.length);
+ boolean sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("base_.*")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 5. Let Cleaner delete obsolete files/dirs
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Before Cleaner, there should be 5 items:
+ // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory
+ Assert.assertEquals(5, status.length);
+ runCleaner(hiveConf);
+ // There should be only 1 directory left: base_0000001.
+ // Original bucket files, delta directory and delete_delta should have been cleaned up.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, status.length);
+ Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+ FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001"));
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+ }
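Editor's note (not part of the patch): the delta and delete_delta directory names asserted throughout these conversion tests follow a fixed zero-padded scheme; the sketch referenced in the comment above reproduces it in plain Java. The padding widths are inferred from the names the tests assert on, not taken from the AcidUtils source:

    // Illustrative only: under split-update, an UPDATE writes a matched pair of
    // directories for the same txn range and statement id:
    //   delta_<min>_<max>_<stmt>        -- the re-inserted (new) row versions
    //   delete_delta_<min>_<max>_<stmt> -- delete events for the old versions
    static String deltaDir(long minTxn, long maxTxn, int stmtId) {
      return String.format("delta_%07d_%07d_%04d", minTxn, maxTxn, stmtId);
    }
    static String deleteDeltaDir(long minTxn, long maxTxn, int stmtId) {
      return String.format("delete_delta_%07d_%07d_%04d", minTxn, maxTxn, stmtId);
    }
    // deltaDir(1, 1, 0)       -> "delta_0000001_0000001_0000"
    // deleteDeltaDir(1, 1, 0) -> "delete_delta_0000001_0000001_0000"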
+
+ /**
+ * Test the query correctness and directory layout for ACID table conversion with split-update
+ * enabled.
+ * 1. Insert a row to Non-ACID table
+ * 2. Convert Non-ACID to ACID table with split-update enabled
+ * 3. Perform Major compaction
+ * 4. Update the existing row and insert a new row to ACID table
+ * 5. Perform another Major compaction
+ * 6. Clean
+ * @throws Exception
+ */
+ @Test
+ public void testNonAcidToAcidSplitUpdateConversion3() throws Exception {
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status;
+
+ // 1. Insert a row to Non-ACID table
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // There should be 2 original bucket files in the location (000000_0 and 000001_0)
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ int [][] resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ int resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default)
+ runStatementOnDriver("alter table " + Table.NONACIDORCTBL
+ + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Everything should be same as before
+ Assert.assertEquals(BUCKET_COUNT, status.length);
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 3. Perform a major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new directory: base_-9223372036854775808 (i.e. Long.MIN_VALUE)
+ // Original bucket files should stay until Cleaner kicks in.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(3, status.length);
+ boolean sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("base_.*")) {
+ Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 2}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 1;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 4. Update the existing row, and insert another row to newly-converted ACID table
+ runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1");
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)");
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000
+ // There should be 2 original bucket files (000000_0 and 000001_0), a base directory,
+ // plus two new delta directories (one from the update, one from the insert) and one
+ // delete_delta directory from the update's delete half (remember: with split-update, U = D + I).
+ Assert.assertEquals(6, status.length);
+ int numDelta = 0;
+ int numDeleteDelta = 0;
+ sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("delta_.*")) {
+ numDelta++;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numDelta == 1) {
+ Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ } else if (numDelta == 2) {
+ Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ } else if (status[i].getPath().getName().matches("delete_delta_.*")) {
+ numDeleteDelta++;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numDeleteDelta == 1) {
+ Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ } else if (status[i].getPath().getName().matches("base_.*")) {
+ Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ } else {
+ Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0"));
+ }
+ }
+ Assert.assertEquals(2, numDelta);
+ Assert.assertEquals(1, numDeleteDelta);
+ Assert.assertTrue(sawNewBase);
+
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 5. Perform another major compaction
+ runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ // There should be 1 new base directory: base_0000002
+ // Original bucket files, delta directories, delete_delta directories and the
+ // previous base directory should stay until Cleaner kicks in.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(status);
+ Assert.assertEquals(7, status.length);
+ int numBase = 0;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("base_.*")) {
+ numBase++;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(buckets);
+ if (numBase == 1) {
+ Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName());
+ Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ } else if (numBase == 2) {
+ // The new base dir should also have just 1 bucket file (bucket_00001), matching the
+ // single bucket written by the deltas
+ Assert.assertEquals("base_0000002", status[i].getPath().getName());
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ }
+ }
+ }
+ Assert.assertEquals(2, numBase);
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+
+ // 6. Let Cleaner delete obsolete files/dirs
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ // Before Cleaner, there should be 7 items:
+ // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories
+ Assert.assertEquals(7, status.length);
+ runCleaner(hiveConf);
+ // There should be only 1 directory left: base_0000002.
+ // Original bucket files, delta/delete_delta directories and the previous base directory should have been cleaned up.
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, status.length);
+ Assert.assertEquals("base_0000002", status[0].getPath().getName());
+ FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Arrays.sort(buckets);
+ Assert.assertEquals(1, buckets.length);
+ Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
+ rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
+ resultData = new int[][] {{1, 3}, {3, 4}};
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+ resultCount = 2;
+ Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 556df18..a7ff9a3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -17,11 +17,21 @@
*/
package org.apache.hadoop.hive.ql.io;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -30,13 +40,6 @@ import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class TestAcidUtils {
@Test
@@ -60,12 +63,23 @@ public class TestAcidUtils {
options.writingBase(false);
assertEquals("/tmp/delta_0000100_0000200_0000/bucket_00023",
AcidUtils.createFilename(p, options).toString());
+ options.writingDeleteDelta(true);
+ assertEquals("/tmp/delete_delta_0000100_0000200_0000/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.writingDeleteDelta(false);
options.statementId(-1);
assertEquals("/tmp/delta_0000100_0000200/bucket_00023",
AcidUtils.createFilename(p, options).toString());
+ options.writingDeleteDelta(true);
+ assertEquals("/tmp/delete_delta_0000100_0000200/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.writingDeleteDelta(false);
options.statementId(7);
assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
AcidUtils.createFilename(p, options).toString());
+ options.writingDeleteDelta(true);
+ assertEquals("/tmp/delete_delta_0000100_0000200_0007/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
}
@Test
public void testCreateFilenameLargeIds() throws Exception {
@@ -86,7 +100,6 @@ public class TestAcidUtils {
assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
AcidUtils.createFilename(p, options).toString());
}
-
@Test
public void testParsing() throws Exception {
@@ -94,19 +107,34 @@ public class TestAcidUtils {
Path dir = new Path("/tmp/tbl");
Configuration conf = new Configuration();
AcidOutputFormat.Options opts =
- AcidUtils.parseBaseBucketFilename(new Path(dir, "base_567/bucket_123"),
+ AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"),
conf);
assertEquals(false, opts.getOldStyle());
assertEquals(true, opts.isWritingBase());
assertEquals(567, opts.getMaximumTransactionId());
assertEquals(0, opts.getMinimumTransactionId());
assertEquals(123, opts.getBucket());
- opts = AcidUtils.parseBaseBucketFilename(new Path(dir, "000123_0"), conf);
+ opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"),
+ conf);
+ assertEquals(false, opts.getOldStyle());
+ assertEquals(false, opts.isWritingBase());
+ assertEquals(6, opts.getMaximumTransactionId());
+ assertEquals(5, opts.getMinimumTransactionId());
+ assertEquals(1, opts.getBucket());
+ opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"),
+ conf);
+ assertEquals(false, opts.getOldStyle());
+ assertEquals(false, opts.isWritingBase());
+ assertEquals(6, opts.getMaximumTransactionId());
+ assertEquals(5, opts.getMinimumTransactionId());
+ assertEquals(1, opts.getBucket());
+ opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf);
assertEquals(true, opts.getOldStyle());
assertEquals(true, opts.isWritingBase());
assertEquals(123, opts.getBucket());
assertEquals(0, opts.getMinimumTransactionId());
assertEquals(0, opts.getMaximumTransactionId());
+
}
@Test
@@ -471,5 +499,230 @@ public class TestAcidUtils {
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
}
+ @Test
+ public void testBaseWithDeleteDeltas() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidOperationalProperties.getDefault().toInt());
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_49/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_025_025/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_029_029/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_029_029/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_025_030/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_025_030/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_050_105/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_050_105/bucket_0", 0, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0]));
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
+ "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+ assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
+ List<FileStatus> obsolete = dir.getObsolete();
+ assertEquals(7, obsolete.size());
+ assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString());
+ assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString());
+ assertEquals(0, dir.getOriginalFiles().size());
+ List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+ assertEquals(2, deltas.size());
+ assertEquals("mock:/tbl/part1/delete_delta_050_105", deltas.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_050_105", deltas.get(1).getPath().toString());
+ // The delete_delta_110_110 should not be read because it is greater than the high watermark.
+ }
+
+ @Test
+ public void testOverlappingDeltaAndDeleteDelta() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidOperationalProperties.getDefault().toInt());
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_0000063_63/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_000062_62/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_00061_61/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_00064_64/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_40_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_0060_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_052_55/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_052_55/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+ assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
+ List<FileStatus> obsolete = dir.getObsolete();
+ assertEquals(3, obsolete.size());
+ assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString());
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(6, delts.size());
+ assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_40_60", delts.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_00061_61", delts.get(2).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(3).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(4).getPath().toString());
+ assertEquals("mock:/tbl/part1/delete_delta_00064_64", delts.get(5).getPath().toString());
+ }
-}
+ @Test
+ public void testMinorCompactedDeltaMakesInBetweenDeleteDeltaObsolete() throws Exception {
+ // This test checks that if we have a minor compacted delta for the txn range [40,60],
+ // then any delete delta within that range is made obsolete.
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_40_60/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_50_50/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
+ List<FileStatus> obsolete = dir.getObsolete();
+ assertEquals(1, obsolete.size());
+ assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString());
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(1, delts.size());
+ assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
+ }
+
+ @Test
+ public void deltasAndDeleteDeltasWithOpenTxnsNotInCompact() throws Exception {
+ // This test checks that the appropriate deltas and delete_deltas are included when a minor
+ // compaction specifies a valid open txn range.
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_2_2/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+ new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_7_7/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE + ":"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(2, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delete_delta_2_2", delts.get(1).getPath().toString());
+ }
+
+ @Test
+ public void deleteDeltasWithOpenTxnInRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname,
+ AcidUtils.AcidOperationalProperties.getDefault().toInt());
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_2_5/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delete_delta_3_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_1/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
+ new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
+ Path part = new MockPath(fs, "mock:/tbl/part1");
+ AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
+ List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+ assertEquals(3, delts.size());
+ assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+ assertEquals("mock:/tbl/part1/delete_delta_2_5", delts.get(1).getPath().toString());
+ assertEquals("mock:/tbl/part1/delta_2_5", delts.get(2).getPath().toString());
+ // Note that delete_delta_3_3 should not be read when a minor compacted
+ // [delete_]delta_2_5 is present.
+ }
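Editor's note (not part of the patch): the compact strings handed to ValidReadTxnList/ValidCompactorTxnList in these tests appear to encode "highWatermark:minOpenTxn:comma-separated open txn ids"; that reading is inferred from the tests here, not from the ValidReadTxnList source. A hypothetical decoder to make the two strings used above readable:

    // Hypothetical: decodes the test strings under the assumed format
    // "<highWatermark>:<minOpenTxn>:<openId,openId,...>".
    static void describeTxnList(String s) {
      String[] parts = s.split(":", 3);
      System.out.println("high watermark = " + parts[0]);
      System.out.println("min open txn   = " + parts[1]);
      System.out.println("open txns      = "
          + (parts.length < 3 || parts[2].isEmpty() ? "none" : parts[2]));
    }
    // "100:" + Long.MAX_VALUE + ":"  -> txns 1..100 visible, none open
    // "100:4:4"                      -> watermark 100 with txn 4 still open, which
    //                                   is why the delta_4_4_* dirs are excluded above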
+
+ @Test
+ public void testDeleteDeltaSubdirPathGeneration() throws Exception {
+ String deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10);
+ assertEquals("delete_delta_0000001_0000010", deleteDeltaSubdirPath);
+ deleteDeltaSubdirPath = AcidUtils.deleteDeltaSubdir(1, 10, 5);
+ assertEquals("delete_delta_0000001_0000010_0005", deleteDeltaSubdirPath);
+ }
+
+ @Test
+ public void testDeleteEventDeltaDirPathFilter() throws Exception {
+ Path positivePath = new Path("delete_delta_000001_000010");
+ Path negativePath = new Path("delta_000001_000010");
+ assertEquals(true, AcidUtils.deleteEventDeltaDirFilter.accept(positivePath));
+ assertEquals(false, AcidUtils.deleteEventDeltaDirFilter.accept(negativePath));
+ }
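Editor's note (not part of the patch): a minimal stand-in showing the behavior the filter test above asserts. The real deleteEventDeltaDirFilter lives in AcidUtils; this sketch only assumes it accepts by name prefix:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Accept a directory iff its name carries the delete-event prefix; plain
    // "delta_..." names fail the "delete_delta_" prefix check and are rejected.
    PathFilter deleteEventDeltaDirFilterSketch = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().startsWith("delete_delta_");
      }
    };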
+
+ @Test
+ public void testAcidOperationalProperties() throws Exception {
+ AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy();
+ assertsForAcidOperationalProperties(testObj, "legacy");
+
+ testObj = AcidUtils.AcidOperationalProperties.getDefault();
+ assertsForAcidOperationalProperties(testObj, "default");
+
+ testObj = AcidUtils.AcidOperationalProperties.parseInt(0);
+ assertsForAcidOperationalProperties(testObj, "legacy");
+
+ testObj = AcidUtils.AcidOperationalProperties.parseInt(1);
+ assertsForAcidOperationalProperties(testObj, "split_update");
+
+ testObj = AcidUtils.AcidOperationalProperties.parseString("legacy");
+ assertsForAcidOperationalProperties(testObj, "legacy");
+
+ testObj = AcidUtils.AcidOperationalProperties.parseString("default");
+ assertsForAcidOperationalProperties(testObj, "default");
+
+ }
+
+ private void assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProperties testObj,
+ String type) throws Exception {
+ switch(type) {
+ case "split_update":
+ case "default":
+ assertEquals(true, testObj.isSplitUpdate());
+ assertEquals(false, testObj.isHashBasedMerge());
+ assertEquals(1, testObj.toInt());
+ assertEquals("|split_update", testObj.toString());
+ break;
+ case "legacy":
+ assertEquals(false, testObj.isSplitUpdate());
+ assertEquals(false, testObj.isHashBasedMerge());
+ assertEquals(0, testObj.toInt());
+ assertEquals("", testObj.toString());
+ break;
+ default:
+ break;
+ }
+ }
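Editor's note (not part of the patch): the asserts above pin down a small bit-flag encoding for AcidOperationalProperties: legacy encodes to 0, while "default" and "split_update" both encode to 1. A sketch of that inferred encoding follows; the bit assignment is read off the expected values, not copied from the Hive constants:

    // Inferred encoding: bit 0 carries the split-update flag, so legacy == 0
    // and default/split_update == 1. isHashBasedMerge() is false in every case
    // above, so its bit (whichever it is) never shows up in these tests.
    static final int SPLIT_UPDATE_BIT = 1;  // assumed: bit 0

    static boolean isSplitUpdate(int props) {
      return (props & SPLIT_UPDATE_BIT) != 0;
    }

    static String describe(int props) {
      // Matches the observed toString(): "" for legacy, "|split_update" otherwise.
      return isSplitUpdate(props) ? "|split_update" : "";
    }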
+
+ @Test
+ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception {
+ AcidUtils.AcidOperationalProperties oprProps = AcidUtils.AcidOperationalProperties.getDefault();
+ Configuration testConf = new Configuration();
+ // Test setter for configuration object.
+ AcidUtils.setAcidOperationalProperties(testConf, oprProps);
+ assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0));
+ // Test getter for configuration object.
+ assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString());
+
+ Map<String, String> parameters = new HashMap<String, String>();
+ // Test setter for map object.
+ AcidUtils.setAcidOperationalProperties(parameters, oprProps);
+ assertEquals(oprProps.toString(),
+ parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname));
+ // Test getter for map object.
+ // Calling get on 'parameters' will still return the legacy type, because the map-based
+ // setter and getter work on different string keys: the setter writes
+ // HIVE_TXN_OPERATIONAL_PROPERTIES, while the getter reads TABLE_TRANSACTIONAL_PROPERTIES.
+ assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+ parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString());
+ // Set the appropriate key in the map and test that we are able to read it back correctly.
+ assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 6648829..2c1bb6f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -520,7 +520,9 @@ public class TestInputOutputFormat {
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
context, fs, new MockPath(fs, "mock:/a/b"), false, null);
- final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
+ List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ final SplitStrategy splitStrategy = splitStrategies.get(0);
assertTrue(
String.format(
"Split strategy for %d files x %d size for %d splits", c, s,
@@ -541,7 +543,9 @@ public class TestInputOutputFormat {
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
context, fs, new MockPath(fs, "mock:/a/b"), false, null);
- final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
+ List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ final SplitStrategy splitStrategy = splitStrategies.get(0);
assertTrue(
String.format(
"Split strategy for %d files x %d size for %d splits", c, s,
@@ -565,8 +569,9 @@ public class TestInputOutputFormat {
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
context = new OrcInputFormat.Context(conf);
@@ -578,8 +583,9 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
}
@Test
@@ -594,9 +600,9 @@ public class TestInputOutputFormat {
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a"), false, null);
- OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.ACIDSplitStrategy);
- List<OrcSplit> splits = splitStrategy.getSplits();
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
ColumnarSplitSizeEstimator splitSizeEstimator = new ColumnarSplitSizeEstimator();
for (OrcSplit split: splits) {
assertEquals(Integer.MAX_VALUE, splitSizeEstimator.getEstimatedSize(split));
@@ -605,6 +611,127 @@ public class TestInputOutputFormat {
}
@Test
+ public void testACIDSplitStrategyForSplitUpdate() throws Exception {
+ conf.set("bucket_count", "2");
+ conf.set(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ conf.set(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "default");
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+
+ // Case 1: Test with just originals => Single split strategy with two splits.
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")));
+ OrcInputFormat.FileGenerator gen =
+ new OrcInputFormat.FileGenerator(context, fs,
+ new MockPath(fs, "mock:/a"), false, null);
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ List<OrcSplit> splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(2, splits.size());
+ assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+ assertEquals("mock:/a/b/000000_1", splits.get(1).getPath().toUri().toString());
+ assertTrue(splits.get(0).isOriginal());
+ assertTrue(splits.get(1).isOriginal());
+
+ // Case 2: Test with originals and base => Single split strategy with two splits on the
+ // compacted base, since the presence of a base makes the originals obsolete.
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(2, splits.size());
+ assertEquals("mock:/a/base_0000001/bucket_00000", splits.get(0).getPath().toUri().toString());
+ assertEquals("mock:/a/base_0000001/bucket_00001", splits.get(1).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+ assertFalse(splits.get(1).isOriginal());
+
+ // Case 3: Test with originals and deltas => Two split strategies with two splits for each.
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(2, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(2, splits.size());
+ assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+ assertEquals("mock:/a/b/000000_1", splits.get(1).getPath().toUri().toString());
+ assertTrue(splits.get(0).isOriginal());
+ assertTrue(splits.get(1).isOriginal());
+ assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+ assertEquals(2, splits.size());
+ assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+ assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00001", splits.get(1).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+ assertFalse(splits.get(1).isOriginal());
+
+ // Case 4: Test with originals and deltas but now with only one bucket covered, i.e. we will
+ // have originals & insert_deltas for only one bucket, but the delete_deltas will be for two
+ // buckets => Two strategies with one split for each.
+ // When split-update is enabled, we do not need to account for buckets that aren't covered.
+ // We can do so because the valid user data has already been considered as base for the
+ // covered buckets; the uncovered buckets therefore contain no relevant data and can
+ // simply be ignored.
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(2, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(1, splits.size());
+ assertEquals("mock:/a/b/000000_0", splits.get(0).getPath().toUri().toString());
+ assertTrue(splits.get(0).isOriginal());
+ assertEquals(true, splitStrategies.get(1) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(1)).getSplits();
+ assertEquals(1, splits.size());
+ assertEquals("mock:/a/delta_0000001_0000001_0000/bucket_00000", splits.get(0).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+
+ // Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
+ // This should just generate one strategy with splits for base and insert_deltas.
+ fs = new MockFileSystem(conf,
+ new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
+ new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+ splits = ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+ assertEquals(4, splits.size());
+ assertEquals("mock:/a/base_0000001/bucket_00000", splits.get(0).getPath().toUri().toString());
+ assertEquals("mock:/a/base_0000001/bucket_00001", splits.get(1).getPath().toUri().toString());
+ assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00000", splits.get(2).getPath().toUri().toString());
+ assertEquals("mock:/a/delta_0000002_0000002_0000/bucket_00001", splits.get(3).getPath().toUri().toString());
+ assertFalse(splits.get(0).isOriginal());
+ assertFalse(splits.get(1).isOriginal());
+ assertFalse(splits.get(2).isOriginal());
+ assertFalse(splits.get(3).isOriginal());
+ }
+
+ @Test
public void testBIStrategySplitBlockBoundary() throws Exception {
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
@@ -617,9 +744,10 @@ public class TestInputOutputFormat {
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
- List<OrcSplit> splits = splitStrategy.getSplits();
+ List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+ List<OrcSplit> splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
int numSplits = splits.size();
assertEquals(5, numSplits);
@@ -632,9 +760,10 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
- splits = splitStrategy.getSplits();
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+ splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
numSplits = splits.size();
assertEquals(5, numSplits);
@@ -652,9 +781,10 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2")));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
- splits = splitStrategy.getSplits();
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+ splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
numSplits = splits.size();
assertEquals(10, numSplits);
@@ -672,9 +802,10 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2")));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
- splits = splitStrategy.getSplits();
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+ splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
numSplits = splits.size();
assertEquals(10, numSplits);
@@ -692,9 +823,10 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
gen = new OrcInputFormat.FileGenerator(context, fs,
new MockPath(fs, "mock:/a/b"), false, null);
- splitStrategy = createSplitStrategy(context, gen);
- assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
- splits = splitStrategy.getSplits();
+ splitStrategies = createSplitStrategies(context, gen);
+ assertEquals(1, splitStrategies.size());
+ assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
+ splits = ((OrcInputFormat.BISplitStrategy)splitStrategies.get(0)).getSplits();
numSplits = splits.size();
assertEquals(15, numSplits);
}
@@ -717,22 +849,23 @@ public class TestInputOutputFormat {
OrcInputFormat.CombinedCtx combineCtx = new OrcInputFormat.CombinedCtx();
// The first directory becomes the base for combining.
- SplitStrategy<?> ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
- assertNull(ss);
+ List<SplitStrategy<?>> ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+ assertTrue(ss.isEmpty());
assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
OrcInputFormat.ETLSplitStrategy etlSs = combineCtx.combined;
assertEquals(2, etlSs.files.size());
assertTrue(etlSs.isOriginal);
assertEquals(1, etlSs.dirs.size());
// The second one should be combined into the first.
- ss = createOrCombineStrategy(context, fs, "mock:/a/2", combineCtx);
- assertNull(ss);
+ ss = createOrCombineStrategies(context, fs, "mock:/a/2", combineCtx);
+ assertTrue(ss.isEmpty());
assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
assertEquals(4, etlSs.files.size());
assertEquals(2, etlSs.dirs.size());
// The third one has the base file, so it shouldn't be combined but could be a base.
- ss = createOrCombineStrategy(context, fs, "mock:/a/3", combineCtx);
- assertSame(etlSs, ss);
+ ss = createOrCombineStrategies(context, fs, "mock:/a/3", combineCtx);
+ assertEquals(1, ss.size());
+ assertSame(etlSs, ss.get(0));
assertEquals(4, etlSs.files.size());
assertEquals(2, etlSs.dirs.size());
assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
@@ -741,34 +874,37 @@ public class TestInputOutputFormat {
assertFalse(etlSs.isOriginal);
assertEquals(1, etlSs.dirs.size());
// Try the first again, it would not be combined and we'd retain the old base (less files).
- ss = createOrCombineStrategy(context, fs, "mock:/a/1", combineCtx);
- assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
- assertNotSame(etlSs, ss);
- OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss;
+ ss = createOrCombineStrategies(context, fs, "mock:/a/1", combineCtx);
+ assertEquals(1, ss.size());
+ assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
+ assertNotSame(etlSs, ss.get(0));
+ OrcInputFormat.ETLSplitStrategy rejectedEtlSs = (OrcInputFormat.ETLSplitStrategy)ss.get(0);
assertEquals(2, rejectedEtlSs.files.size());
assertEquals(1, rejectedEtlSs.dirs.size());
assertTrue(rejectedEtlSs.isOriginal);
assertEquals(1, etlSs.files.size());
assertEquals(1, etlSs.dirs.size());
// The fourth could be combined again.
- ss = createOrCombineStrategy(context, fs, "mock:/a/4", combineCtx);
- assertNull(ss);
+ ss = createOrCombineStrategies(context, fs, "mock:/a/4", combineCtx);
+ assertTrue(ss.isEmpty());
assertTrue(combineCtx.combined instanceof OrcInputFormat.ETLSplitStrategy);
assertEquals(2, etlSs.files.size());
assertEquals(2, etlSs.dirs.size());
// The fifth will not be combined because of delta files.
- ss = createOrCombineStrategy(context, fs, "mock:/a/5", combineCtx);
- assertTrue(ss instanceof OrcInputFormat.ETLSplitStrategy);
+ ss = createOrCombineStrategies(context, fs, "mock:/a/5", combineCtx);
+ assertEquals(1, ss.size());
+ assertTrue(ss.get(0) instanceof OrcInputFormat.ETLSplitStrategy);
- assertNotSame(etlSs, ss);
+ assertNotSame(etlSs, ss.get(0));
assertEquals(2, etlSs.files.size());
assertEquals(2, etlSs.dirs.size());
}
- public SplitStrategy<?> createOrCombineStrategy(OrcInputFormat.Context context,
+ public List<SplitStrategy<?>> createOrCombineStrategies(OrcInputFormat.Context context,
MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
- return OrcInputFormat.determineSplitStrategy(combineCtx, context,
- adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+ return OrcInputFormat.determineSplitStrategies(combineCtx, context,
+ adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, null, true);
}
public OrcInputFormat.AcidDirInfo createAdi(
@@ -777,11 +913,12 @@ public class TestInputOutputFormat {
context, fs, new MockPath(fs, path), false, null).call();
}
- private OrcInputFormat.SplitStrategy createSplitStrategy(
+ private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
OrcInputFormat.AcidDirInfo adi = gen.call();
- return OrcInputFormat.determineSplitStrategy(
- null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null, null, true);
+ return OrcInputFormat.determineSplitStrategies(
+ null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas,
+ null, null, true);
}
public static class MockBlock {
[3/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)
Posted by ek...@apache.org.
HIVE-14035 Enable predicate pushdown to delta files created by ACID Transactions (Saket Saurabh via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ecab0d07
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ecab0d07
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ecab0d07
Branch: refs/heads/master
Commit: ecab0d072b50a8d85dca6e850e47425d96c1ac09
Parents: 333fa87
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 12 10:31:39 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 12 10:31:39 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 57 +-
.../mapreduce/FosterStorageHandler.java | 3 +
.../streaming/AbstractRecordWriter.java | 7 +
.../hive/ql/txn/compactor/TestCompactor.java | 318 +++++++++-
metastore/if/hive_metastore.thrift | 1 +
.../thrift/gen-cpp/hive_metastore_constants.cpp | 2 +
.../thrift/gen-cpp/hive_metastore_constants.h | 1 +
.../metastore/api/hive_metastoreConstants.java | 2 +
.../src/gen/thrift/gen-php/metastore/Types.php | 5 +
.../thrift/gen-py/hive_metastore/constants.py | 1 +
.../thrift/gen-rb/hive_metastore_constants.rb | 2 +
.../TransactionalValidationListener.java | 126 +++-
.../apache/hadoop/hive/ql/exec/FetchTask.java | 1 +
.../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 1 +
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 1 +
.../hadoop/hive/ql/io/AcidOutputFormat.java | 25 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 456 +++++++++++++--
.../hadoop/hive/ql/io/HiveInputFormat.java | 1 +
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 248 ++++++--
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 152 ++++-
.../apache/hadoop/hive/ql/metadata/Hive.java | 95 +--
.../hadoop/hive/ql/plan/TableScanDesc.java | 9 +
.../hive/ql/txn/compactor/CompactorMR.java | 99 +++-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 106 +++-
.../ql/TestTxnCommands2WithSplitUpdate.java | 584 +++++++++++++++++++
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 275 ++++++++-
.../hive/ql/io/orc/TestInputOutputFormat.java | 225 +++++--
27 files changed, 2515 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e9f6ec..0abb788 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -18,7 +18,32 @@
package org.apache.hadoop.hive.conf;
-import com.google.common.base.Joiner;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.auth.login.LoginException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -42,27 +67,7 @@ import org.apache.hive.common.HiveCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.auth.login.LoginException;
-
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.base.Joiner;
/**
* Hive Configuration.
@@ -261,6 +266,7 @@ public class HiveConf extends Configuration {
HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
HiveConf.ConfVars.HIVE_TXN_MANAGER,
HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
+ HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES,
HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE,
HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH,
HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX,
@@ -1762,6 +1768,13 @@ public class HiveConf extends Configuration {
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
+ HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
+ "Sets the operational properties that control the appropriate behavior for various\n"
+ + "versions of the Hive ACID subsystem. Setting it to zero turns on the legacy ACID\n"
+ + "mode, while setting it to one enables the split-update feature found in the newer\n"
+ + "version of the Hive ACID subsystem. It is mostly intended to be used as an internal\n"
+ + "property for future versions of ACID. (See HIVE-14035 for details.)"),
+
HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
"rejected, until this number goes below the limit."),
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 14f7316..b970153 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -133,6 +133,9 @@ public class FosterStorageHandler extends DefaultStorageHandler {
boolean isAcidTable = AcidUtils.isTablePropertyTransactional(tableProperties);
AcidUtils.setTransactionalTableScan(jobProperties, isAcidTable);
+ AcidUtils.AcidOperationalProperties acidOperationalProperties =
+ AcidUtils.getAcidOperationalProperties(tableProperties);
+ AcidUtils.setAcidOperationalProperties(jobProperties, acidOperationalProperties);
}
} catch (IOException e) {
throw new IllegalStateException("Failed to set output path", e);
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index 974c6b8..b8615cb 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -48,6 +48,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Properties;
public abstract class AbstractRecordWriter implements RecordWriter {
@@ -265,10 +266,16 @@ public abstract class AbstractRecordWriter implements RecordWriter {
private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
throws IOException, SerializationError {
try {
+ // Initialize table properties from the table parameters. This is needed because the table
+ // may define parameters that affect how data is written. The table parameter
+ // 'transactional_properties' is one such example.
+ Properties tblProperties = new Properties();
+ tblProperties.putAll(tbl.getParameters());
return outf.getRecordUpdater(partitionPath,
new AcidOutputFormat.Options(conf)
.inspector(getSerde().getObjectInspector())
.bucket(bucketId)
+ .tableProperties(tblProperties)
.minimumTransactionId(minTxnId)
.maximumTransactionId(maxTxnID)
.statementId(-1)
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 731caa8..f81752f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -17,6 +17,19 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -68,20 +81,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.TimeUnit;
-
/**
*/
public class TestCompactor {
@@ -857,6 +856,297 @@ public class TestCompactor {
}
}
+ @Test
+ public void majorCompactWhileStreamingForSplitUpdate() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true', "
+ + "'transactional_properties'='default') ", driver); // this turns on split-update U=D+I
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, but don't close it.
+ writeBatch(connection, writer, true);
+
+ // Now, compact
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ }
+ String name = stat[0].getPath().getName();
+ Assert.assertEquals("base_0000004", name);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testMinorCompactionForSplitUpdateWithInsertsAndDeletes() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ + "'transactional_properties'='default')", driver);
+
+ // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+
+ // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3
+ executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
+
+ // Now, compact -> compaction produces a single range covering both deltas and delete deltas.
+ // That is, they are compacted into delta_1_3 and delete_delta_1_3, even though there are
+ // only two insert deltas (delta_1_1, delta_2_2) and one delete delta (delete_delta_3_3).
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+
+ // Verify that we have got correct set of deltas.
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] deltas = new String[stat.length];
+ Path minorCompactedDelta = null;
+ for (int i = 0; i < deltas.length; i++) {
+ deltas[i] = stat[i].getPath().getName();
+ if (deltas[i].equals("delta_0000001_0000003")) {
+ minorCompactedDelta = stat[i].getPath();
+ }
+ }
+ Arrays.sort(deltas);
+ String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
+ if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+ // Verify that we have got correct set of delete_deltas.
+ FileStatus[] deleteDeltaStat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ Path minorCompactedDeleteDelta = null;
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+ minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+ }
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+ }
+
+ @Test
+ public void testMinorCompactionForSplitUpdateWithOnlyInserts() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ + "'transactional_properties'='default')", driver);
+
+ // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
+
+ // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+ executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
+
+ // Now, compact
+ // An important thing to note in this test is that minor compaction always produces
+ // a delta_x_y and a counterpart delete_delta_x_y, even when there are no delete events.
+ // This choice was made to simplify the processing in AcidUtils.getAcidState().
+
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+
+ // Verify that we have got correct set of deltas.
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] deltas = new String[stat.length];
+ Path minorCompactedDelta = null;
+ for (int i = 0; i < deltas.length; i++) {
+ deltas[i] = stat[i].getPath().getName();
+ if (deltas[i].equals("delta_0000001_0000002")) {
+ minorCompactedDelta = stat[i].getPath();
+ }
+ }
+ Arrays.sort(deltas);
+ String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
+ if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+
+ // Verify that we have got correct set of delete_deltas.
+ FileStatus[] deleteDeltaStat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ Path minorCompactedDeleteDelta = null;
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+ minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+ }
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
+ // There should be no rows in the delete_delta because there have been no delete events.
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+ }
+
+ @Test
+ public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
+ String dbName = "default";
+ String tblName = "cws";
+ List<String> colNames = Arrays.asList("a", "b");
+ String columnNamesProperty = "a,b";
+ String columnTypesProperty = "int:string";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ + "'transactional_properties'='default')", driver);
+
+ HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+ try {
+ // Write a couple of batches
+ for (int i = 0; i < 2; i++) {
+ writeBatch(connection, writer, false);
+ }
+
+ // Start a third batch, but don't close it.
+ writeBatch(connection, writer, true);
+
+ // Now, compact
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setHiveConf(conf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] names = new String[stat.length];
+ Path resultFile = null;
+ for (int i = 0; i < names.length; i++) {
+ names[i] = stat[i].getPath().getName();
+ if (names[i].equals("delta_0000001_0000004")) {
+ resultFile = stat[i].getPath();
+ }
+ }
+ Arrays.sort(names);
+ String[] expected = new String[]{"delta_0000001_0000002",
+ "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ if (!Arrays.deepEquals(expected, names)) {
+ Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+ }
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+
+ // Verify that we have got correct set of delete_deltas also
+ FileStatus[] deleteDeltaStat =
+ fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ Path minorCompactedDeleteDelta = null;
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+ minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
+ }
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
+ // There should be no rows in the delete_delta because there have been no delete events.
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 0L, 0L);
+
+ } finally {
+ connection.close();
+ }
+ }
+
/**
* Users have the choice of specifying compaction related tblproperties either in CREATE TABLE
* statement or in ALTER TABLE .. COMPACT statement. This tests both cases.
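To make the expected names in the split-update tests above concrete, the directory state they assert can be sketched as follows (hand-drawn from the expected-name arrays, not tool output). After two single-row inserts, one delete, and a MINOR compaction on table cws:

    cws/
      delta_0000001_0000001_0000/         <- insert delta from txn 1
      delta_0000002_0000002_0000/         <- insert delta from txn 2
      delete_delta_0000003_0000003_0000/  <- delete delta from txn 3
      delta_0000001_0000003/              <- minor-compacted insert deltas
      delete_delta_0000001_0000003/       <- minor-compacted delete deltas

The pre-compaction directories still appear because the Cleaner has not yet removed them, which is why the tests expect both the compacted and the original delta names.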
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index a2e35b8..872c0f3 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -1475,5 +1475,6 @@ const string FILE_OUTPUT_FORMAT = "file.outputformat",
const string META_TABLE_STORAGE = "storage_handler",
const string TABLE_IS_TRANSACTIONAL = "transactional",
const string TABLE_NO_AUTO_COMPACT = "no_auto_compaction",
+const string TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties",
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
index f982bf2..1cbd176 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.cpp
@@ -57,6 +57,8 @@ hive_metastoreConstants::hive_metastoreConstants() {
TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
+ TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
+
}
}}} // namespace
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
index ae14bd1..3d068c3 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_constants.h
@@ -38,6 +38,7 @@ class hive_metastoreConstants {
std::string META_TABLE_STORAGE;
std::string TABLE_IS_TRANSACTIONAL;
std::string TABLE_NO_AUTO_COMPACT;
+ std::string TABLE_TRANSACTIONAL_PROPERTIES;
};
extern const hive_metastoreConstants g_hive_metastore_constants;
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 5a666f2..8de8896 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -82,4 +82,6 @@ public class hive_metastoreConstants {
public static final String TABLE_NO_AUTO_COMPACT = "no_auto_compaction";
+ public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index d6f7f49..2f9cc9b 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -18865,6 +18865,7 @@ final class Constant extends \Thrift\Type\TConstant {
static protected $META_TABLE_STORAGE;
static protected $TABLE_IS_TRANSACTIONAL;
static protected $TABLE_NO_AUTO_COMPACT;
+ static protected $TABLE_TRANSACTIONAL_PROPERTIES;
static protected function init_DDL_TIME() {
return "transient_lastDdlTime";
@@ -18957,6 +18958,10 @@ final class Constant extends \Thrift\Type\TConstant {
static protected function init_TABLE_NO_AUTO_COMPACT() {
return "no_auto_compaction";
}
+
+ static protected function init_TABLE_TRANSACTIONAL_PROPERTIES() {
+ return "transactional_properties";
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
index d1c07a5..5100236 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/constants.py
@@ -32,3 +32,4 @@ FILE_OUTPUT_FORMAT = "file.outputformat"
META_TABLE_STORAGE = "storage_handler"
TABLE_IS_TRANSACTIONAL = "transactional"
TABLE_NO_AUTO_COMPACT = "no_auto_compaction"
+TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
index eeccc84..6aa7143 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_constants.rb
@@ -53,3 +53,5 @@ TABLE_IS_TRANSACTIONAL = %q"transactional"
TABLE_NO_AUTO_COMPACT = %q"no_auto_compaction"
+TABLE_TRANSACTIONAL_PROPERTIES = %q"transactional_properties"
+
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 3e74675..0f08f43 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -17,25 +17,35 @@
*/
package org.apache.hadoop.hive.metastore;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
import org.apache.hadoop.hive.metastore.events.PreEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-final class TransactionalValidationListener extends MetaStorePreEventListener {
+public final class TransactionalValidationListener extends MetaStorePreEventListener {
public static final Logger LOG = LoggerFactory.getLogger(TransactionalValidationListener.class);
+ // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
+ public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
+ public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
+
TransactionalValidationListener(Configuration conf) {
super(conf);
}
+ @Override
public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException,
InvalidOperationException {
switch (context.getEventType()) {
@@ -60,6 +70,8 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
/**
* once a table is marked transactional, you cannot go back. Enforce this.
+ * Also, in the current version, the 'transactional_properties' of a table cannot be altered
+ * after the table is created. Any attempt to alter it will throw a MetaException.
*/
private void handleAlterTableTransactionalProp(PreAlterTableEvent context) throws MetaException {
Table newTable = context.getNewTable();
@@ -70,12 +82,22 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
Set<String> keys = new HashSet<>(parameters.keySet());
String transactionalValue = null;
boolean transactionalValuePresent = false;
+ boolean isTransactionalPropertiesPresent = false;
+ String transactionalPropertiesValue = null;
+ boolean hasValidTransactionalValue = false;
+
for (String key : keys) {
if(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
transactionalValuePresent = true;
transactionalValue = parameters.get(key);
parameters.remove(key);
}
+ if(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+ isTransactionalPropertiesPresent = true;
+ transactionalPropertiesValue = parameters.get(key);
+ // Do not remove the parameter yet, because a separate initialization routine
+ // below will still use it.
+ }
}
if (transactionalValuePresent) {
//normalize prop name
@@ -91,24 +113,52 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
throw new MetaException(newTable.getDbName() + "." + newTable.getTableName() +
" cannot be declared transactional because it's an external table");
}
-
- return;
+ hasValidTransactionalValue = true;
}
+
Table oldTable = context.getOldTable();
String oldTransactionalValue = null;
+ String oldTransactionalPropertiesValue = null;
for (String key : oldTable.getParameters().keySet()) {
if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
oldTransactionalValue = oldTable.getParameters().get(key);
}
+ if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+ oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+ }
}
+
+
if (oldTransactionalValue == null ? transactionalValue == null
: oldTransactionalValue.equalsIgnoreCase(transactionalValue)) {
//this covers backward compat cases where this prop may have been set already
- return;
+ hasValidTransactionalValue = true;
+ }
+
+ if (!hasValidTransactionalValue) {
+ // if here, there is attempt to set transactional to something other than 'true'
+ // and NOT the same value it was before
+ throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
+ }
+
+ if (isTransactionalPropertiesPresent) {
+ // Now validate transactional_properties for the table.
+ if (oldTransactionalValue == null) {
+ // If this is the first time the table is being initialized to 'transactional=true',
+ // any valid value can be set for the 'transactional_properties'.
+ initializeTransactionalProperties(newTable);
+ } else {
+ // If the table was already marked as 'transactional=true', then the new value of
+ // 'transactional_properties' must match the old value. Any attempt to alter the previous
+ // value will throw an error. An exception will still be thrown if the previous value was
+ // null and an attempt is made to set it. This behavior can be changed in the future.
+ if (oldTransactionalPropertiesValue == null
+ || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) {
+ throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be "
+ + "altered after the table is created");
+ }
+ }
}
- // if here, there is attempt to set transactional to something other than 'true'
- // and NOT the same value it was before
- throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset");
}
/**
@@ -157,6 +207,7 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
// normalize prop name
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.TRUE.toString());
+ initializeTransactionalProperties(newTable);
return;
}
@@ -187,4 +238,53 @@ final class TransactionalValidationListener extends MetaStorePreEventListener {
return true;
}
-}
\ No newline at end of file
+
+ private void initializeTransactionalProperties(Table table) throws MetaException {
+ // All new versions of Acid tables created after the introduction of the Acid version/type
+ // system can have the TRANSACTIONAL_PROPERTIES property defined. This parameter can be used
+ // to change the operational behavior of ACID. However, if this parameter is not defined, new
+ // Acid tables will still behave like the old ones. This is done to preserve the behavior
+ // in case of a rolling downgrade.
+
+ // Initialize transaction table properties with default string value.
+ String tableTransactionalProperties = null;
+
+ Map<String, String> parameters = table.getParameters();
+ if (parameters != null) {
+ Set<String> keys = parameters.keySet();
+ for (String key : keys) {
+ if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+ tableTransactionalProperties = parameters.get(key).toLowerCase();
+ parameters.remove(key);
+ String validationError = validateTransactionalProperties(tableTransactionalProperties);
+ if (validationError != null) {
+ throw new MetaException("Invalid transactional properties specified for the "
+ + "table with the error " + validationError);
+ }
+ break;
+ }
+ }
+ }
+
+ if (tableTransactionalProperties != null) {
+ parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+ tableTransactionalProperties);
+ }
+ }
+
+ private String validateTransactionalProperties(String transactionalProperties) {
+ boolean isValid = false;
+ switch (transactionalProperties) {
+ case DEFAULT_TRANSACTIONAL_PROPERTY:
+ case LEGACY_TRANSACTIONAL_PROPERTY:
+ isValid = true;
+ break;
+ default:
+ isValid = false;
+ }
+ if (!isValid) {
+ return "unknown value " + transactionalProperties + " for transactional_properties";
+ }
+ return null; // All checks passed, return null.
+ }
+}
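A hedged illustration of the rule enforced above, written in the style of the TestCompactor helpers earlier in this patch (a hypothetical snippet assuming the same executeStatementOnDriver/driver test fixtures; it is not part of the patch). Once a table has been created with a transactional_properties value, flipping it should be rejected:

    // Expected to fail with MetaException: "TBLPROPERTIES with
    // 'transactional_properties' cannot be altered after the table is created"
    executeStatementOnDriver("ALTER TABLE cws SET TBLPROPERTIES ("
        + "'transactional_properties'='legacy')", driver);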
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index db6848a..8c7d99d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -81,6 +81,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
HiveInputFormat.pushFilters(job, ts);
AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+ AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
}
sink = work.getSink();
fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
index 57b6c67..584eff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
@@ -212,6 +212,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp
HiveInputFormat.pushFilters(jobClone, ts);
AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+ AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
ts.passExecContext(getExecContext());
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 26e6443..ac922ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -472,6 +472,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
HiveInputFormat.pushFilters(jobClone, ts);
AcidUtils.setTransactionalTableScan(jobClone, ts.getConf().isAcidTable());
+ AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().getAcidOperationalProperties());
// create a fetch operator
FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone);
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index dd90a95..b85b827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql.io;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -26,10 +30,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Reporter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Properties;
-
/**
* An extension for OutputFormats that want to implement ACID transactions.
* @param <V> the row type of the file
@@ -44,6 +44,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
private FileSystem fs;
private ObjectInspector inspector;
private boolean writingBase = false;
+ private boolean writingDeleteDelta = false;
private boolean isCompressed = false;
private Properties properties;
private Reporter reporter;
@@ -98,6 +99,16 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
}
/**
+ * Is this writing a delete delta directory?
+ * @param val is this writing a delete delta directory?
+ * @return this
+ */
+ public Options writingDeleteDelta(boolean val) {
+ this.writingDeleteDelta = val;
+ return this;
+ }
+
+ /**
* Provide a file system to the writer. Otherwise, the filesystem for the
* path will be used.
* @param fs the file system that corresponds to the path
@@ -223,7 +234,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
this.finalDestination = p;
return this;
}
-
+
public Configuration getConfiguration() {
return configuration;
}
@@ -260,6 +271,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
return writingBase;
}
+ public boolean isWritingDeleteDelta() {
+ return writingDeleteDelta;
+ }
+
public int getBucket() {
return bucket;
}
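A rough sketch of how a writer can target a delete delta directory with the new option (assuming a Configuration named conf is in scope; the resolved path follows the naming logic in AcidUtils.createFilename shown further down, so this is illustrative rather than verified output):

    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .writingDeleteDelta(true)
        .minimumTransactionId(3)
        .maximumTransactionId(3)
        .statementId(0)
        .bucket(0);
    // AcidUtils.createFilename(dir, options) would then resolve to
    // <dir>/delete_delta_0000003_0000003_0000/bucket_00000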
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 449d889..cda5f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,10 +18,14 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,22 +34,19 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
/**
* Utilities that are shared by all of the ACID input and output formats. They
* are used by the compactor and cleaner and thus must be format agnostic.
@@ -61,6 +62,7 @@ public class AcidUtils {
}
};
public static final String DELTA_PREFIX = "delta_";
+ public static final String DELETE_DELTA_PREFIX = "delete_delta_";
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
public static final PathFilter deltaFileFilter = new PathFilter() {
@Override
@@ -68,6 +70,12 @@ public class AcidUtils {
return path.getName().startsWith(DELTA_PREFIX);
}
};
+ public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(DELETE_DELTA_PREFIX);
+ }
+ };
public static final String BUCKET_PREFIX = "bucket_";
public static final PathFilter bucketFileFilter = new PathFilter() {
@Override
@@ -142,6 +150,25 @@ public class AcidUtils {
return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
}
+ /**
+ * This is the format of the delete delta dir name used prior to Hive 2.2.x.
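+ * For example, deleteDeltaSubdir(1, 3) returns "delete_delta_0000001_0000003".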
+ */
+ @VisibleForTesting
+ static String deleteDeltaSubdir(long min, long max) {
+ return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
+ String.format(DELTA_DIGITS, max);
+ }
+
+ /**
+ * Each write statement in a transaction creates its own delete delta dir,
+ * when the split-update acid operational property is turned on.
+ * @since 2.2.x
+ */
+ @VisibleForTesting
+ static String deleteDeltaSubdir(long min, long max, int statementId) {
+ return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
+ }
+
public static String baseDir(long txnId) {
return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
}
@@ -163,12 +190,19 @@ public class AcidUtils {
} else if(options.getStatementId() == -1) {
//when minor compaction runs, we collapse per statement delta files inside a single
//transaction so we no longer need a statementId in the file name
- subdir = deltaSubdir(options.getMinimumTransactionId(),
- options.getMaximumTransactionId());
+ subdir = options.isWritingDeleteDelta() ?
+ deleteDeltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId())
+ : deltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId());
} else {
- subdir = deltaSubdir(options.getMinimumTransactionId(),
- options.getMaximumTransactionId(),
- options.getStatementId());
+ subdir = options.isWritingDeleteDelta() ?
+ deleteDeltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId(),
+ options.getStatementId())
+ : deltaSubdir(options.getMinimumTransactionId(),
+ options.getMaximumTransactionId(),
+ options.getStatementId());
}
return createBucketFile(new Path(directory, subdir), options.getBucket());
}
@@ -195,11 +229,10 @@ public class AcidUtils {
* @return the options used to create that filename
*/
public static AcidOutputFormat.Options
- parseBaseBucketFilename(Path bucketFile,
- Configuration conf) {
+ parseBaseOrDeltaBucketFilename(Path bucketFile,
+ Configuration conf) {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
- result.writingBase(true);
if (ORIGINAL_PATTERN.matcher(filename).matches()) {
int bucket =
Integer.parseInt(filename.substring(0, filename.indexOf('_')));
@@ -207,15 +240,33 @@ public class AcidUtils {
.setOldStyle(true)
.minimumTransactionId(0)
.maximumTransactionId(0)
- .bucket(bucket);
+ .bucket(bucket)
+ .writingBase(true);
} else if (filename.startsWith(BUCKET_PREFIX)) {
int bucket =
Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
- result
- .setOldStyle(false)
- .minimumTransactionId(0)
- .maximumTransactionId(parseBase(bucketFile.getParent()))
- .bucket(bucket);
+ if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
+ result
+ .setOldStyle(false)
+ .minimumTransactionId(0)
+ .maximumTransactionId(parseBase(bucketFile.getParent()))
+ .bucket(bucket)
+ .writingBase(true);
+ } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX);
+ result
+ .setOldStyle(false)
+ .minimumTransactionId(parsedDelta.minTransaction)
+ .maximumTransactionId(parsedDelta.maxTransaction)
+ .bucket(bucket);
+ } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
+ ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX);
+ result
+ .setOldStyle(false)
+ .minimumTransactionId(parsedDelta.minTransaction)
+ .maximumTransactionId(parsedDelta.maxTransaction)
+ .bucket(bucket);
+ }
} else {
result.setOldStyle(true).bucket(-1).minimumTransactionId(0)
.maximumTransactionId(0);
@@ -248,6 +299,179 @@ public class AcidUtils {
}
}
+ public enum AcidBaseFileType {
+ COMPACTED_BASE, // a regular base file generated through major compaction
+ ORIGINAL_BASE, // a non-acid schema file for tables that got converted to acid
+ INSERT_DELTA; // a delta file with only insert events that can be treated as base for split-update
+ }
+
+ /**
+ * A simple wrapper class that stores the information about a base file and its type.
+ * Orc splits can be generated on three kinds of base files: an original file (non-acid converted
+ * files), a regular base file (created by major compaction) or an insert delta (which can be
+ * treated as a base when split-update is enabled for acid).
+ */
+ public static class AcidBaseFileInfo {
+ final private HdfsFileStatusWithId fileId;
+ final private AcidBaseFileType acidBaseFileType;
+
+ public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) {
+ this.fileId = fileId;
+ this.acidBaseFileType = acidBaseFileType;
+ }
+
+ public boolean isCompactedBase() {
+ return this.acidBaseFileType == AcidBaseFileType.COMPACTED_BASE;
+ }
+
+ public boolean isOriginal() {
+ return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
+ }
+
+ public boolean isInsertDelta() {
+ return this.acidBaseFileType == AcidBaseFileType.INSERT_DELTA;
+ }
+
+ public HdfsFileStatusWithId getHdfsFileStatusWithId() {
+ return this.fileId;
+ }
+ }
+
+ public static class AcidOperationalProperties {
+ private int description = 0x00;
+ public static final int SPLIT_UPDATE_BIT = 0x01;
+ public static final String SPLIT_UPDATE_STRING = "split_update";
+ public static final int HASH_BASED_MERGE_BIT = 0x02;
+ public static final String HASH_BASED_MERGE_STRING = "hash_merge";
+ public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
+ public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
+
+ private AcidOperationalProperties() {
+ }
+
+ /**
+ * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
+ * that were created before the ACID type system based on operational properties was put in place.
+ * @return the acidOperationalProperties object
+ */
+ public static AcidOperationalProperties getLegacy() {
+ AcidOperationalProperties obj = new AcidOperationalProperties();
+ // In legacy mode, none of these properties are turned on.
+ return obj;
+ }
+
+ /**
+ * Returns an acidOperationalProperties object that represents default ACID behavior for tables
+ * that do not explicitly specify/override the default behavior.
+ * @return the acidOperationalProperties object.
+ */
+ public static AcidOperationalProperties getDefault() {
+ AcidOperationalProperties obj = new AcidOperationalProperties();
+ obj.setSplitUpdate(true);
+ obj.setHashBasedMerge(false);
+ return obj;
+ }
+
+ /**
+ * Returns an acidOperationalProperties object that is represented by an encoded string.
+ * @param propertiesStr an encoded string representing the acidOperationalProperties.
+ * @return the acidOperationalProperties object.
+ */
+ public static AcidOperationalProperties parseString(String propertiesStr) {
+ if (propertiesStr == null) {
+ return AcidOperationalProperties.getLegacy();
+ }
+ if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
+ return AcidOperationalProperties.getDefault();
+ }
+ if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
+ return AcidOperationalProperties.getLegacy();
+ }
+ AcidOperationalProperties obj = new AcidOperationalProperties();
+ String[] options = propertiesStr.split("\\|");
+ for (String option : options) {
+ if (option.trim().length() == 0) continue; // ignore empty strings
+ switch (option) {
+ case SPLIT_UPDATE_STRING:
+ obj.setSplitUpdate(true);
+ break;
+ case HASH_BASED_MERGE_STRING:
+ obj.setHashBasedMerge(true);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unexpected value " + option + " for ACID operational properties!");
+ }
+ }
+ return obj;
+ }
+
+ /**
+ * Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer.
+ * @param properties an encoded 32-bit integer representing the acidOperationalProperties.
+ * @return the acidOperationalProperties object.
+ */
+ public static AcidOperationalProperties parseInt(int properties) {
+ AcidOperationalProperties obj = new AcidOperationalProperties();
+ if ((properties & SPLIT_UPDATE_BIT) > 0) {
+ obj.setSplitUpdate(true);
+ }
+ if ((properties & HASH_BASED_MERGE_BIT) > 0) {
+ obj.setHashBasedMerge(true);
+ }
+ return obj;
+ }
+
+ /**
+ * Sets the split update property for ACID operations based on the boolean argument.
+ * When split update is turned on, an update ACID event is interpreted as a combination of
+ * a delete event followed by an insert event.
+ * @param isSplitUpdate a boolean property that turns on split update when true.
+ * @return the acidOperationalProperties object.
+ */
+ public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) {
+ description = (isSplitUpdate
+ ? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT));
+ return this;
+ }
+
+ /**
+ * Sets the hash-based merge property for ACID operations, which, when turned on, combines
+ * delta files using a GRACE hash join based approach. (Currently unimplemented!)
+ * @param isHashBasedMerge a boolean property that turns on hash-based merge when true.
+ * @return the acidOperationalProperties object.
+ */
+ public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) {
+ description = (isHashBasedMerge
+ ? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT));
+ return this;
+ }
+
+ public boolean isSplitUpdate() {
+ return (description & SPLIT_UPDATE_BIT) > 0;
+ }
+
+ public boolean isHashBasedMerge() {
+ return (description & HASH_BASED_MERGE_BIT) > 0;
+ }
+
+ public int toInt() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder str = new StringBuilder();
+ if (isSplitUpdate()) {
+ str.append("|" + SPLIT_UPDATE_STRING);
+ }
+ if (isHashBasedMerge()) {
+ str.append("|" + HASH_BASED_MERGE_STRING);
+ }
+ return str.toString();
+ }
+ }
+
public static interface Directory {
/**
@@ -287,18 +511,20 @@ public class AcidUtils {
//-1 is for internal (getAcidState()) purposes and means the delta dir
//had no statement ID
private final int statementId;
+ private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
/**
* for pre 1.3.x delta files
*/
- ParsedDelta(long min, long max, FileStatus path) {
- this(min, max, path, -1);
+ ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta) {
+ this(min, max, path, -1, isDeleteDelta);
}
- ParsedDelta(long min, long max, FileStatus path, int statementId) {
+ ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta) {
this.minTransaction = min;
this.maxTransaction = max;
this.path = path;
this.statementId = statementId;
+ this.isDeleteDelta = isDeleteDelta;
}
public long getMinTransaction() {
@@ -317,6 +543,10 @@ public class AcidUtils {
return statementId == -1 ? 0 : statementId;
}
+ public boolean isDeleteDelta() {
+ return isDeleteDelta;
+ }
+
/**
* Compactions (Major/Minor) merge deltas/bases but delete of old files
* happens in a different process; thus it's possible to have bases/deltas with
@@ -418,15 +648,49 @@ public class AcidUtils {
return results.toArray(new Path[results.size()]);
}
- private static ParsedDelta parseDelta(FileStatus path) {
- ParsedDelta p = parsedDelta(path.getPath());
- return new ParsedDelta(p.getMinTransaction(),
- p.getMaxTransaction(), path, p.statementId);
+ /**
+ * Convert the list of begin/end transaction id pairs to a list of delete delta
+ * directories. Note that starting with 2.2.x there may be multiple delete_delta
+ * directories for the exact same txn range;
+ * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deleteDeltaSubdir(long, long, int)}
+ * @param root the root directory
+ * @param deleteDeltas list of begin/end transaction id pairs
+ * @return the list of delete delta paths
+ */
+ public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException {
+ List<Path> results = new ArrayList<Path>(deleteDeltas.size());
+ for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
+ if(dmd.getStmtIds().isEmpty()) {
+ results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId())));
+ continue;
+ }
+ for(Integer stmtId : dmd.getStmtIds()) {
+ results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId)));
+ }
+ }
+ return results.toArray(new Path[results.size()]);
}
+
public static ParsedDelta parsedDelta(Path deltaDir) {
+ String deltaDirName = deltaDir.getName();
+ if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
+ return parsedDelta(deltaDir, DELETE_DELTA_PREFIX);
+ }
+ return parsedDelta(deltaDir, DELTA_PREFIX); // default prefix is DELTA_PREFIX
+ }
+
+ private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix) {
+ ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix);
+ boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+ return new ParsedDelta(p.getMinTransaction(),
+ p.getMaxTransaction(), path, p.statementId, isDeleteDelta);
+ }
+
+ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix) {
String filename = deltaDir.getName();
- if (filename.startsWith(DELTA_PREFIX)) {
- String rest = filename.substring(DELTA_PREFIX.length());
+ boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+ if (filename.startsWith(deltaPrefix)) {
+ String rest = filename.substring(deltaPrefix.length());
int split = rest.indexOf('_');
int split2 = rest.indexOf('_', split + 1);//may be -1 if no statementId
long min = Long.parseLong(rest.substring(0, split));
@@ -434,13 +698,13 @@ public class AcidUtils {
Long.parseLong(rest.substring(split + 1)) :
Long.parseLong(rest.substring(split + 1, split2));
if(split2 == -1) {
- return new ParsedDelta(min, max, null);
+ return new ParsedDelta(min, max, null, isDeleteDelta);
}
int statementId = Integer.parseInt(rest.substring(split2 + 1));
- return new ParsedDelta(min, max, null, statementId);
+ return new ParsedDelta(min, max, null, statementId, isDeleteDelta);
}
throw new IllegalArgumentException(deltaDir + " does not start with " +
- DELTA_PREFIX);
+ deltaPrefix);
}
/**
@@ -456,7 +720,8 @@ public class AcidUtils {
for(FileStatus file: fs.listStatus(directory)) {
String filename = file.getPath().getName();
if (filename.startsWith(BASE_PREFIX) ||
- filename.startsWith(DELTA_PREFIX)) {
+ filename.startsWith(DELTA_PREFIX) ||
+ filename.startsWith(DELETE_DELTA_PREFIX)) {
if (file.isDir()) {
return true;
}
@@ -499,6 +764,7 @@ public class AcidUtils {
boolean ignoreEmptyFiles
) throws IOException {
FileSystem fs = directory.getFileSystem(conf);
+ // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
@@ -553,6 +819,7 @@ public class AcidUtils {
//subject to list of 'exceptions' in 'txnList' (not show in above example).
long current = bestBase.txn;
int lastStmtId = -1;
+ ParsedDelta prev = null;
for(ParsedDelta next: working) {
if (next.maxTransaction > current) {
// are any of the new transactions ones that we care about?
@@ -561,6 +828,7 @@ public class AcidUtils {
deltas.add(next);
current = next.maxTransaction;
lastStmtId = next.statementId;
+ prev = next;
}
}
else if(next.maxTransaction == current && lastStmtId >= 0) {
@@ -568,6 +836,24 @@ public class AcidUtils {
//generate multiple delta files with the same txnId range
//of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete
deltas.add(next);
+ prev = next;
+ }
+ else if (prev != null && next.maxTransaction == prev.maxTransaction
+ && next.minTransaction == prev.minTransaction
+ && next.statementId == prev.statementId) {
+ // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
+ // the path. This may happen with split-update, where we have two types of delta
+ // directories, 'delta_x_y' and 'delete_delta_x_y', for the SAME txn range.
+
+ // Also note that any delete_deltas in between a given delta_x_y range would be made
+ // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
+ // This is valid because minor compaction always compacts the normal deltas and the delete
+ // deltas for the same range. That is, if we had 3 directories, delta_30_30,
+ // delete_delta_40_40 and delta_50_50, then running minor compaction would produce
+ // delta_30_50 and delete_delta_30_50.
+
+ deltas.add(next);
+ prev = next;
}
else {
obsolete.add(next.path);
@@ -638,7 +924,8 @@ public class AcidUtils {
}
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
- List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
+ List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
+ boolean ignoreEmptyFiles) throws IOException {
Path p = child.getPath();
String fn = p.getName();
if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -662,8 +949,11 @@ public class AcidUtils {
} else {
obsolete.add(child);
}
- } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
- ParsedDelta delta = parseDelta(child);
+ } else if ((fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX))
+ && child.isDir()) {
+ String deltaPrefix =
+ (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+ ParsedDelta delta = parseDelta(child, deltaPrefix);
if (txnList.isTxnRangeValid(delta.minTransaction,
delta.maxTransaction) !=
ValidTxnList.RangeResponse.NONE) {
@@ -791,4 +1081,84 @@ public class AcidUtils {
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
+
+ /**
+ * Sets the acidOperationalProperties in the configuration object argument.
+ * @param conf Mutable configuration object
+ * @param properties An acidOperationalProperties object to initialize from.
+ */
+ public static void setAcidOperationalProperties(Configuration conf,
+ AcidOperationalProperties properties) {
+ if (properties != null) {
+ HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt());
+ }
+ }
+
+ /**
+ * Sets the acidOperationalProperties in the map object argument.
+ * @param parameters Mutable map object
+ * @param properties An acidOperationalProperties object to initialize from.
+ */
+ public static void setAcidOperationalProperties(
+ Map<String, String> parameters, AcidOperationalProperties properties) {
+ if (properties != null) {
+ parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString());
+ }
+ }
+
+ /**
+ * Returns the acidOperationalProperties for a given table.
+ * @param table A table object
+ * @return the acidOperationalProperties object for the corresponding table.
+ */
+ public static AcidOperationalProperties getAcidOperationalProperties(Table table) {
+ String transactionalProperties = table.getProperty(
+ hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ if (transactionalProperties == null) {
+ // If the table does not define any transactional properties, we return a legacy type.
+ return AcidOperationalProperties.getLegacy();
+ }
+ return AcidOperationalProperties.parseString(transactionalProperties);
+ }
+
+ /**
+ * Returns the acidOperationalProperties for a given configuration.
+ * @param conf A configuration object
+ * @return the acidOperationalProperties object for the corresponding configuration.
+ */
+ public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
+ // If the conf does not define any transactional properties, the parseInt() should receive
+ // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+ return AcidOperationalProperties.parseInt(
+ HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
+ }
+
+ /**
+ * Returns the acidOperationalProperties for a given set of properties.
+ * @param props A properties object
+ * @return the acidOperationalProperties object for the corresponding properties.
+ */
+ public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
+ String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ if (resultStr == null) {
+ // If the properties does not define any transactional properties, we return a legacy type.
+ return AcidOperationalProperties.getLegacy();
+ }
+ return AcidOperationalProperties.parseString(resultStr);
+ }
+
+ /**
+ * Returns the acidOperationalProperties for a given map.
+ * @param parameters A parameters object
+ * @return the acidOperationalProperties object for the corresponding map.
+ */
+ public static AcidOperationalProperties getAcidOperationalProperties(
+ Map<String, String> parameters) {
+ String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ if (resultStr == null) {
+ // If the parameters does not define any transactional properties, we return a legacy type.
+ return AcidOperationalProperties.getLegacy();
+ }
+ return AcidOperationalProperties.parseString(resultStr);
+ }
}
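To make the two encodings of AcidOperationalProperties concrete, here is a small illustrative round-trip (not part of the patch; it assumes, per the test tables in this change, that the 'default' property string maps to split-update on and hash-based merge off):

    AcidUtils.AcidOperationalProperties props =
        AcidUtils.AcidOperationalProperties.parseString("default");
    assert props.isSplitUpdate();       // 'default' turns split-update on
    assert !props.isHashBasedMerge();   // hash-based merge stays off (unimplemented)

    int encoded = props.toInt();        // 1, i.e. SPLIT_UPDATE_BIT
    AcidUtils.AcidOperationalProperties decoded =
        AcidUtils.AcidOperationalProperties.parseInt(encoded);
    assert decoded.toString().equals("|split_update");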
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 945b828..c4b9940 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -637,6 +637,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
pushFilters(jobConf, ts);
AcidUtils.setTransactionalTableScan(job, ts.getConf().isAcidTable());
+ AcidUtils.setAcidOperationalProperties(job, ts.getConf().getAcidOperationalProperties());
}
}
}
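The one-line change above is what carries the table's transactional_properties from the table scan into the job conf. A sketch of the round trip the readers rely on (not part of the patch; 'jobConf' is a placeholder for the JobConf being prepared for the scan):

    // Producer side (HiveInputFormat): stash the properties in the conf.
    AcidUtils.setAcidOperationalProperties(
        jobConf, AcidUtils.AcidOperationalProperties.getDefault());

    // Consumer side (OrcInputFormat/OrcRecordUpdater below): read them back.
    AcidUtils.AcidOperationalProperties props =
        AcidUtils.getAcidOperationalProperties(jobConf);
    if (props.isSplitUpdate()) {
      // delta_x_y/ holds only insert events; delete_delta_x_y/ holds the deletes.
    }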
[2/3] hive git commit: HIVE-14035 Enable predicate pushdown to delta
files created by ACID Transactions (Saket Saurabh via Eugene Koifman)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 334cb31..6261a14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -80,7 +80,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
import org.apache.hadoop.hive.ql.io.BatchToRowReader;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -525,6 +528,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final ValidTxnList transactionList;
private SplitStrategyKind splitStrategyKind;
private final SearchArgument sarg;
+ private final AcidOperationalProperties acidOperationalProperties;
Context(Configuration conf) throws IOException {
this(conf, 1, null);
@@ -606,6 +610,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
+
+ // Determine the transactional_properties of the table from the job conf stored in context.
+ // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
+ // & therefore we should be able to retrieve them here and determine appropriate behavior.
+ // Note that this will be meaningless for non-acid tables & will be set to null.
+ boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+ String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
+ this.acidOperationalProperties = isTableTransactional ?
+ AcidOperationalProperties.parseString(transactionalProperties) : null;
}
@VisibleForTesting
@@ -639,17 +652,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@VisibleForTesting
static final class AcidDirInfo {
public AcidDirInfo(FileSystem fs, Path splitPath, Directory acidInfo,
- List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+ List<AcidBaseFileInfo> baseFiles,
+ List<ParsedDelta> parsedDeltas) {
this.splitPath = splitPath;
this.acidInfo = acidInfo;
- this.baseOrOriginalFiles = baseOrOriginalFiles;
+ this.baseFiles = baseFiles;
this.fs = fs;
+ this.parsedDeltas = parsedDeltas;
}
final FileSystem fs;
final Path splitPath;
final AcidUtils.Directory acidInfo;
- final List<HdfsFileStatusWithId> baseOrOriginalFiles;
+ final List<AcidBaseFileInfo> baseFiles;
+ final List<ParsedDelta> parsedDeltas;
}
@VisibleForTesting
@@ -672,7 +688,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, OrcTail orcTail,
List<OrcProto.Type> readerTypes, boolean isOriginal, List<DeltaMetaData> deltas,
boolean hasBase, Path dir, boolean[] covered, ByteBuffer ppdResult) throws IOException {
- super(dir, context.numBuckets, deltas, covered);
+ super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
this.context = context;
this.fs = fs;
this.fileWithId = fileWithId;
@@ -916,7 +932,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public BISplitStrategy(Context context, FileSystem fs,
Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) {
- super(dir, context.numBuckets, deltas, covered);
+ super(dir, context.numBuckets, deltas, covered, context.acidOperationalProperties);
this.fileStatuses = fileStatuses;
this.isOriginal = isOriginal;
this.deltas = deltas;
@@ -964,20 +980,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<DeltaMetaData> deltas;
boolean[] covered;
int numBuckets;
+ AcidOperationalProperties acidOperationalProperties;
- public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered) {
+ public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+ AcidOperationalProperties acidOperationalProperties) {
this.dir = dir;
this.numBuckets = numBuckets;
this.deltas = deltas;
this.covered = covered;
+ this.acidOperationalProperties = acidOperationalProperties;
}
@Override
public List<OrcSplit> getSplits() throws IOException {
+ List<OrcSplit> splits = Lists.newArrayList();
+
+ // When split-update is enabled, we do not need to account for buckets that aren't covered.
+ // This is a huge performance benefit of split-update: the 'deltas' here are actually
+ // only the delete_deltas, and all the insert_deltas with valid user payload data
+ // have already been considered as base for the covered buckets.
+ // Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
+ if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
+ return splits; // return an empty list.
+ }
+
// Generate a split for any buckets that weren't covered.
// This happens in the case where a bucket just has deltas and no
// base.
- List<OrcSplit> splits = Lists.newArrayList();
if (!deltas.isEmpty()) {
for (int b = 0; b < numBuckets; ++b) {
if (!covered[b]) {
@@ -1032,13 +1061,70 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private AcidDirInfo callInternal() throws IOException {
- AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
- context.conf, context.transactionList, useFileIds, true);
+ AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
+ context.transactionList, useFileIds, true);
Path base = dirInfo.getBaseDirectory();
// find the base files (original or new style)
- List<HdfsFileStatusWithId> children = (base == null)
- ? dirInfo.getOriginalFiles() : findBaseFiles(base, useFileIds);
- return new AcidDirInfo(fs, dir, dirInfo, children);
+ List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
+ if (base == null) {
+ for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
+ baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
+ }
+ } else {
+ List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(base, useFileIds);
+ for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
+ baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.COMPACTED_BASE));
+ }
+ }
+
+ // Find the parsed deltas; those containing only insert events
+ // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
+ List<ParsedDelta> parsedDeltas = new ArrayList<ParsedDelta>();
+
+ if (context.acidOperationalProperties != null &&
+ context.acidOperationalProperties.isSplitUpdate()) {
+ // If we have split-update turned on for this table, then the delta events have already been
+ // split into two directories- delta_x_y/ and delete_delta_x_y/.
+ // When split-update is turned on, the insert events go to the delta_x_y/ directory and all
+ // the delete events go to delete_delta_x_y/. An update event generates two events:
+ // a delete event for the old record that is put into delete_delta_x_y/,
+ // followed by an insert event for the updated record put into the usual delta_x_y/.
+ // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/
+ // can be treated like base files. Hence, each of these is added to the baseFiles list.
+
+ for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) {
+ if (parsedDelta.isDeleteDelta()) {
+ parsedDeltas.add(parsedDelta);
+ } else {
+ // This is a normal insert delta, which only has insert events and hence all the files
+ // in this delta directory can be considered as a base.
+ if (useFileIds) {
+ try {
+ List<HdfsFileStatusWithId> insertDeltaFiles =
+ SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
+ baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ }
+ continue; // move on to process to the next parsedDelta.
+ } catch (Throwable t) {
+ LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+ }
+ }
+ // Fall back to regular API and create statuses without ID.
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+ for (FileStatus child : children) {
+ HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
+ baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
+ }
+ }
+ }
+
+ } else {
+ // When split-update is not enabled, then all the deltas in the current directories
+ // should be considered as usual.
+ parsedDeltas.addAll(dirInfo.getCurrentDirectories());
+ }
+ return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
}
private List<HdfsFileStatusWithId> findBaseFiles(
@@ -1526,26 +1612,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
continue;
}
- // We have received a new directory information, make a split strategy.
+ // We have received new directory information; make split strategies.
--resultsLeft;
- SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs,
- adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, readerTypes, ugi,
+
+ // We can get a list of split strategies here because, in the ACID split-update case,
+ // a mix of original base files & insert deltas produces two independent split strategies.
+ // There is a global flag 'isOriginal' that is set on a per split strategy basis and it
+ // has to be the same for all the files in that strategy.
+ List<SplitStrategy<?>> splitStrategies = determineSplitStrategies(combinedCtx, context, adi.fs,
+ adi.splitPath, adi.acidInfo, adi.baseFiles, adi.parsedDeltas, readerTypes, ugi,
allowSyntheticFileIds);
- if (splitStrategy == null) continue; // Combined.
- if (isDebugEnabled) {
- LOG.debug("Split strategy: {}", splitStrategy);
- }
+ for (SplitStrategy<?> splitStrategy : splitStrategies) {
+ if (isDebugEnabled) {
+ LOG.debug("Split strategy: {}", splitStrategy);
+ }
- // Hack note - different split strategies return differently typed lists, yay Java.
- // This works purely by magic, because we know which strategy produces which type.
- if (splitStrategy instanceof ETLSplitStrategy) {
- scheduleSplits((ETLSplitStrategy)splitStrategy,
- context, splitFutures, strategyFutures, splits);
- } else {
- @SuppressWarnings("unchecked")
- List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
- splits.addAll(readySplits);
+ // Hack note - different split strategies return differently typed lists, yay Java.
+ // This works purely by magic, because we know which strategy produces which type.
+ if (splitStrategy instanceof ETLSplitStrategy) {
+ scheduleSplits((ETLSplitStrategy)splitStrategy,
+ context, splitFutures, strategyFutures, splits);
+ } else {
+ @SuppressWarnings("unchecked")
+ List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
+ splits.addAll(readySplits);
+ }
}
}
@@ -1763,6 +1855,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final OrcSplit split = (OrcSplit) inputSplit;
final Path path = split.getPath();
+
Path root;
if (split.hasBase()) {
if (split.isOriginal()) {
@@ -1773,7 +1866,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
root = path;
}
- final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
+
+ // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
+ AcidUtils.AcidOperationalProperties acidOperationalProperties
+ = AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+
+ // The deltas are decided based on whether split-update has been turned on for the table or not.
+ // When split-update is turned off, everything in the delta_x_y/ directory should be treated
+ // as delta. However, if split-update is turned on, only the files in the delete_delta_x_y/ directory
+ // need to be considered as delta, because files in delta_x_y/ will be processed as base files
+ // since they only have insert events in them.
+ final Path[] deltas =
+ acidOperationalProperties.isSplitUpdate() ?
+ AcidUtils.deserializeDeleteDeltas(root, split.getDeltas())
+ : AcidUtils.deserializeDeltas(root, split.getDeltas());
final Configuration conf = options.getConfiguration();
@@ -1793,7 +1899,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
if (split.hasBase()) {
- bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
+ bucket = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf)
.getBucket();
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf)
.maxLength(split.getFileLength());
@@ -1948,23 +2054,76 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
@VisibleForTesting
+ static List<SplitStrategy<?>> determineSplitStrategies(CombinedCtx combinedCtx, Context context,
+ FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
+ List<AcidBaseFileInfo> baseFiles,
+ List<ParsedDelta> parsedDeltas,
+ List<OrcProto.Type> readerTypes,
+ UserGroupInformation ugi, boolean allowSyntheticFileIds) {
+ List<SplitStrategy<?>> splitStrategies = new ArrayList<SplitStrategy<?>>();
+ SplitStrategy<?> splitStrategy;
+
+ // When there are no base files, we just generate a single split strategy and return.
+ List<HdfsFileStatusWithId> acidSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+ if (baseFiles.isEmpty()) {
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+ if (splitStrategy != null) {
+ splitStrategies.add(splitStrategy);
+ }
+ return splitStrategies; // return here
+ }
+
+ List<HdfsFileStatusWithId> originalSchemaFiles = new ArrayList<HdfsFileStatusWithId>();
+ // Separate the base files into acid schema and non-acid (original) schema files.
+ for (AcidBaseFileInfo acidBaseFileInfo : baseFiles) {
+ if (acidBaseFileInfo.isOriginal()) {
+ originalSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+ } else {
+ acidSchemaFiles.add(acidBaseFileInfo.getHdfsFileStatusWithId());
+ }
+ }
+
+ // Generate split strategy for non-acid schema original files, if any.
+ if (!originalSchemaFiles.isEmpty()) {
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ originalSchemaFiles, true, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+ if (splitStrategy != null) {
+ splitStrategies.add(splitStrategy);
+ }
+ }
+
+ // Generate split strategy for acid schema files, if any.
+ if (!acidSchemaFiles.isEmpty()) {
+ splitStrategy = determineSplitStrategy(combinedCtx, context, fs, dir, dirInfo,
+ acidSchemaFiles, false, parsedDeltas, readerTypes, ugi, allowSyntheticFileIds);
+ if (splitStrategy != null) {
+ splitStrategies.add(splitStrategy);
+ }
+ }
+
+ return splitStrategies;
+ }
+
+ @VisibleForTesting
static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
- List<HdfsFileStatusWithId> baseOrOriginalFiles, List<OrcProto.Type> readerTypes,
+ List<HdfsFileStatusWithId> baseFiles,
+ boolean isOriginal,
+ List<ParsedDelta> parsedDeltas,
+ List<OrcProto.Type> readerTypes,
UserGroupInformation ugi, boolean allowSyntheticFileIds) {
- Path base = dirInfo.getBaseDirectory();
- List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
- List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+ List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(parsedDeltas);
boolean[] covered = new boolean[context.numBuckets];
- boolean isOriginal = base == null;
// if we have a base to work from
- if (base != null || !original.isEmpty()) {
+ if (!baseFiles.isEmpty()) {
long totalFileSize = 0;
- for (HdfsFileStatusWithId child : baseOrOriginalFiles) {
+ for (HdfsFileStatusWithId child : baseFiles) {
totalFileSize += child.getFileStatus().getLen();
- AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+ AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
(child.getFileStatus().getPath(), context.conf);
+ opts.writingBase(true);
int b = opts.getBucket();
// If the bucket is in the valid range, mark it as covered.
// I wish Hive actually enforced bucketing all of the time.
@@ -1973,31 +2132,32 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- int numFiles = baseOrOriginalFiles.size();
+ int numFiles = baseFiles.size();
long avgFileSize = totalFileSize / numFiles;
int totalFiles = context.numFilesCounter.addAndGet(numFiles);
switch(context.splitStrategyKind) {
case BI:
// BI strategy requested through config
- return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+ return new BISplitStrategy(context, fs, dir, baseFiles,
isOriginal, deltas, covered, allowSyntheticFileIds);
case ETL:
// ETL strategy requested through config
- return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+ return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
default:
// HYBRID strategy
if (avgFileSize > context.maxSize || totalFiles <= context.etlFileThreshold) {
- return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+ return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseFiles,
deltas, covered, readerTypes, isOriginal, ugi, allowSyntheticFileIds);
} else {
- return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+ return new BISplitStrategy(context, fs, dir, baseFiles,
isOriginal, deltas, covered, allowSyntheticFileIds);
}
}
} else {
// no base, only deltas
- return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
+ return new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered,
+ context.acidOperationalProperties);
}
}
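A sketch of what getReader() now does with the serialized delta metadata under split-update, per deserializeDeleteDeltas() above (not part of the patch; the DeltaMetaData constructor arguments are an assumption based on the getters that method uses, and the enclosing method is assumed to declare throws IOException):

    List<AcidInputFormat.DeltaMetaData> metas = Arrays.asList(
        new AcidInputFormat.DeltaMetaData(5L, 5L, Arrays.asList(0, 1)));
    Path[] deleteDeltas =
        AcidUtils.deserializeDeleteDeltas(new Path("/warehouse/t"), metas);
    // Expands per statement id to:
    //   /warehouse/t/delete_delta_0000005_0000005_0000
    //   /warehouse/t/delete_delta_0000005_0000005_0001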
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1a1af28..492c64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -25,11 +25,6 @@ import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
-import org.apache.orc.impl.AcidStats;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -42,10 +37,16 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.impl.AcidStats;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -79,9 +80,13 @@ public class OrcRecordUpdater implements RecordUpdater {
private static final Charset UTF8 = Charset.forName("UTF-8");
private final AcidOutputFormat.Options options;
+ private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
private final Path path;
+ private Path deleteEventPath;
private final FileSystem fs;
+ private OrcFile.WriterOptions writerOptions;
private Writer writer;
+ private Writer deleteEventWriter = null;
private final FSDataOutputStream flushLengths;
private final OrcStruct item;
private final IntWritable operation = new IntWritable();
@@ -95,9 +100,11 @@ public class OrcRecordUpdater implements RecordUpdater {
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ private KeyIndexBuilder deleteEventIndexBuilder;
private StructField recIdField = null; // field to look for the record identifier in
private StructField rowIdField = null; // field inside recId to look for row id in
private StructField originalTxnField = null; // field inside recId to look for original txn in
+ private StructField bucketField = null; // field inside recId to look for bucket in
private StructObjectInspector rowInspector; // OI for the original row
private StructObjectInspector recIdInspector; // OI for the record identifier struct
private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
@@ -180,8 +187,22 @@ public class OrcRecordUpdater implements RecordUpdater {
OrcRecordUpdater(Path path,
AcidOutputFormat.Options options) throws IOException {
this.options = options;
+ // Initialize acidOperationalProperties based on table properties, and
+ // if they are not available, see if we can find them in the job configuration.
+ // We have to look at these two places instead of just the conf, because Streaming Ingest
+ // uses table properties, while normal Hive SQL inserts/updates/deletes will place this
+ // value in the configuration object.
+ if (options.getTableProperties() != null) {
+ this.acidOperationalProperties =
+ AcidUtils.getAcidOperationalProperties(options.getTableProperties());
+ } else {
+ this.acidOperationalProperties =
+ AcidUtils.getAcidOperationalProperties(options.getConfiguration());
+ }
this.bucket.set(options.getBucket());
this.path = AcidUtils.createFilename(path, options);
+ this.deleteEventWriter = null;
+ this.deleteEventPath = null;
FileSystem fs = options.getFilesystem();
if (fs == null) {
fs = path.getFileSystem(options.getConfiguration());
@@ -205,7 +226,7 @@ public class OrcRecordUpdater implements RecordUpdater {
} else {
flushLengths = null;
}
- OrcFile.WriterOptions writerOptions = null;
+ this.writerOptions = null;
// If writing delta dirs, we need to make a clone of original options, to avoid polluting it for
// the base writer
if (options.isWritingBase()) {
@@ -242,6 +263,13 @@ public class OrcRecordUpdater implements RecordUpdater {
writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
options.getRecordIdColumn())));
this.writer = OrcFile.createWriter(this.path, writerOptions);
+ if (this.acidOperationalProperties.isSplitUpdate()) {
+ // If this is a split-update, we initialize a delete delta file path in anticipation
+ // that update/delete events will be written to that separate file.
+ // This writes to a file in directory which starts with "delete_delta_..."
+ // The actual initialization of a writer only happens if any delete events are written.
+ this.deleteEventPath = AcidUtils.createFilename(path, options.writingDeleteDelta(true));
+ }
item = new OrcStruct(FIELDS);
item.setFieldValue(OPERATION, operation);
item.setFieldValue(CURRENT_TRANSACTION, currentTransaction);
@@ -250,6 +278,7 @@ public class OrcRecordUpdater implements RecordUpdater {
item.setFieldValue(ROW_ID, rowId);
}
+ @Override
public String toString() {
return getClass().getName() + "[" + path +"]";
}
@@ -264,14 +293,16 @@ public class OrcRecordUpdater implements RecordUpdater {
* 1. need to know bucket we are writing to
* 2. need to know which delta dir it's in
* Then,
- * 1. find the same bucket file in previous delta dir for this txn
+ * 1. find the same bucket file in previous (insert) delta dir for this txn
+ * (Note: in case of split_update, we can ignore the delete_delta dirs)
* 2. read the footer and get AcidStats which has insert count
- * 2.1 if AcidStats.inserts>0 done
+ * 2.1 if AcidStats.inserts>0 add to the insert count.
* else go to previous delta file
* For example, consider insert/update/insert case...*/
if(options.getStatementId() <= 0) {
return 0;//there is only 1 statement in this transaction (so far)
}
+ long totalInserts = 0;
for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
if(!fs.exists(matchingBucket)) {
@@ -281,12 +312,10 @@ public class OrcRecordUpdater implements RecordUpdater {
//no close() on Reader?!
AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
if(acidStats.inserts > 0) {
- return acidStats.inserts;
+ totalInserts += acidStats.inserts;
}
}
- //if we got here, we looked at all delta files in this txn, prior to current statement and didn't
- //find any inserts...
- return 0;
+ return totalInserts;
}
// Find the record identifier column (if there) and return a possibly new ObjectInspector that
// will strain out the record id for the underlying writer.
@@ -307,6 +336,7 @@ public class OrcRecordUpdater implements RecordUpdater {
// in RecordIdentifier is transactionId, bucketId, rowId
originalTxnField = fields.get(0);
origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+ bucketField = fields.get(1);
rowIdField = fields.get(2);
rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
@@ -316,7 +346,7 @@ public class OrcRecordUpdater implements RecordUpdater {
}
}
- private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+ private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
throws IOException {
this.operation.set(operation);
this.currentTransaction.set(currentTransaction);
@@ -334,11 +364,60 @@ public class OrcRecordUpdater implements RecordUpdater {
}
this.rowId.set(rowId);
this.originalTransaction.set(originalTransaction);
+ item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
writer.addRow(item);
}
+ private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
+ throws IOException {
+ if (operation == INSERT_OPERATION) {
+ // Just insert the record in the usual way, i.e., default to the simple behavior.
+ addSimpleEvent(operation, currentTransaction, rowId, row);
+ return;
+ }
+ this.operation.set(operation);
+ this.currentTransaction.set(currentTransaction);
+ Object rowValue = rowInspector.getStructFieldData(row, recIdField);
+ long originalTransaction = origTxnInspector.get(
+ recIdInspector.getStructFieldData(rowValue, originalTxnField));
+ rowId = rowIdInspector.get(
+ recIdInspector.getStructFieldData(rowValue, rowIdField));
+
+ if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+ // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
+ if (deleteEventWriter == null) {
+ // Initialize an indexBuilder for deleteEvents.
+ deleteEventIndexBuilder = new KeyIndexBuilder();
+ // Change the indexBuilder callback for the deleteEvent file too; the remaining writer
+ // options remain the same.
+
+ // TODO: When we change the callback, we are essentially mutating the writerOptions.
+ // This works but perhaps is not a good thing. The proper way to do this would be
+ // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
+ // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
+ // JIRA to fix this.
+
+ this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
+ writerOptions.callback(deleteEventIndexBuilder));
+ }
+
+ // A delete/update generates a delete event for the original row.
+ this.rowId.set(rowId);
+ this.originalTransaction.set(originalTransaction);
+ item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION));
+ item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
+ deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId);
+ deleteEventWriter.addRow(item);
+ }
+
+ if (operation == UPDATE_OPERATION) {
+ // A new row is also inserted in the usual delta file for an update event.
+ addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+ }
+ }
+
@Override
public void insert(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
@@ -347,7 +426,11 @@ public class OrcRecordUpdater implements RecordUpdater {
//always true in that case
rowIdOffset = findRowIdOffsetForInsert();
}
- addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+ if (acidOperationalProperties.isSplitUpdate()) {
+ addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+ } else {
+ addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
+ }
rowCountDelta++;
}
@@ -355,8 +438,13 @@ public class OrcRecordUpdater implements RecordUpdater {
public void update(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
+ rowIdOffset = findRowIdOffsetForInsert();
+ }
+ if (acidOperationalProperties.isSplitUpdate()) {
+ addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
+ } else {
+ addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
}
- addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
}
@Override
@@ -364,9 +452,12 @@ public class OrcRecordUpdater implements RecordUpdater {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(DELETE_OPERATION, currentTransaction, -1, row);
+ if (acidOperationalProperties.isSplitUpdate()) {
+ addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+ } else {
+ addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row);
+ }
rowCountDelta--;
-
}
@Override
@@ -390,13 +481,38 @@ public class OrcRecordUpdater implements RecordUpdater {
fs.delete(path, false);
}
} else {
- if (writer != null) writer.close();
+ if (writer != null) {
+ if (acidOperationalProperties.isSplitUpdate()) {
+ // When split-update is enabled, we can choose not to write
+ // any delta files when there are no inserts. In such cases only the delete_deltas
+ // would be written & they are closed separately below.
+ if (indexBuilder.acidStats.inserts > 0) {
+ writer.close(); // normal close, when there are inserts.
+ } else {
+ // Just remove the insert delta path when there are no insert events.
+ fs.delete(path, false);
+ }
+ } else {
+ writer.close(); // normal close.
+ }
+ }
+ if (deleteEventWriter != null) {
+ if (deleteEventIndexBuilder.acidStats.deletes > 0) {
+ // Only need to write out & close the delete_delta if there have been any.
+ deleteEventWriter.close();
+ } else {
+ // Just remove the delete_delta if there have been no delete events.
+ fs.delete(deleteEventPath, false);
+ }
+ }
+
}
if (flushLengths != null) {
flushLengths.close();
fs.delete(OrcAcidUtils.getSideFile(path), false);
}
writer = null;
+ deleteEventWriter = null;
}
@Override
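Putting the OrcRecordUpdater changes together, the on-disk effect of the three operations under split-update looks like the following sketch (not part of the patch; txn 7, statement 0, bucket 0 assumed, and 'partitionRoot', 'options' and the row objects are placeholders, with 'options' carrying an ObjectInspector that includes the record-id column):

    RecordUpdater updater =
        new OrcOutputFormat().getRecordUpdater(partitionRoot, options);
    updater.insert(7L, newRow);     // insert event -> delta_0000007_0000007_0000/bucket_00000
    updater.update(7L, changedRow); // delete event -> delete_delta_0000007_0000007_0000/bucket_00000,
                                    // plus an insert event in the delta_ file above
    updater.delete(7L, oldRow);     // delete event -> delete_delta_ file only
    updater.close(false);           // drops whichever of the two files received no events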
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8cb5e8a..5f53aef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -3167,10 +3168,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
List<Path> newFiles) throws HiveException {
- // The layout for ACID files is table|partname/base|delta/bucket
+ // The layout for ACID files is table|partname/base|delta|delete_delta/bucket
// We will always only be writing delta files. In the buckets created by FileSinkOperator
- // it will look like bucket/delta/bucket. So we need to move that into the above structure.
- // For the first mover there will be no delta directory, so we can move the whole directory.
+ // it will look like bucket/delta|delete_delta/bucket. So we need to move that into
+ // the above structure. For the first mover there will be no delta directory,
+ // so we can move the whole directory.
// For everyone else we will need to just move the buckets under the existing delta
// directory.
@@ -3193,49 +3195,58 @@ private void constructOneLBLocationMap(FileStatus fSta,
for (FileStatus origBucketStat : origBucketStats) {
Path origBucketPath = origBucketStat.getPath();
- LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+ moveAcidDeltaFiles(AcidUtils.DELTA_PREFIX, AcidUtils.deltaFileFilter,
+ fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+ moveAcidDeltaFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter,
+ fs, dst, origBucketPath, createdDeltaDirs, newFiles);
+ }
+ }
+ }
- FileStatus[] deltaStats = null;
- try {
- deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
- } catch (IOException e) {
- throw new HiveException("Unable to look for delta files in original bucket " +
- origBucketPath.toUri().toString(), e);
- }
- LOG.debug("Acid move found " + deltaStats.length + " delta files");
-
- for (FileStatus deltaStat : deltaStats) {
- Path deltaPath = deltaStat.getPath();
- // Create the delta directory. Don't worry if it already exists,
- // as that likely means another task got to it first. Then move each of the buckets.
- // it would be more efficient to try to move the delta with it's buckets but that is
- // harder to make race condition proof.
- Path deltaDest = new Path(dst, deltaPath.getName());
+ private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilter, FileSystem fs,
+ Path dst, Path origBucketPath, Set<Path> createdDeltaDirs,
+ List<Path> newFiles) throws HiveException {
+ LOG.debug("Acid move looking for " + deltaFileType + " files in bucket " + origBucketPath);
+
+ FileStatus[] deltaStats = null;
+ try {
+ deltaStats = fs.listStatus(origBucketPath, pathFilter);
+ } catch (IOException e) {
+ throw new HiveException("Unable to look for " + deltaFileType + " files in original bucket " +
+ origBucketPath.toUri().toString(), e);
+ }
+ LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files");
+
+ for (FileStatus deltaStat : deltaStats) {
+ Path deltaPath = deltaStat.getPath();
+ // Create the delta directory. Don't worry if it already exists,
+ // as that likely means another task got to it first. Then move each of the buckets.
+ // It would be more efficient to try to move the delta with its buckets, but that is
+ // harder to make race-condition proof.
+ Path deltaDest = new Path(dst, deltaPath.getName());
+ try {
+ if (!createdDeltaDirs.contains(deltaDest)) {
try {
- if (!createdDeltaDirs.contains(deltaDest)) {
- try {
- fs.mkdirs(deltaDest);
- createdDeltaDirs.add(deltaDest);
- } catch (IOException swallowIt) {
- // Don't worry about this, as it likely just means it's already been created.
- LOG.info("Unable to create delta directory " + deltaDest +
- ", assuming it already exists: " + swallowIt.getMessage());
- }
- }
- FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
- LOG.debug("Acid move found " + bucketStats.length + " bucket files");
- for (FileStatus bucketStat : bucketStats) {
- Path bucketSrc = bucketStat.getPath();
- Path bucketDest = new Path(deltaDest, bucketSrc.getName());
- LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
- bucketDest.toUri().toString());
- fs.rename(bucketSrc, bucketDest);
- if (newFiles != null) newFiles.add(bucketDest);
- }
- } catch (IOException e) {
- throw new HiveException("Error moving acid files " + e.getMessage(), e);
+ fs.mkdirs(deltaDest);
+ createdDeltaDirs.add(deltaDest);
+ } catch (IOException swallowIt) {
+ // Don't worry about this, as it likely just means it's already been created.
+ LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest +
+ ", assuming it already exists: " + swallowIt.getMessage());
}
}
+ FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+ LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+ for (FileStatus bucketStat : bucketStats) {
+ Path bucketSrc = bucketStat.getPath();
+ Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+ LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+ bucketDest.toUri().toString());
+ fs.rename(bucketSrc, bucketDest);
+ if (newFiles != null) newFiles.add(bucketDest);
+ }
+ } catch (IOException e) {
+ throw new HiveException("Error moving acid files " + e.getMessage(), e);
}
}
}
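As a concrete example of the move this refactoring performs (all paths
hypothetical): FileSinkOperator may leave

    .../staging/bucket_00000/delta_0000009_0000009/bucket_00000
    .../staging/bucket_00000/delete_delta_0000009_0000009/bucket_00000

and the two moveAcidDeltaFiles calls relocate the bucket files to

    warehouse/tbl/delta_0000009_0000009/bucket_00000
    warehouse/tbl/delete_delta_0000009_0000009/bucket_00000

using the same mkdirs-then-rename sequence for both prefixes, so the race-tolerance
notes in the comments apply equally to delete_delta directories.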
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 8cf261d..47c65bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -101,6 +101,8 @@ public class TableScanDesc extends AbstractOperatorDesc {
private boolean isAcidTable;
+ private AcidUtils.AcidOperationalProperties acidOperationalProperties = null;
+
private transient TableSample tableSample;
private transient Table tableMetadata;
@@ -127,6 +129,9 @@ public class TableScanDesc extends AbstractOperatorDesc {
this.virtualCols = vcs;
this.tableMetadata = tblMetadata;
isAcidTable = AcidUtils.isAcidTable(this.tableMetadata);
+ if (isAcidTable) {
+ acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata);
+ }
}
@Override
@@ -159,6 +164,10 @@ public class TableScanDesc extends AbstractOperatorDesc {
return isAcidTable;
}
+ public AcidUtils.AcidOperationalProperties getAcidOperationalProperties() {
+ return acidOperationalProperties;
+ }
+
@Explain(displayName = "Output", explainLevels = { Level.USER })
public List<String> getOutputColumnNames() {
return this.neededColumns;
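A hypothetical consumer of the new getter, deciding whether to take a
split-update-aware read path (sketch only; TableScanDesc and AcidUtils are the
classes touched by this patch):

    static boolean useSplitUpdateReader(TableScanDesc scanDesc) {
      // Relies on && short-circuiting: the properties are only non-null for ACID tables.
      return scanDesc.isAcidTable()
          && scanDesc.getAcidOperationalProperties().isSplitUpdate();
    }

Caching the properties at construction time spares such callers from re-parsing
table metadata on every check.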
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 6caca98..c3e3982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,9 +17,16 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StringableMap;
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -61,12 +69,8 @@ import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Matcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class to do compactions via an MR job. This has to be in the ql package rather than metastore
@@ -129,7 +133,7 @@ public class CompactorMR {
job.setInt(NUM_BUCKETS, sd.getNumBuckets());
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
overrideMRProps(job, t.getParameters()); // override MR properties from tblproperties if applicable
- if (ci.properties != null) { // override MR properties and general tblproperties if applicable
+ if (ci.properties != null) {
overrideTblProps(job, t.getParameters(), ci.properties);
}
setColumnTypes(job, sd.getCols());
@@ -137,6 +141,11 @@ public class CompactorMR {
//to generate the target dir in the Map task, there is no easy way to pass it to OutputCommitter
//to do the final move
job.setBoolean("mapreduce.map.speculative", false);
+
+ // Set appropriate Acid readers/writers based on the table properties.
+ AcidUtils.setAcidOperationalProperties(job,
+ AcidUtils.getAcidOperationalProperties(t.getParameters()));
+
return job;
}
@@ -501,12 +510,18 @@ public class CompactorMR {
Map<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>();
for (Path dir : dirsToSearch) {
FileSystem fs = dir.getFileSystem(entries);
+ // When we have split-update, there are two kinds of delta directories:
+ // the delta_x_y/ directory, which has only insert events, and
+ // the delete_delta_x_y/ directory, which has only delete events.
+ // The useful property of this split is that everything in the delta_x_y/
+ // directory could be processed like base files; exploiting that is
+ // currently left as a future improvement.
- // If this is a base or delta directory, then we need to be looking for the bucket files.
- // But if it's a legacy file then we need to add it directly.
if (dir.getName().startsWith(AcidUtils.BASE_PREFIX) ||
- dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+ dir.getName().startsWith(AcidUtils.DELTA_PREFIX) ||
+ dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
+
FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
for(FileStatus f : files) {
// For each file, figure out which bucket it is.
@@ -519,6 +534,8 @@ public class CompactorMR {
addFileToMap(matcher, dir, true, splitToBucketMap);
}
}
+
+
List<InputSplit> splits = new ArrayList<InputSplit>(splitToBucketMap.size());
for (Map.Entry<Integer, BucketTracker> e : splitToBucketMap.entrySet()) {
BucketTracker bt = e.getValue();
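With split-update enabled, a compaction input listing might therefore look like
this (transaction ids made up):

    base_0000016/bucket_00000
    delta_0000017_0000017_0000/bucket_00000          <- insert events only
    delete_delta_0000017_0000017_0000/bucket_00000   <- delete events only

All three prefixes now pass the directory check above and contribute their bucket
files to the splits.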
@@ -613,7 +630,8 @@ public class CompactorMR {
implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> {
JobConf jobConf;
- RecordWriter writer;
+ RecordWriter writer = null;
+ RecordWriter deleteEventWriter = null;
@Override
public void map(WritableComparable key, CompactorInputSplit split,
@@ -636,10 +654,30 @@ public class CompactorMR {
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
getWriter(reporter, reader.getObjectInspector(), split.getBucket());
+
+ AcidUtils.AcidOperationalProperties acidOperationalProperties
+ = AcidUtils.getAcidOperationalProperties(jobConf);
+
+ if (!isMajor && acidOperationalProperties.isSplitUpdate()) {
+ // When split-update is enabled for ACID, we initialize a separate deleteEventWriter
+ // that is used to write all the delete events (minor compaction only). For major
+ // compaction no history needs to be maintained, hence the delete events are processed
+ // but not re-written separately.
+ getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
+ }
+
while (reader.next(identifier, value)) {
- if (isMajor && reader.isDelete(value)) continue;
- writer.write(value);
- reporter.progress();
+ boolean sawDeleteRecord = reader.isDelete(value);
+ if (isMajor && sawDeleteRecord) continue;
+ if (sawDeleteRecord && deleteEventWriter != null) {
+ // When minor compacting, write delete events to a separate file when split-update is
+ // turned on.
+ deleteEventWriter.write(value);
+ reporter.progress();
+ } else {
+ writer.write(value);
+ reporter.progress();
+ }
}
}
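The map loop above boils down to the following per-record routing (a sketch,
using the reader/writer fields from this patch):

    // Inside the while (reader.next(identifier, value)) loop:
    boolean isDelete = reader.isDelete(value);
    if (isMajor && isDelete) {
      continue;                        // major compaction rewrites a full base; delete history is dropped
    }
    if (isDelete && deleteEventWriter != null) {
      deleteEventWriter.write(value);  // minor + split-update: route to delete_delta output
    } else {
      writer.write(value);             // everything else: regular output
    }
    reporter.progress();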
@@ -653,6 +691,9 @@ public class CompactorMR {
if (writer != null) {
writer.close(false);
}
+ if (deleteEventWriter != null) {
+ deleteEventWriter.close(false);
+ }
}
private void getWriter(Reporter reporter, ObjectInspector inspector,
@@ -679,6 +720,30 @@ public class CompactorMR {
}
}
+ private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector,
+ int bucket) throws IOException {
+ if (deleteEventWriter == null) {
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+ options.inspector(inspector)
+ .writingBase(false)
+ .writingDeleteDelta(true) // this is the option which will make it a delete writer
+ .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
+ .tableProperties(new StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
+ .reporter(reporter)
+ .minimumTransactionId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
+ .maximumTransactionId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
+ .bucket(bucket)
+ .statementId(-1); // setting statementId == -1 makes compacted delta files use
+ // the delta_xxxx_yyyy format
+
+ // Instantiate the underlying output format
+ @SuppressWarnings("unchecked") // since there is no way to parameterize an instance of Class
+ AcidOutputFormat<WritableComparable, V> aof =
+ instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
+
+ deleteEventWriter = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+ }
+ }
}
static class StringableList extends ArrayList<Path> {
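Given the options above, a minor compaction of transactions 17 through 20 on
bucket 0 would (hypothetically) emit its delete side as

    delete_delta_0000017_0000020/bucket_00000

writingDeleteDelta(true) selects the delete_delta prefix, and statementId == -1
suppresses the per-statement suffix so the compacted output uses the plain
min_max form.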
http://git-wip-us.apache.org/repos/asf/hive/blob/ecab0d07/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index af192fb..08ca9d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -70,18 +70,19 @@ import org.junit.rules.TestName;
* specifically the tests; the supporting code here is just a clone of TestTxnCommands
*/
public class TestTxnCommands2 {
- private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+ protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands2.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
).getPath().replaceAll("\\\\", "/");
- private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
//bucket count for test tables; set it to 1 for easier debugging
- private static int BUCKET_COUNT = 2;
+ protected static int BUCKET_COUNT = 2;
@Rule
public TestName testName = new TestName();
- private HiveConf hiveConf;
- private Driver d;
- private static enum Table {
+
+ protected HiveConf hiveConf;
+ protected Driver d;
+ protected static enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
NONACIDORCTBL("nonAcidOrcTbl"),
@@ -99,6 +100,10 @@ public class TestTxnCommands2 {
@Before
public void setUp() throws Exception {
+ setUpWithTableProperties("'transactional'='true'");
+ }
+
+ protected void setUpWithTableProperties(String tableProperties) throws Exception {
tearDown();
hiveConf = new HiveConf(this.getClass());
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
@@ -122,12 +127,13 @@ public class TestTxnCommands2 {
SessionState.start(new SessionState(hiveConf));
d = new Driver(hiveConf);
dropTables();
- runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
+ runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
}
- private void dropTables() throws Exception {
+
+ protected void dropTables() throws Exception {
for(Table t : Table.values()) {
runStatementOnDriver("drop table if exists " + t);
}
@@ -731,6 +737,8 @@ public class TestTxnCommands2 {
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
+
+
@Test
public void testValidTxnsBookkeeping() throws Exception {
// 1. Run a query against a non-ACID table, and we shouldn't have txn logged in conf
@@ -859,11 +867,15 @@ public class TestTxnCommands2 {
*/
@Test
public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+ testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true'");
+ }
+
+ void testInitiatorWithMultipleFailedCompactionsForVariousTblProperties(String tblProperties) throws Exception {
String tblName = "hive12353";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+ " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
for(int i = 0; i < 5; i++) {
//generate enough delta files so that Initiator can trigger auto compaction
@@ -1074,11 +1086,15 @@ public class TestTxnCommands2 {
*/
@Test
public void writeBetweenWorkerAndCleaner() throws Exception {
+ writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true'");
+ }
+
+ protected void writeBetweenWorkerAndCleanerForVariousTblProperties(String tblProperties) throws Exception {
String tblName = "hive12352";
runStatementOnDriver("drop table if exists " + tblName);
runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
" CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
- " STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+ " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
//create some data
runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
@@ -1125,7 +1141,6 @@ public class TestTxnCommands2 {
Assert.assertEquals("", expected,
runStatementOnDriver("select a,b from " + tblName + " order by a"));
}
-
/**
* Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found.
* When a heartbeat fails, the query should be failed too.
@@ -1215,17 +1230,78 @@ public class TestTxnCommands2 {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-
+
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
runCleaner(hiveConf);
runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
}
+
+ @Test
+ public void testACIDwithSchemaEvolutionAndCompaction() throws Exception {
+ testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true'");
+ }
+
+ protected void testACIDwithSchemaEvolutionForVariousTblProperties(String tblProperties) throws Exception {
+ String tblName = "acidWithSchemaEvol";
+ int numBuckets = 1;
+ runStatementOnDriver("drop table if exists " + tblName);
+ runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+ " CLUSTERED BY(a) INTO " + numBuckets +" BUCKETS" + //currently ACID requires table to be bucketed
+ " STORED AS ORC TBLPROPERTIES ( " + tblProperties + " )");
+
+ // create some data
+ runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+ runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+ // apply schema evolution by adding some columns
+ runStatementOnDriver("alter table " + tblName + " add columns(c int, d string)");
+
+ // insert some data in new schema
+ runStatementOnDriver("insert into " + tblName + " values(4, 'acid', 100, 'orc'),"
+ + "(5, 'llap', 200, 'tez')");
+
+ // update old data with values for the new schema columns
+ runStatementOnDriver("update " + tblName + " set d = 'hive' where a <= 3");
+ runStatementOnDriver("update " + tblName + " set c = 999 where a <= 3");
+
+ // read the entire data back and see if we did everything right
+ List<String> rs = runStatementOnDriver("select * from " + tblName + " order by a");
+ String[] expectedResult = { "1\tfoo\t999\thive", "2\tbar\t999\thive", "3\tblah\t999\thive", "4\tacid\t100\torc", "5\tllap\t200\ttez" };
+ Assert.assertEquals(Arrays.asList(expectedResult), rs);
+
+ // now compact and see if compaction still preserves the data correctness
+ runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
+ runWorker(hiveConf);
+ runCleaner(hiveConf); // Cleaner would remove the obsolete files.
+
+ // Verify that there is now only 1 new directory: base_xxxxxxx, and that the rest have been cleaned.
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] status;
+ status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tblName.toString().toLowerCase()),
+ FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, status.length);
+ boolean sawNewBase = false;
+ for (int i = 0; i < status.length; i++) {
+ if (status[i].getPath().getName().matches("base_.*")) {
+ sawNewBase = true;
+ FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(numBuckets, buckets.length);
+ Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00000"));
+ }
+ }
+ Assert.assertTrue(sawNewBase);
+
+ rs = runStatementOnDriver("select * from " + tblName + " order by a");
+ Assert.assertEquals(Arrays.asList(expectedResult), rs);
+ }
+
+
/**
* takes raw data and turns it into a string as if from Driver.getResults()
* sorts rows in dictionary order
*/
- private List<String> stringifyValues(int[][] rowsIn) {
+ protected List<String> stringifyValues(int[][] rowsIn) {
assert rowsIn.length > 0;
int[][] rows = rowsIn.clone();
Arrays.sort(rows, new RowComp());
@@ -1275,7 +1351,7 @@ public class TestTxnCommands2 {
return sb.toString();
}
- private List<String> runStatementOnDriver(String stmt) throws Exception {
+ protected List<String> runStatementOnDriver(String stmt) throws Exception {
CommandProcessorResponse cpr = d.run(stmt);
if(cpr.getResponseCode() != 0) {
throw new RuntimeException(stmt + " failed: " + cpr);