Posted to commits@hive.apache.org by vg...@apache.org on 2019/02/05 01:49:07 UTC
[hive] branch master updated: HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)
This is an automated email from the ASF dual-hosted git repository.
vgumashta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 313e49f HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)
313e49f is described below
commit 313e49f6b706555a16288fab50c79b7aedf7ba77
Author: Vaibhav Gumashta <vg...@apache.org>
AuthorDate: Mon Feb 4 17:42:02 2019 -0800
HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 7 +
.../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 54 ++-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 429 +++++++++++++++++++++
.../hadoop/hive/ql/exec/FunctionRegistry.java | 1 +
.../hive/ql/exec/tez/HiveSplitGenerator.java | 2 +-
.../hadoop/hive/ql/exec/tez/SplitGrouper.java | 164 +++++++-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 8 +-
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 3 -
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 2 +-
.../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 38 +-
.../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java | 6 +
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 199 +++++++++-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 6 +
.../generic/GenericUDFValidateAcidSortOrder.java | 100 +++++
.../results/clientpositive/show_functions.q.out | 2 +
15 files changed, 987 insertions(+), 34 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 414070e..a3b03ca 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2705,6 +2705,13 @@ public class HiveConf extends Configuration {
HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
"Whether the compactor should compact insert-only tables. A safety switch."),
+ COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false,
+ "Means Major compaction on full CRUD tables is done as a query, "
+ + "and minor compaction will be disabled."),
+ SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"),
+ "This is set to compactor from within the query based compactor. This enables the Tez SplitGrouper "
+ + "to group splits based on their bucket number, so that all rows from different bucket files "
+ + " for the same bucket number can end up in the same bucket file after the compaction."),
/**
* @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
*/
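For reference, a minimal sketch of how the new knobs are exercised (not part of the commit; the table name and the conf/driver objects are assumptions, mirroring the test helpers further down):

// Sketch: enabling query-based major compaction for a full CRUD table.
// Assumes a HiveConf 'conf' and an IDriver 'driver' set up as in TestCrudCompactorOnTez below.
conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
// hive.split.grouping.mode is flipped to "compactor" internally by CompactorMR while it runs
// the compaction query; users normally leave it at its default of "query".
driver.run("alter table acid_tbl compact 'major'"); // 'acid_tbl' is a hypothetical table
// With the config on, a 'minor' compaction request is rejected at analysis time
// (see the DDLSemanticAnalyzer change further down).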
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index d6a4191..142c2d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -121,10 +121,15 @@ public class TestAcidOnTez {
SessionState.start(new SessionState(hiveConf));
d = DriverFactory.newDriver(hiveConf);
dropTables();
- runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties());
- runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties());
- runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc ");
- runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc ");
+ runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+ + " buckets stored as orc " + getTblProperties());
+ runStatementOnDriver("create table " + Table.ACIDTBLPART
+ + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc "
+ + getTblProperties());
+ runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+ + " buckets stored as orc ");
+ runStatementOnDriver("create table " + Table.NONACIDPART
+ + "(a int, b int) partitioned by (p string) stored as orc ");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(5,6)");
@@ -831,6 +836,42 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
// No transactions - just the header row
assertEquals(1, rows.size());
}
+
+ /**
+ * HIVE-20699
+ *
+ * see TestTxnCommands3.testCompactor
+ */
+ @Test
+ public void testCrudMajorCompactionSplitGrouper() throws Exception {
+ String tblName = "test_split_grouper";
+ // make a clone of existing hive conf
+ HiveConf confForTez = new HiveConf(hiveConf);
+ setupTez(confForTez); // one-time setup to make query able to run with Tez
+ HiveConf.setVar(confForTez, HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+ runStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets "
+ + "stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='default')", confForTez);
+ runStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", confForTez);
+ runStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", confForTez);
+ runStatementOnDriver("delete from " + tblName + " where b = 2");
+ List<String> expectedRs = new ArrayList<>();
+ expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
+ expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+ expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
+ expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
+ expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+ List<String> rs =
+ runStatementOnDriver("select ROW__ID, * from " + tblName + " order by ROW__ID.bucketid, ROW__ID", confForTez);
+ HiveConf.setVar(confForTez, HiveConf.ConfVars.SPLIT_GROUPING_MODE, "compactor");
+ // No order by needed: this should use the compactor split grouping to return the rows in correct order
+ List<String> rsCompact = runStatementOnDriver("select ROW__ID, * from " + tblName, confForTez);
+ Assert.assertEquals("normal read", expectedRs, rs);
+ Assert.assertEquals("compacted read", rs, rsCompact);
+ }
private void restartSessionAndDriver(HiveConf conf) throws Exception {
SessionState ss = SessionState.get();
@@ -910,11 +951,16 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h
private void setupTez(HiveConf conf) {
conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+ conf.set("tez.am.resource.memory.mb", "128");
+ conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
conf.setBoolean("tez.local.mode", true);
conf.set("fs.defaultFS", "file:///");
conf.setBoolean("tez.runtime.optimize.local.fetch", true);
conf.set("tez.staging-dir", TEST_DATA_DIR);
conf.setBoolean("tez.ignore.lib.uris", true);
+ conf.set("hive.tez.container.size", "128");
+ conf.setBoolean("hive.merge.tezfiles", false);
+ conf.setBoolean("hive.in.tez.test", true);
}
private void setupMapJoin(HiveConf conf) {
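A side note on the expected ROW__ID strings in the test above: the bucketid values (536870912, 536936448) are BucketCodec.V1-encoded bucket properties, not raw bucket numbers. A standalone decoding sketch follows; the exact bit layout is an assumption inferred from the BucketCodec.V1 references in this patch:

// Sketch: decode the bucket property values used in the expected result sets.
// Assumed V1 layout: top 3 bits = codec version, bits 16-27 = writer/bucket id, low bits = statement id.
public class DecodeBucketProperty {
  static int decodeWriterId(int bucketProperty) {
    return (bucketProperty >>> 16) & 0x0FFF; // assumed 12-bit bucket id field
  }
  public static void main(String[] args) {
    System.out.println(decodeWriterId(536870912)); // 0 -> rows written to bucket_00000
    System.out.println(decodeWriterId(536936448)); // 1 -> rows written to bucket_00001
  }
}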
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
new file mode 100644
index 0000000..d59cfe5
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.orc.OrcConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+// TODO: Add tests for bucketing_version=1 when HIVE-21167 is fixed
+public class TestCrudCompactorOnTez {
+ private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
+ private static final Logger LOG = LoggerFactory.getLogger(TestCrudCompactorOnTez.class);
+ private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator
+ + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt
+ .getAndIncrement()).getPath().replaceAll("\\\\", "/");
+ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+ private HiveConf conf;
+ IMetaStoreClient msClient;
+ private IDriver driver;
+
+ @Before
+ // Note: we create a new conf and driver object before every test
+ public void setup() throws Exception {
+ File f = new File(TEST_WAREHOUSE_DIR);
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+ }
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+ hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+ TxnDbUtil.setConfValues(hiveConf);
+ TxnDbUtil.cleanDb(hiveConf);
+ TxnDbUtil.prepDb(hiveConf);
+ conf = hiveConf;
+ // Use tez as execution engine for this test class
+ setupTez(conf);
+ msClient = new HiveMetaStoreClient(conf);
+ driver = DriverFactory.newDriver(conf);
+ SessionState.start(new CliSessionState(conf));
+ }
+
+ private void setupTez(HiveConf conf) {
+ conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+ conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+ conf.set("tez.am.resource.memory.mb", "128");
+ conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
+ conf.setBoolean("tez.local.mode", true);
+ conf.set("fs.defaultFS", "file:///");
+ conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+ conf.set("tez.staging-dir", TEST_DATA_DIR);
+ conf.setBoolean("tez.ignore.lib.uris", true);
+ conf.set("hive.tez.container.size", "128");
+ conf.setBoolean("hive.merge.tezfiles", false);
+ conf.setBoolean("hive.in.tez.test", true);
+ }
+
+ @After
+ public void tearDown() {
+ if (msClient != null) {
+ msClient.close();
+ }
+ if (driver != null) {
+ driver.close();
+ }
+ conf = null;
+ }
+
+ @Test
+ public void testMajorCompaction() throws Exception {
+ String dbName = "default";
+ String tblName = "testMajorCompaction";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+ + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='default')", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+ executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+ FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] deltas = new String[filestatus.length];
+ for (int i = 0; i < deltas.length; i++) {
+ deltas[i] = filestatus[i].getPath().getName();
+ }
+ Arrays.sort(deltas);
+ String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+ if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+ }
+ // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+ FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+ AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
+ List<String> expectedRsBucket0 = new ArrayList<>();
+ expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
+ expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+ expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
+ expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+ List<String> expectedRsBucket1 = new ArrayList<>();
+ expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
+ expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+ expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
+ expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+ // Bucket 0
+ List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver);
+ Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0);
+ // Bucket 1
+ List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver);
+ Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1);
+ // Run major compaction and cleaner
+ runCompaction(dbName, tblName, CompactionType.MAJOR);
+ runCleaner(conf);
+ // Should contain only one base directory now
+ filestatus = fs.listStatus(new Path(table.getSd().getLocation()));
+ String[] bases = new String[filestatus.length];
+ for (int i = 0; i < bases.length; i++) {
+ bases[i] = filestatus[i].getPath().getName();
+ }
+ Arrays.sort(bases);
+ String[] expectedBases = new String[] { "base_0000003_v0000008" };
+ if (!Arrays.deepEquals(expectedBases, bases)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases));
+ }
+ // Bucket 0
+ List<String> rsCompactBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536870912", driver);
+ Assert.assertEquals("compacted read", rsBucket0, rsCompactBucket0);
+ // Bucket 1
+ List<String> rsCompactBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ROW__ID.bucketid = 536936448", driver);
+ Assert.assertEquals("compacted read", rsBucket1, rsCompactBucket1);
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ @Test
+ public void testMinorCompactionDisabled() throws Exception {
+ String dbName = "default";
+ String tblName = "testMinorCompactionDisabled";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+ + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='default')", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+ executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+ executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ FileSystem fs = FileSystem.get(conf);
+ // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+ FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+ String[] deltas = new String[filestatus.length];
+ for (int i = 0; i < deltas.length; i++) {
+ deltas[i] = filestatus[i].getPath().getName();
+ }
+ Arrays.sort(deltas);
+ String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+ if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+ }
+ // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+ FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+ AcidUtils.deleteEventDeltaDirFilter);
+ String[] deleteDeltas = new String[deleteDeltaStat.length];
+ for (int i = 0; i < deleteDeltas.length; i++) {
+ deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+ }
+ Arrays.sort(deleteDeltas);
+ String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+ if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+ Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+ }
+ // Initiate a compaction, make sure it's not queued
+ runInitiator(conf);
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(0, compacts.size());
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ @Test
+ public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception {
+ String dbName = "default";
+ String tblName = "testCompactionWithSchemaEvolutionAndBuckets";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName
+ + " (a int, b int) partitioned by(ds string) clustered by (a) into 2 buckets"
+ + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='default')", driver);
+ // Insert some data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+ driver);
+ // Add a new column
+ executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+ // Insert more data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+ + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+ // Run major compaction and cleaner
+ runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today");
+ runCleaner(conf);
+ List<String> expectedRsBucket0PtnToday = new ArrayList<>();
+ expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday");
+ expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday");
+ expectedRsBucket0PtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday");
+ List<String> expectedRsBucket1PtnToday = new ArrayList<>();
+ expectedRsBucket1PtnToday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3\tNULL\ttoday");
+ expectedRsBucket1PtnToday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t4\t1005\ttoday");
+ // Bucket 0, partition 'today'
+ List<String> rsCompactBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from "
+ + tblName + " where ROW__ID.bucketid = 536870912 and ds='today'", driver);
+ Assert.assertEquals("compacted read", expectedRsBucket0PtnToday, rsCompactBucket0PtnToday);
+ // Bucket 1, partition 'today'
+ List<String> rsCompactBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from "
+ + tblName + " where ROW__ID.bucketid = 536936448 and ds='today'", driver);
+ Assert.assertEquals("compacted read", expectedRsBucket1PtnToday, rsCompactBucket1PtnToday);
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ @Test
+ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws Exception {
+ HiveConf hiveConf = new HiveConf(conf);
+ hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+ hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+ driver = DriverFactory.newDriver(hiveConf);
+ String dbName = "default";
+ String tblName = "testCompactionWithSchemaEvolutionNoBucketsMultipleReducers";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) partitioned by(ds string)"
+ + " stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')", driver);
+ // Insert some data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+ driver);
+ // Add a new column
+ executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+ // Insert more data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+ + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+ // Run major compaction and cleaner
+ runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today");
+ runCleaner(hiveConf);
+ List<String> expectedRsPtnToday = new ArrayList<>();
+ expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t3\tNULL\ttoday");
+ expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\tNULL\ttoday");
+ expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\tNULL\ttoday");
+ expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday");
+ expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":2}\t4\t4\t1005\ttoday");
+ List<String> expectedRsPtnYesterday = new ArrayList<>();
+ expectedRsPtnYesterday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4\tNULL\tyesterday");
+ expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t3\t4\t1002\tyesterday");
+ expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":2}\t4\t3\t1004\tyesterday");
+ // Partition 'today'
+ List<String> rsCompactPtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ds='today'", driver);
+ Assert.assertEquals("compacted read", expectedRsPtnToday, rsCompactPtnToday);
+ // Partition 'yesterday'
+ List<String> rsCompactPtnYesterday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+ + " where ds='yesterday'", driver);
+ Assert.assertEquals("compacted read", expectedRsPtnYesterday, rsCompactPtnYesterday);
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames)
+ throws Exception {
+ HiveConf hiveConf = new HiveConf(conf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setConf(hiveConf);
+ t.init(new AtomicBoolean(true), new AtomicBoolean());
+ if (partNames.length == 0) {
+ txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+ t.run();
+ } else {
+ for (String partName : partNames) {
+ CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType);
+ cr.setPartitionname(partName);
+ txnHandler.compact(cr);
+ t.run();
+ }
+ }
+ }
+
+ static void runInitiator(HiveConf hConf) throws Exception {
+ HiveConf hiveConf = new HiveConf(hConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Initiator t = new Initiator();
+ t.setThreadId((int) t.getId());
+ t.setConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
+ static void runWorker(HiveConf hConf) throws Exception {
+ HiveConf hiveConf = new HiveConf(hConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
+ static void runCleaner(HiveConf hConf) throws Exception {
+ HiveConf hiveConf = new HiveConf(hConf);
+ AtomicBoolean stop = new AtomicBoolean(true);
+ Cleaner t = new Cleaner();
+ t.setThreadId((int) t.getId());
+ t.setConf(hiveConf);
+ AtomicBoolean looped = new AtomicBoolean();
+ t.init(stop, looped);
+ t.run();
+ }
+
+ /**
+ * Execute Hive CLI statement
+ *
+ * @param cmd arbitrary statement to execute
+ */
+ static void executeStatementOnDriver(String cmd, IDriver driver) throws Exception {
+ LOG.debug("Executing: " + cmd);
+ CommandProcessorResponse cpr = driver.run(cmd);
+ if (cpr.getResponseCode() != 0) {
+ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+ }
+ }
+
+ static List<String> executeStatementOnDriverAndReturnResults(String cmd, IDriver driver) throws Exception {
+ LOG.debug("Executing: " + cmd);
+ CommandProcessorResponse cpr = driver.run(cmd);
+ if (cpr.getResponseCode() != 0) {
+ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+ }
+ List<String> rs = new ArrayList<String>();
+ driver.getResults(rs);
+ return rs;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index e7aa041..9ff0107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -288,6 +288,7 @@ public final class FunctionRegistry {
system.registerGenericUDF("split", GenericUDFSplit.class);
system.registerGenericUDF("str_to_map", GenericUDFStringToMap.class);
system.registerGenericUDF("translate", GenericUDFTranslate.class);
+ system.registerGenericUDF("validate_acid_sort_order", GenericUDFValidateAcidSortOrder.class);
system.registerGenericUDF(UNARY_PLUS_FUNC_NAME, GenericUDFOPPositive.class);
system.registerGenericUDF(UNARY_MINUS_FUNC_NAME, GenericUDFOPNegative.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 15c14c9..c270507 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -128,7 +128,7 @@ public class HiveSplitGenerator extends InputInitializer {
MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
this.conf = TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
-
+
this.jobConf = new JobConf(conf);
// Read all credentials into the credentials instance stored in JobConf.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 7f8bd22..33d723a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -33,10 +34,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -160,43 +166,161 @@ public class SplitGrouper {
return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider);
}
- /** Generate groups of splits, separated by schema evolution boundaries */
- public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
- Configuration conf,
- InputSplit[] splits,
- float waves, int availableSlots,
- String inputName,
- boolean groupAcrossFiles,
- SplitLocationProvider locationProvider) throws
- Exception {
-
- MapWork work = populateMapWork(jobConf, inputName);
+ /**
+ * Generate groups of splits, separated by schema evolution boundaries
+ * OR
+ * When used from compactor, group splits based on the bucket number of the input files
+ * (in this case, splits for the same logical bucket but different schema end up in the same group)
+ */
+ public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits,
+ float waves, int availableSlots, String inputName, boolean groupAcrossFiles,
+ SplitLocationProvider locationProvider) throws Exception {
+ MapWork mapWork = populateMapWork(jobConf, inputName);
// ArrayListMultimap is important here to retain the ordering for the splits.
- Multimap<Integer, InputSplit> bucketSplitMultiMap =
- ArrayListMultimap.<Integer, InputSplit> create();
+ Multimap<Integer, InputSplit> schemaGroupedSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+
+ if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) {
+ List<Path> paths = Utilities.getInputPathsTez(jobConf, mapWork);
+ for (Path path : paths) {
+ List<String> aliases = mapWork.getPathToAliases().get(path);
+ if ((aliases != null) && (aliases.size() == 1)) {
+ Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.get(0));
+ if ((op != null) && (op instanceof TableScanOperator)) {
+ TableScanOperator tableScan = (TableScanOperator) op;
+ if (!tableScan.getConf().isTranscationalTable()) {
+ String splitPath = getFirstSplitPath(splits);
+ String errorMessage =
+ "Compactor split grouping is enabled only for transactional tables. Please check the path: "
+ + splitPath;
+ LOG.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+ }
+ }
+ }
+ /**
+ * The expectation is that each InputSplit is a {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}
+ * wrapping an OrcSplit. So group these splits by bucketId and within each bucketId, sort by writeId, stmtId,
+ * rowIdOffset or splitStart. For 'original' splits (w/o acid meta cols in the file) SyntheticBucketProperties
+ * should always be there and so rowIdOffset is there. For 'native' acid files, OrcSplit doesn't have
+ * the 1st rowid in the split, so splitStart is used to sort. This should achieve the required sorting invariance
+ * (sort by: writeId, stmtId, rowIdOffset within each bucket) needed for Acid tables.
+ * See: {@link org.apache.hadoop.hive.ql.io.AcidInputFormat}
+ * Create a TezGroupedSplit for each bucketId and return.
+ * TODO: Are there any other config values (split size etc) that can override this per writer split grouping?
+ */
+ return getCompactorSplitGroups(splits, conf);
+ }
int i = 0;
InputSplit prevSplit = null;
for (InputSplit s : splits) {
- // this is the bit where we make sure we don't group across partition
- // schema boundaries
- if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
+ // this is the bit where we make sure we don't group across partition schema boundaries
+ if (schemaEvolved(s, prevSplit, groupAcrossFiles, mapWork)) {
++i;
prevSplit = s;
}
- bucketSplitMultiMap.put(i, s);
+ schemaGroupedSplitMultiMap.put(i, s);
}
LOG.info("# Src groups for split generation: " + (i + 1));
-
// group them into the chunks we want
Multimap<Integer, InputSplit> groupedSplits =
- this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
-
+ this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, locationProvider);
return groupedSplits;
}
+
+ // Returns the path of the first split in this list for logging purposes
+ private String getFirstSplitPath(InputSplit[] splits) {
+ if (splits.length == 0) {
+ throw new RuntimeException("The list of splits provided for grouping is empty.");
+ }
+ Path splitPath = ((FileSplit) splits[0]).getPath();
+
+ return splitPath.toString();
+ }
/**
+ * Takes a list of {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}s
+ * and groups them for Acid Compactor, creating one TezGroupedSplit per bucket number.
+ */
+ Multimap<Integer, InputSplit> getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf) {
+ // Note: For our case, this multimap will essentially contain one value (one TezGroupedSplit) per key
+ Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+ HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.length];
+ int i = 0;
+ for (InputSplit is : rawSplits) {
+ splits[i++] = (HiveInputFormat.HiveInputSplit) is;
+ }
+ Arrays.sort(splits, new ComparatorCompactor(conf));
+ TezGroupedSplit tgs = null;
+ int previousWriterId = Integer.MIN_VALUE;
+ Path rootDir = null;
+ for (i = 0; i < splits.length; i++) {
+ int writerId = ((OrcSplit) splits[i].getInputSplit()).getBucketId();
+ if (rootDir == null) {
+ rootDir = ((OrcSplit) splits[i].getInputSplit()).getRootDir();
+ }
+ Path rootDirFromCurrentSplit = ((OrcSplit) splits[i].getInputSplit()).getRootDir();
+ // These splits should belong to the same partition
+ assert rootDir == rootDirFromCurrentSplit;
+ if (writerId != previousWriterId) {
+ // Create a new grouped split for this writerId
+ tgs = new TezGroupedSplit(1, "org.apache.hadoop.hive.ql.io.HiveInputFormat", null, null);
+ bucketSplitMultiMap.put(writerId, tgs);
+ }
+ tgs.addSplit(splits[i]);
+ previousWriterId = writerId;
+ }
+ return bucketSplitMultiMap;
+ }
+
+ static class ComparatorCompactor implements Comparator<HiveInputFormat.HiveInputSplit> {
+ private Configuration conf;
+ private ComparatorCompactor(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputSplit h2) {
+ //sort: bucketId,writeId,stmtId,rowIdOffset,splitStart
+ if(h1 == h2) {
+ return 0;
+ }
+ OrcSplit o1 = (OrcSplit)h1.getInputSplit();
+ OrcSplit o2 = (OrcSplit)h2.getInputSplit();
+ try {
+ o1.parse(conf);
+ o2.parse(conf);
+ } catch(IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ // Note: this is the bucket number as seen in the file name.
+ // Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute.
+ // See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details.
+ if(o1.getBucketId() != o2.getBucketId()) {
+ return o1.getBucketId() < o2.getBucketId() ? -1 : 1;
+ }
+ if(o1.getWriteId() != o2.getWriteId()) {
+ return o1.getWriteId() < o2.getWriteId() ? -1 : 1;
+ }
+ if(o1.getStatementId() != o2.getStatementId()) {
+ return o1.getStatementId() < o2.getStatementId() ? -1 : 1;
+ }
+ long rowOffset1 = o1.getSyntheticAcidProps() == null ? 0 : o1.getSyntheticAcidProps().getRowIdOffset();
+ long rowOffset2 = o2.getSyntheticAcidProps() == null ? 0 : o2.getSyntheticAcidProps().getRowIdOffset();
+ if(rowOffset1 != rowOffset2) {
+ //if 2 splits are from the same file (delta/base in fact), they either both have syntheticAcidProps or both do not
+ return rowOffset1 < rowOffset2 ? -1 : 1;
+ }
+ if(o1.getStart() != o2.getStart()) {
+ return o1.getStart() < o2.getStart() ? -1 : 1;
+ }
+ throw new RuntimeException("Found 2 equal splits: " + o1 + " and " + o2);
+ }
+ }
+
+ /**
* get the size estimates for each bucket in tasks. This is used to make sure
* we allocate the head room evenly
*/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5dbf634..9b51847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1553,9 +1553,13 @@ public class AcidUtils {
}
public static boolean isFullAcidScan(Configuration conf) {
- if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) return false;
+ if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) {
+ return false;
+ }
int propInt = conf.getInt(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1);
- if (propInt == -1) return true;
+ if (propInt == -1) {
+ return true;
+ }
AcidOperationalProperties props = AcidOperationalProperties.parseInt(propInt);
return !props.isInsertOnly();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fbb931c..7c4bc4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1230,9 +1230,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
else {
AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
parent.getFileSystem(conf));
- assert pd.getMinWriteId() == pd.getMaxWriteId() :
- "This a delta with raw non acid schema, must be result of single write, no compaction: "
- + splitPath;
return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 6d4578e..2255f8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -634,7 +634,7 @@ public class OrcRecordUpdater implements RecordUpdater {
} catch (CharacterCodingException e) {
throw new IllegalArgumentException("Bad string encoding for " +
OrcRecordUpdater.ACID_KEY_INDEX_NAME, e);
- }
+ }
RecordIdentifier[] result = new RecordIdentifier[stripes.length];
for(int i=0; i < stripes.length; ++i) {
if (stripes[i].length() != 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 4d55592..61e7558 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.ColumnarSplit;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
@@ -64,6 +65,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
private long projColsUncompressedSize;
private transient Object fileKey;
private long fileLen;
+ private transient long writeId = 0;
+ private transient int bucketId = 0;
+ private transient int stmtId = 0;
/**
* This contains the synthetic ROW__ID offset and bucket properties for original file splits in an ACID table.
@@ -306,7 +310,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
/**
* Used for generating synthetic ROW__IDs for reading "original" files.
*/
- static final class OffsetAndBucketProperty {
+ public static final class OffsetAndBucketProperty {
private final long rowIdOffset;
private final int bucketProperty;
private final long syntheticWriteId;
@@ -328,6 +332,38 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
return syntheticWriteId;
}
}
+
+ /**
+ * Note: this is the write id as seen in the file name that contains this split
+ * For files that have min/max writeId, this is the starting one.
+ * @return
+ */
+ public long getWriteId() {
+ return writeId;
+ }
+
+ public int getStatementId() {
+ return stmtId;
+ }
+
+ /**
+ * Note: this is the bucket number as seen in the file name that contains this split.
+ * Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute.
+ * See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details.
+ * @return
+ */
+ public int getBucketId() {
+ return bucketId;
+ }
+
+ public void parse(Configuration conf) throws IOException {
+ OrcRawRecordMerger.TransactionMetaData tmd =
+ OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootDir, conf);
+ writeId = tmd.syntheticWriteId;
+ stmtId = tmd.statementId;
+ AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf);
+ bucketId = opt.getBucketId();
+ }
@Override
public String toString() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index db3b427..adfa431 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -2197,6 +2197,12 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase();
+ if (type.equalsIgnoreCase("minor") && HiveConf.getBoolVar(conf, ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+ throw new SemanticException(
+ "Minor compaction is not currently supported for query based compaction (enabled by setting: "
+ + ConfVars.COMPACTOR_CRUD_QUERY_BASED + " to true).");
+ }
+
if (!type.equals("minor") && !type.equals("major")) {
throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index dc05e19..cde47da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -33,6 +33,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -232,6 +235,23 @@ public class CompactorMR {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
}
+
+ /**
+ * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method).
+ * TODO:
+ * 1. A good way to run minor compaction (currently disabled when this config is enabled)
+ * 2. More generic approach to collecting files in the same logical bucket to compact within the same task
+ * (currently we're using Tez split grouping).
+ */
+ if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf,
+ ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+ if (ci.isMajorCompaction()) {
+ runCrudCompaction(conf, t, p, sd, writeIds, ci);
+ return;
+ } else {
+ throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
+ }
+ }
if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
@@ -318,6 +338,85 @@ public class CompactorMR {
su.gatherStats();
}
+ /**
+ *
+ * @param conf
+ * @param t
+ * @param p
+ * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition)
+ * @param writeIds (valid write ids used to filter rows while they're being read for compaction)
+ * @param ci
+ * @throws IOException
+ */
+ private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
+ CompactionInfo ci) throws IOException {
+ AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+ t.getParameters());
+ int deltaCount = dir.getCurrentDirectories().size();
+ int origCount = dir.getOriginalFiles().size();
+ if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
+ LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
+ .getBaseDirectory(), deltaCount, origCount);
+ return;
+ }
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, false);
+ // Set up the session for driver.
+ HiveConf conf = new HiveConf(hiveConf);
+ conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+ /**
+ * For now, we will group splits on tez so that we end up with all bucket files,
+ * with same bucket number in one map task.
+ */
+ conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+ String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_";
+ String tmpTableName = tmpPrefix + System.currentTimeMillis();
+ long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
+ try {
+ // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
+ String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd);
+ LOG.info("Running major compaction query into temp table with create definition: {}", query);
+ try {
+ DriverUtils.runOnDriver(conf, user, sessionState, query);
+ } catch (Exception ex) {
+ Throwable cause = ex;
+ while (cause != null && !(cause instanceof AlreadyExistsException)) {
+ cause = cause.getCause();
+ }
+ if (cause == null) {
+ throw new IOException(ex);
+ }
+ }
+ query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName);
+ LOG.info("Running major compaction via query: {}", query);
+ /**
+ * This will create bucket files like:
+ * db/db_tmp_compactor_tbl_1234/00000_0
+ * db/db_tmp_compactor_tbl_1234/00001_0
+ */
+ DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+ /**
+ * This achieves a final layout like (wid is the highest valid write id for this major compaction):
+ * db/tbl/ptn/base_wid/bucket_00000
+ * db/tbl/ptn/base_wid/bucket_00001
+ */
+ org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+ String tmpLocation = tempTable.getSd().getLocation();
+ commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId);
+ } catch (HiveException e) {
+ LOG.error("Error doing query based major compaction", e);
+ throw new IOException(e);
+ } finally {
+ try {
+ DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
+ } catch (HiveException e) {
+ LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
+ LOG.error(ExceptionUtils.getStackTrace(e));
+ }
+ }
+ }
+
private void runMmCompaction(HiveConf conf, Table t, Partition p,
StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
LOG.debug("Going to delete directories for aborted transactions for MM table "
@@ -376,8 +475,7 @@ public class CompactorMR {
}
}
}
-
- String query = buildMmCompactionQuery(driverConf, t, p, tmpTableName);
+ String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
LOG.info("Compacting a MM table via " + query);
long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
@@ -393,6 +491,103 @@ public class CompactorMR {
private String generateTmpPath(StorageDescriptor sd) {
return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
}
+
+ /**
+ * Note on ordering of rows in the temp table:
+ * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending).
+ * (current write id will be the same as original write id).
+ * We will be achieving the ordering via a custom split grouper for compactor.
+ * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars.SPLIT_GROUPING_MODE} for the config description.
+ * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups} for details on the mechanism.
+ */
+ private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) {
+ StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
+ // Acid virtual columns
+ query.append(
+ "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<");
+ List<FieldSchema> cols = t.getSd().getCols();
+ boolean isFirst = true;
+ // Actual columns
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+ }
+ query.append(">)");
+ query.append(" stored as orc");
+ query.append(" tblproperties ('transactional'='false')");
+ return query.toString();
+ }
+
+ private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+ String fullName = t.getDbName() + "." + t.getTableName();
+ String query = "insert into table " + tmpName + " ";
+ String filter = "";
+ if (p != null) {
+ filter = filter + " where ";
+ List<String> vals = p.getValues();
+ List<FieldSchema> keys = t.getPartitionKeys();
+ assert keys.size() == vals.size();
+ for (int i = 0; i < keys.size(); ++i) {
+ filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+ }
+ }
+ query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
+ + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(";
+ List<FieldSchema> cols = t.getSd().getCols();
+ for (int i = 0; i < cols.size(); ++i) {
+ query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName();
+ }
+ query += ") from " + fullName + filter;
+ return query;
+ }
+
+ /**
+ * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
+ * Since the temp table is a non-transactional table, it has file names in the "original" format.
+ * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups},
+ * we will end up with one file per bucket.
+ */
+ private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
+ Path fromPath = new Path(from);
+ Path toPath = new Path(to);
+ Path tmpTablePath = new Path(fromPath, tmpTableName);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ // Get a base_wid path which will be the new compacted base
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
+ .maximumWriteId(maxTxn).bucket(0).statementId(-1);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+ if (!fs.exists(fromPath)) {
+ LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir);
+ fs.mkdirs(newBaseDir);
+ return;
+ }
+ LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+ /**
+ * Currently each temp table file named 000000_0, 000001_0, ... is renamed to bucket_00000, bucket_00001, ... in the new base.
+ * TODO/ToThink:
+ * Q. Can the file named 000000_0 under the temp table be deterministically renamed to bucket_00000 in the destination?
+ */
+ // List<String> buckCols = t.getSd().getBucketCols();
+ FileStatus[] children = fs.listStatus(fromPath);
+ for (FileStatus filestatus : children) {
+ String originalFileName = filestatus.getPath().getName();
+ // This check may be redundant, since the non-transactional temp table should only contain files in the original naming format.
+ if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+ int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+ options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+ .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
+ Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+ fs.rename(filestatus.getPath(), finalBucketFile);
+ }
+ }
+ fs.delete(fromPath, true);
+ }
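As a concrete illustration (file names here are only examples), after the insert completes the temp table directory might contain 000000_0 and 000001_0; this method renames them to bucket_00000 and bucket_00001 under the new base directory produced by AcidUtils.createFilename (whose name encodes the write id high watermark and the compactor txn id), and then deletes the temp table directory.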
private String buildMmCompactionCtQuery(
String fullName, Table t, StorageDescriptor sd, String location) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a0df82c..a37c983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -96,6 +96,12 @@ public class Initiator extends MetaStoreCompactorThread {
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
for (CompactionInfo ci : potentials) {
+ // Disable minor compaction for query based compactor
+ if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+ LOG.debug("Not compacting: " + ci.getFullPartitionName()
+ + ", as query based compaction currently does not " + "support minor compactions.");
+ continue;
+ }
LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
try {
Table t = resolveTable(ci);
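For reference, a minimal sketch of exercising this mode (assuming a hypothetical full CRUD ORC table default.t, and hive.compactor.crud.query.based=true set in the configuration of the metastore/HiveServer2 instance running the compactor threads) would be to request a major compaction as usual; the Initiator change above only ensures that automatically discovered minor compactions are skipped while the flag is on:

    alter table default.t compact 'major';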
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
new file mode 100644
index 0000000..757a366
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFValidateAcidSortOrder.
+ */
+@Description(name = "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId)",
+ value = "_FUNC_(writeId, bucketId, rowId) - returns 0 if the current row is in the right acid sort order "
+ + "compared to the previous row")
+public class GenericUDFValidateAcidSortOrder extends GenericUDF {
+ public static final String UDF_NAME = "validate_acid_sort_order";
+ private transient PrimitiveCategory[] inputTypes = new PrimitiveCategory[3];
+ private transient Converter[] converters = new Converter[3];
+ private final LongWritable output = new LongWritable();
+ // See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}
+ // Each writer handles only one logical bucket (i.e. all files with the same bucket number end up in the same writer).
+ private int bucketNumForWriter = -1;
+ private WriteIdRowId previousWriteIdRowId = null;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ checkArgsSize(arguments, 3, 3);
+ checkArgPrimitive(arguments, 0);
+ checkArgPrimitive(arguments, 1);
+ checkArgPrimitive(arguments, 2);
+ obtainLongConverter(arguments, 0, inputTypes, converters);
+ obtainIntConverter(arguments, 1, inputTypes, converters);
+ obtainLongConverter(arguments, 2, inputTypes, converters);
+ ObjectInspector outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ return outputOI;
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ long writeId = getLongValue(arguments, 0, converters);
+ int bucketProperty = getIntValue(arguments, 1, converters);
+ int bucketNum = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+ long rowId = getLongValue(arguments, 2, converters);
+ if (bucketNumForWriter < 0) {
+ bucketNumForWriter = bucketNum;
+ } else {
+ if (bucketNumForWriter != bucketNum) {
+ throw new HiveException("One writer is supposed to handle only one bucket. We saw these 2 different buckets: "
+ + bucketNumForWriter + " and " + bucketNum);
+ }
+ }
+ WriteIdRowId current = new WriteIdRowId(bucketProperty, writeId, rowId);
+ if (previousWriteIdRowId != null) {
+ // Verify sort order for this new row
+ if (current.compareTo(previousWriteIdRowId) <= 0) {
+ throw new HiveException("Wrong sort order of Acid rows detected for the rows: " + previousWriteIdRowId + " and "
+ + current);
+ }
+ }
+ previousWriteIdRowId = current;
+ output.set(0);
+ return output;
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("validate_acid_sort_order", children);
+ }
+
+ final static class WriteIdRowId implements Comparable<WriteIdRowId> {
+ final int bucketProperty;
+ final long writeId;
+ final long rowId;
+
+ WriteIdRowId(int bucketProperty, long writeId, long rowId) {
+ this.bucketProperty = bucketProperty;
+ this.writeId = writeId;
+ this.rowId = rowId;
+ }
+
+ @Override
+ public int compareTo(WriteIdRowId other) {
+ if (this.bucketProperty != other.bucketProperty) {
+ return this.bucketProperty < other.bucketProperty ? -1 : 1;
+ }
+ if (this.writeId != other.writeId) {
+ return this.writeId < other.writeId ? -1 : 1;
+ }
+ if (this.rowId != other.rowId) {
+ return this.rowId < other.rowId ? -1 : 1;
+ }
+ return 0;
+ }
+ }
+}
\ No newline at end of file
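As a rough illustration of what the UDF enforces (table name hypothetical), the heart of the compactor-generated select is:

    select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId)
    from default.t

Each call returns 0 as long as rows arrive in strictly ascending ROW__ID order within a writer; an out-of-order row, or a writer that sees two different bucket numbers, raises a HiveException and fails the compaction query. Outside the compactor's split grouping a single task may legitimately read several buckets, so running the function standalone like this can fail by design.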
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index c9716e9..1d2cb1c 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -279,6 +279,7 @@ unhex
unix_timestamp
upper
uuid
+validate_acid_sort_order
var_pop
var_samp
variance
@@ -401,6 +402,7 @@ date_format
date_sub
datediff
to_date
+validate_acid_sort_order
PREHOOK: query: SHOW FUNCTIONS '***'
PREHOOK: type: SHOWFUNCTIONS
POSTHOOK: query: SHOW FUNCTIONS '***'