Posted to commits@hive.apache.org by vg...@apache.org on 2019/02/05 01:49:07 UTC

[hive] branch master updated: HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)

This is an automated email from the ASF dual-hosted git repository.

vgumashta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 313e49f  HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)
313e49f is described below

commit 313e49f6b706555a16288fab50c79b7aedf7ba77
Author: Vaibhav Gumashta <vg...@apache.org>
AuthorDate: Mon Feb 4 17:42:02 2019 -0800

    HIVE-20699: Query based compactor for full CRUD Acid tables (Vaibhav Gumashta reviewed by Eugene Koifman)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   7 +
 .../org/apache/hadoop/hive/ql/TestAcidOnTez.java   |  54 ++-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 429 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/FunctionRegistry.java      |   1 +
 .../hive/ql/exec/tez/HiveSplitGenerator.java       |   2 +-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      | 164 +++++++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   8 +-
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |   3 -
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java    |   2 +-
 .../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java |  38 +-
 .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java  |   6 +
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 199 +++++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   6 +
 .../generic/GenericUDFValidateAcidSortOrder.java   | 100 +++++
 .../results/clientpositive/show_functions.q.out    |   2 +
 15 files changed, 987 insertions(+), 34 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 414070e..a3b03ca 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2705,6 +2705,13 @@ public class HiveConf extends Configuration {
 
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
+    COMPACTOR_CRUD_QUERY_BASED("hive.compactor.crud.query.based", false,
+        "If enabled, major compaction on full CRUD tables is done as a query, "
+        + "and minor compaction is disabled."),
+    SPLIT_GROUPING_MODE("hive.split.grouping.mode", "query", new StringSet("query", "compactor"),
+        "This is set to compactor from within the query based compactor. This enables the Tez SplitGrouper "
+        + "to group splits based on their bucket number, so that all rows from different bucket files "
+        + "for the same bucket number can end up in the same bucket file after the compaction."),
     /**
      * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_RETENTION_SUCCEEDED
      */
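
For reference, a minimal sketch of how these two new settings can be toggled programmatically, mirroring how this patch itself uses them (COMPACTOR_CRUD_QUERY_BASED in CompactorMR and the compactor tests, SPLIT_GROUPING_MODE inside the query based compactor). The helper class name is illustrative and the surrounding session setup is assumed:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class CrudCompactorConfSketch {
      // Returns a copy of the given conf with the new query-based compaction settings applied.
      public static HiveConf withQueryBasedCompaction(HiveConf base) {
        HiveConf conf = new HiveConf(base);
        // Route major compaction of full CRUD tables through a HiveQL query; minor stays disabled.
        conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
        // Normally set internally by the compactor; shown here only to illustrate the new ConfVar.
        conf.setVar(HiveConf.ConfVars.SPLIT_GROUPING_MODE, "compactor");
        return conf;
      }
    }
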
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index d6a4191..142c2d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -121,10 +121,15 @@ public class TestAcidOnTez {
     SessionState.start(new SessionState(hiveConf));
     d = DriverFactory.newDriver(hiveConf);
     dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties());
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc " + getTblProperties());
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc ");
-    runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc ");
+    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc " + getTblProperties());
+    runStatementOnDriver("create table " + Table.ACIDTBLPART
+        + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc "
+        + getTblProperties());
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc ");
+    runStatementOnDriver("create table " + Table.NONACIDPART
+        + "(a int, b int) partitioned by (p string) stored as orc ");
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(5,6)");
@@ -831,6 +836,42 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
     // No transactions - just the header row
     assertEquals(1, rows.size());
   }
+ 
+  /**
+   * HIVE-20699
+   *
+   * see TestTxnCommands3.testCompactor
+   */
+  @Test
+  public void testCrudMajorCompactionSplitGrouper() throws Exception {
+    String tblName = "test_split_grouper";
+    // make a clone of existing hive conf
+    HiveConf confForTez = new HiveConf(hiveConf);
+    setupTez(confForTez); // one-time setup to make query able to run with Tez
+    HiveConf.setVar(confForTez, HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    runStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets "
+        + "stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='default')", confForTez);
+    runStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", confForTez);
+    runStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", confForTez);
+    runStatementOnDriver("delete from " + tblName + " where b = 2");
+    List<String> expectedRs = new ArrayList<>();
+    expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
+    expectedRs.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+    expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
+    expectedRs.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+    expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
+    expectedRs.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+    expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
+    expectedRs.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+    List<String> rs =
+        runStatementOnDriver("select ROW__ID, * from " + tblName + " order by ROW__ID.bucketid, ROW__ID", confForTez);
+    HiveConf.setVar(confForTez, HiveConf.ConfVars.SPLIT_GROUPING_MODE, "compactor");
+    // No order by needed: this should use the compactor split grouping to return the rows in the correct order
+    List<String> rsCompact = runStatementOnDriver("select ROW__ID, * from  " + tblName, confForTez);
+    Assert.assertEquals("normal read", expectedRs, rs);
+    Assert.assertEquals("compacted read", rs, rsCompact);
+  }
 
   private void restartSessionAndDriver(HiveConf conf) throws Exception {
     SessionState ss = SessionState.get();
@@ -910,11 +951,16 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
   private void setupTez(HiveConf conf) {
     conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
     conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+    conf.set("tez.am.resource.memory.mb", "128");
+    conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
     conf.setBoolean("tez.local.mode", true);
     conf.set("fs.defaultFS", "file:///");
     conf.setBoolean("tez.runtime.optimize.local.fetch", true);
     conf.set("tez.staging-dir", TEST_DATA_DIR);
     conf.setBoolean("tez.ignore.lib.uris", true);
+    conf.set("hive.tez.container.size", "128");
+    conf.setBoolean("hive.merge.tezfiles", false); 
+    conf.setBoolean("hive.in.tez.test", true);
   }
 
   private void setupMapJoin(HiveConf conf) {
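
A note on the bucketid values asserted in the expected ROW__ID rows above: 536870912 and 536936448 are encoded bucket properties, not plain bucket numbers. A minimal decoding sketch follows, assuming the BucketCodec V1 layout referenced by the comments later in this patch (top 3 bits codec version, 1 reserved bit, 12 bits bucket id, 4 reserved bits, 12 bits statement id); it is an illustration, not the BucketCodec implementation:

    public class BucketPropertyDecodeSketch {
      public static void main(String[] args) {
        int[] props = { 536870912, 536936448 };    // values asserted in the tests above
        for (int prop : props) {
          int version = prop >>> 29;               // top 3 bits: codec version (1 for V1)
          int bucketId = (prop >>> 16) & 0xFFF;    // 12 bits: physical bucket number
          int statementId = prop & 0xFFF;          // low 12 bits: statement id
          System.out.printf("%d -> version=%d bucketId=%d statementId=%d%n",
              prop, version, bucketId, statementId);
        }
        // Prints bucketId=0 for 536870912 and bucketId=1 for 536936448, i.e. the two
        // buckets of the test table.
      }
    }
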
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
new file mode 100644
index 0000000..d59cfe5
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.orc.OrcConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+// TODO: Add tests for bucketing_version=1 when HIVE-21167 is fixed
+public class TestCrudCompactorOnTez {
+  private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
+  private static final Logger LOG = LoggerFactory.getLogger(TestCrudCompactorOnTez.class);
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator
+      + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt
+          .getAndIncrement()).getPath().replaceAll("\\\\", "/");
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  private HiveConf conf;
+  IMetaStoreClient msClient;
+  private IDriver driver;
+
+  @Before
+  // Note: we create a new conf and driver object before every test
+  public void setup() throws Exception {
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.cleanDb(hiveConf);
+    TxnDbUtil.prepDb(hiveConf);
+    conf = hiveConf;
+    // Use tez as execution engine for this test class
+    setupTez(conf);
+    msClient = new HiveMetaStoreClient(conf);
+    driver = DriverFactory.newDriver(conf);
+    SessionState.start(new CliSessionState(conf));
+  }
+
+  private void setupTez(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+    conf.set("tez.am.resource.memory.mb", "128");
+    conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
+    conf.setBoolean("tez.local.mode", true);
+    conf.set("fs.defaultFS", "file:///");
+    conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+    conf.set("tez.staging-dir", TEST_DATA_DIR);
+    conf.setBoolean("tez.ignore.lib.uris", true);
+    conf.set("hive.tez.container.size", "128");
+    conf.setBoolean("hive.merge.tezfiles", false); 
+    conf.setBoolean("hive.in.tez.test", true);
+  }
+
+  @After
+  public void tearDown() {
+    if (msClient != null) {
+      msClient.close();
+    }
+    if (driver != null) {
+      driver.close();
+    }
+    conf = null;
+  }
+
+  @Test
+  public void testMajorCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "testMajorCompaction";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+        + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='default')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+    FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+    String[] deltas = new String[filestatus.length];
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = filestatus[i].getPath().getName();
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+    }
+    // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+    FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+        AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    }
+    List<String> expectedRsBucket0 = new ArrayList<>();
+    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4");
+    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
+    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4");
+    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
+    List<String> expectedRsBucket1 = new ArrayList<>();
+    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4");
+    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
+    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4");
+    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
+    // Bucket 0
+    List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver);
+    Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0);
+    // Bucket 1
+    List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
+        + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver);
+    Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1);
+    // Run major compaction and cleaner
+    runCompaction(dbName, tblName, CompactionType.MAJOR);
+    runCleaner(conf);
+    // Should contain only one base directory now
+    filestatus = fs.listStatus(new Path(table.getSd().getLocation()));
+    String[] bases = new String[filestatus.length];
+    for (int i = 0; i < bases.length; i++) {
+      bases[i] = filestatus[i].getPath().getName();
+    }
+    Arrays.sort(bases);
+    String[] expectedBases = new String[] { "base_0000003_v0000008" };
+    if (!Arrays.deepEquals(expectedBases, bases)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases));
+    }
+    // Bucket 0
+    List<String> rsCompactBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
+        + " where ROW__ID.bucketid = 536870912", driver);
+    Assert.assertEquals("compacted read", rsBucket0, rsCompactBucket0);
+    // Bucket 1
+    List<String> rsCompactBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
+        + " where ROW__ID.bucketid = 536936448", driver);
+    Assert.assertEquals("compacted read", rsBucket1, rsCompactBucket1);
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  @Test
+  public void testMinorCompactionDisabled() throws Exception {
+    String dbName = "default";
+    String tblName = "testMinorCompactionDisabled";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets"
+        + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='default')", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
+    executeStatementOnDriver("insert into " + tblName + " values(3,2),(3,3),(3,4),(4,2),(4,3),(4,4)", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+    FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+    String[] deltas = new String[filestatus.length];
+    for (int i = 0; i < deltas.length; i++) {
+      deltas[i] = filestatus[i].getPath().getName();
+    }
+    Arrays.sort(deltas);
+    String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
+    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
+    }
+    // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+    FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
+        AcidUtils.deleteEventDeltaDirFilter);
+    String[] deleteDeltas = new String[deleteDeltaStat.length];
+    for (int i = 0; i < deleteDeltas.length; i++) {
+      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
+    }
+    Arrays.sort(deleteDeltas);
+    String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
+    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
+      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    }
+    // Initiate a compaction, make sure it's not queued
+    runInitiator(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(0, compacts.size());
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  @Test
+  public void testCompactionWithSchemaEvolutionAndBuckets() throws Exception {
+    String dbName = "default";
+    String tblName = "testCompactionWithSchemaEvolutionAndBuckets";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName
+        + " (a int, b int) partitioned by(ds string) clustered by (a) into 2 buckets"
+        + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+        + " 'transactional_properties'='default')", driver);
+    // Insert some data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+        driver);
+    // Add a new column
+    executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+    // Insert more data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+        + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+    //  Run major compaction and cleaner
+    runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today");
+    runCleaner(conf);
+    List<String> expectedRsBucket0PtnToday = new ArrayList<>();
+    expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday");
+    expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday");
+    expectedRsBucket0PtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday");
+    List<String> expectedRsBucket1PtnToday = new ArrayList<>();
+    expectedRsBucket1PtnToday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3\tNULL\ttoday");
+    expectedRsBucket1PtnToday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t4\t1005\ttoday");
+    // Bucket 0, partition 'today'
+    List<String> rsCompactBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
+        + tblName + " where ROW__ID.bucketid = 536870912 and ds='today'", driver);
+    Assert.assertEquals("compacted read", expectedRsBucket0PtnToday, rsCompactBucket0PtnToday);
+    // Bucket 1, partition 'today'
+    List<String> rsCompactBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
+        + tblName + " where ROW__ID.bucketid = 536936448 and ds='today'", driver);
+    Assert.assertEquals("compacted read", expectedRsBucket1PtnToday, rsCompactBucket1PtnToday);
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  @Test
+  public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws Exception {
+    HiveConf hiveConf = new HiveConf(conf);
+    hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+    hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+    driver = DriverFactory.newDriver(hiveConf);
+    String dbName = "default";
+    String tblName = "testCompactionWithSchemaEvolutionNoBucketsMultipleReducers";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) partitioned by(ds string)"
+        + " stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')", driver);
+    // Insert some data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+        driver);
+    // Add a new column
+    executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+    // Insert more data
+    executeStatementOnDriver("insert into " + tblName
+        + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+        + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+    executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+    //  Run major compaction and cleaner
+    runCompaction(dbName, tblName, CompactionType.MAJOR, "ds=yesterday", "ds=today");
+    runCleaner(hiveConf);
+    List<String> expectedRsPtnToday = new ArrayList<>();
+    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t3\tNULL\ttoday");
+    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\tNULL\ttoday");
+    expectedRsPtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\tNULL\ttoday");
+    expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t3\t3\t1001\ttoday");
+    expectedRsPtnToday.add("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":2}\t4\t4\t1005\ttoday");
+    List<String> expectedRsPtnYesterday = new ArrayList<>();
+    expectedRsPtnYesterday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t4\tNULL\tyesterday");
+    expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t3\t4\t1002\tyesterday");
+    expectedRsPtnYesterday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":2}\t4\t3\t1004\tyesterday");
+    // Partition 'today'
+    List<String> rsCompactPtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
+        + " where ds='today'", driver);
+    Assert.assertEquals("compacted read", expectedRsPtnToday, rsCompactPtnToday);
+    // Partition 'yesterday'
+    List<String> rsCompactPtnYesterday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
+        + " where ds='yesterday'", driver);
+    Assert.assertEquals("compacted read", expectedRsPtnYesterday, rsCompactPtnYesterday);
+    // Clean up
+    executeStatementOnDriver("drop table " + tblName, driver);
+  }
+
+  private void runCompaction(String dbName, String tblName, CompactionType compactionType, String... partNames)
+      throws Exception {
+    HiveConf hiveConf = new HiveConf(conf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    if (partNames.length == 0) {
+      txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+      t.run();
+    } else {
+      for (String partName : partNames) {
+        CompactionRequest cr = new CompactionRequest(dbName, tblName, compactionType);
+        cr.setPartitionname(partName);
+        txnHandler.compact(cr);
+        t.run();
+      }
+    }
+  }
+
+  static void runInitiator(HiveConf hConf) throws Exception {
+    HiveConf hiveConf = new HiveConf(hConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Initiator t = new Initiator();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  static void runWorker(HiveConf hConf) throws Exception {
+    HiveConf hiveConf = new HiveConf(hConf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  static void runCleaner(HiveConf hConf) throws Exception {
+    HiveConf hiveConf = new HiveConf(hConf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Cleaner t = new Cleaner();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  /**
+   * Execute Hive CLI statement
+   *
+   * @param cmd arbitrary statement to execute
+   */
+  static void executeStatementOnDriver(String cmd, IDriver driver) throws Exception {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if (cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+    }
+  }
+
+  static List<String> executeStatementOnDriverAndReturnResults(String cmd, IDriver driver) throws Exception {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if (cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + cpr);
+    }
+    List<String> rs = new ArrayList<String>();
+    driver.getResults(rs);
+    return rs;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index e7aa041..9ff0107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -288,6 +288,7 @@ public final class FunctionRegistry {
     system.registerGenericUDF("split", GenericUDFSplit.class);
     system.registerGenericUDF("str_to_map", GenericUDFStringToMap.class);
     system.registerGenericUDF("translate", GenericUDFTranslate.class);
+    system.registerGenericUDF("validate_acid_sort_order", GenericUDFValidateAcidSortOrder.class);
 
     system.registerGenericUDF(UNARY_PLUS_FUNC_NAME, GenericUDFOPPositive.class);
     system.registerGenericUDF(UNARY_MINUS_FUNC_NAME, GenericUDFOPNegative.class);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 15c14c9..c270507 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -128,7 +128,7 @@ public class HiveSplitGenerator extends InputInitializer {
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
 
     this.conf = TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
-
+    
     this.jobConf = new JobConf(conf);
 
     // Read all credentials into the credentials instance stored in JobConf.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 7f8bd22..33d723a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -33,10 +34,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -160,43 +166,161 @@ public class SplitGrouper {
     return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider);
   }
 
-  /** Generate groups of splits, separated by schema evolution boundaries */
-  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-                                                                    Configuration conf,
-                                                                    InputSplit[] splits,
-                                                                    float waves, int availableSlots,
-                                                                    String inputName,
-                                                                    boolean groupAcrossFiles,
-                                                                    SplitLocationProvider locationProvider) throws
-      Exception {
-
-    MapWork work = populateMapWork(jobConf, inputName);
+  /**
+   * Generate groups of splits, separated by schema evolution boundaries,
+   * OR, when used from the compactor, group splits based on the bucket number of the input files
+   * (in this case, splits for the same logical bucket but different schemas end up in the same group).
+   */
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Configuration conf, InputSplit[] splits,
+      float waves, int availableSlots, String inputName, boolean groupAcrossFiles,
+      SplitLocationProvider locationProvider) throws Exception {
+    MapWork mapWork = populateMapWork(jobConf, inputName);
     // ArrayListMultimap is important here to retain the ordering for the splits.
-    Multimap<Integer, InputSplit> bucketSplitMultiMap =
-        ArrayListMultimap.<Integer, InputSplit> create();
+    Multimap<Integer, InputSplit> schemaGroupedSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+    
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) {
+      List<Path> paths = Utilities.getInputPathsTez(jobConf, mapWork);
+      for (Path path : paths) {
+        List<String> aliases = mapWork.getPathToAliases().get(path);
+        if ((aliases != null) && (aliases.size() == 1)) {
+          Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.get(0));
+          if ((op != null) && (op instanceof TableScanOperator)) {
+            TableScanOperator tableScan = (TableScanOperator) op;
+            if (!tableScan.getConf().isTranscationalTable()) {
+              String splitPath = getFirstSplitPath(splits);
+              String errorMessage =
+                  "Compactor split grouping is enabled only for transactional tables. Please check the path: "
+                      + splitPath;
+              LOG.error(errorMessage);
+              throw new RuntimeException(errorMessage);
+            }
+          }
+        }
+      }
+      /**
+       * The expectation is that each InputSplit is a {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}
+       * wrapping an OrcSplit. So group these splits by bucketId, and within each bucketId sort by writeId, stmtId,
+       * and rowIdOffset or splitStart. For 'original' splits (without acid meta columns in the file), the synthetic
+       * bucket properties should always be present, so rowIdOffset is available. For 'native' acid files, OrcSplit
+       * does not carry the first rowid of the split, so splitStart is used to sort instead. This achieves the required
+       * sort invariant (writeId, stmtId, rowIdOffset within each bucket) needed for Acid tables.
+       * See: {@link org.apache.hadoop.hive.ql.io.AcidInputFormat}
+       * Create a TezGroupedSplit for each bucketId and return.
+       * TODO: Are there any other config values (split size etc) that can override this per writer split grouping?
+       */
+      return getCompactorSplitGroups(splits, conf);
+    }
 
     int i = 0;
     InputSplit prevSplit = null;
     for (InputSplit s : splits) {
-      // this is the bit where we make sure we don't group across partition
-      // schema boundaries
-      if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
+      // this is the bit where we make sure we don't group across partition schema boundaries
+      if (schemaEvolved(s, prevSplit, groupAcrossFiles, mapWork)) {
         ++i;
         prevSplit = s;
       }
-      bucketSplitMultiMap.put(i, s);
+      schemaGroupedSplitMultiMap.put(i, s);
     }
     LOG.info("# Src groups for split generation: " + (i + 1));
-
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
-
+        this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, locationProvider);
     return groupedSplits;
   }
+  
+  // Returns the path of the first split in this list for logging purposes
+  private String getFirstSplitPath(InputSplit[] splits) {
+    if (splits.length == 0) {
+      throw new RuntimeException("The list of splits provided for grouping is empty.");
+    }
+    Path splitPath = ((FileSplit) splits[0]).getPath();
+   
+    return splitPath.toString();
+  }
 
 
   /**
+   * Takes a list of {@link org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit}s
+   * and groups them for Acid Compactor, creating one TezGroupedSplit per bucket number.
+   */
+  Multimap<Integer, InputSplit> getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf) {
+    // Note: For our case, this multimap will essentially contain one value (one TezGroupedSplit) per key 
+    Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+    HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.length];
+    int i = 0;
+    for (InputSplit is : rawSplits) {
+      splits[i++] = (HiveInputFormat.HiveInputSplit) is;
+    }
+    Arrays.sort(splits, new ComparatorCompactor(conf));
+    TezGroupedSplit tgs = null;
+    int previousWriterId = Integer.MIN_VALUE;
+    Path rootDir = null;
+    for (i = 0; i < splits.length; i++) {
+      int writerId = ((OrcSplit) splits[i].getInputSplit()).getBucketId();
+      if (rootDir == null) {
+        rootDir = ((OrcSplit) splits[i].getInputSplit()).getRootDir();
+      } 
+      Path rootDirFromCurrentSplit = ((OrcSplit) splits[i].getInputSplit()).getRootDir();
+      // These splits should belong to the same partition
+      assert rootDir == rootDirFromCurrentSplit;
+      if (writerId != previousWriterId) {
+        // Create a new grouped split for this writerId
+        tgs = new TezGroupedSplit(1, "org.apache.hadoop.hive.ql.io.HiveInputFormat", null, null);
+        bucketSplitMultiMap.put(writerId, tgs);
+      }
+      tgs.addSplit(splits[i]);
+      previousWriterId = writerId;
+    }
+    return bucketSplitMultiMap;
+  }
+  
+  static class ComparatorCompactor implements Comparator<HiveInputFormat.HiveInputSplit> {
+    private  Configuration conf;
+    private ComparatorCompactor(Configuration conf) {
+      this.conf = conf;
+    }
+    
+    @Override
+    public int compare(HiveInputFormat.HiveInputSplit h1, HiveInputFormat.HiveInputSplit h2) {
+      //sort: bucketId,writeId,stmtId,rowIdOffset,splitStart
+      if(h1 == h2) {
+        return 0;
+      }
+      OrcSplit o1 = (OrcSplit)h1.getInputSplit();
+      OrcSplit o2 = (OrcSplit)h2.getInputSplit();
+      try {
+        o1.parse(conf);
+        o2.parse(conf);
+      } catch(IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      // Note: this is the bucket number as seen in the file name.
+      // Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute.
+      // See: {@link org.apache.hadoop.hive.ql.io.BucketCodec.V1} for details.
+      if(o1.getBucketId() != o2.getBucketId()) {
+        return o1.getBucketId() < o2.getBucketId() ? -1 : 1;
+      }
+      if(o1.getWriteId() != o2.getWriteId()) {
+        return o1.getWriteId() < o2.getWriteId() ? -1 : 1;
+      }
+      if(o1.getStatementId() != o2.getStatementId()) {
+        return o1.getStatementId() < o2.getStatementId() ? -1 : 1;
+      }
+      long rowOffset1 = o1.getSyntheticAcidProps() == null ? 0 : o1.getSyntheticAcidProps().getRowIdOffset();
+      long rowOffset2 = o2.getSyntheticAcidProps() == null ? 0 : o2.getSyntheticAcidProps().getRowIdOffset();
+      if(rowOffset1 != rowOffset2) {
+        //if 2 splits are from the same file (delta/base in fact), they either both have syntheticAcidProps or both do not
+        return rowOffset1 < rowOffset2 ? -1 : 1;
+      }
+      if(o1.getStart() != o2.getStart()) {
+        return o1.getStart() < o2.getStart() ? -1 : 1;
+      }
+      throw new RuntimeException("Found 2 equal splits: " + o1 + " and " + o2);
+    }
+  }
+
+  /**
    * get the size estimates for each bucket in tasks. This is used to make sure
    * we allocate the head room evenly
    */
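
The compactor grouping above depends on a total order over splits: bucketId first, then writeId, stmtId, rowIdOffset and finally the split start offset. A small standalone sketch of that invariant on plain value objects (SplitKey is a hypothetical stand-in for HiveInputSplit/OrcSplit, not a Hive class):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class CompactorSortOrderSketch {
      static final class SplitKey {
        final int bucketId; final long writeId; final int stmtId; final long rowIdOffset; final long start;
        SplitKey(int bucketId, long writeId, int stmtId, long rowIdOffset, long start) {
          this.bucketId = bucketId; this.writeId = writeId; this.stmtId = stmtId;
          this.rowIdOffset = rowIdOffset; this.start = start;
        }
        @Override public String toString() {
          return "bucket=" + bucketId + " writeId=" + writeId + " start=" + start;
        }
      }

      public static void main(String[] args) {
        // Same sort keys, in the same priority, as ComparatorCompactor above.
        Comparator<SplitKey> order = Comparator
            .comparingInt((SplitKey k) -> k.bucketId)
            .thenComparingLong(k -> k.writeId)
            .thenComparingInt(k -> k.stmtId)
            .thenComparingLong(k -> k.rowIdOffset)
            .thenComparingLong(k -> k.start);
        List<SplitKey> splits = new ArrayList<>();
        splits.add(new SplitKey(1, 2, 0, 0, 0));   // delta_2, bucket 1
        splits.add(new SplitKey(0, 2, 0, 0, 0));   // delta_2, bucket 0
        splits.add(new SplitKey(0, 1, 0, 0, 0));   // delta_1, bucket 0
        splits.sort(order);
        // After sorting, all bucket-0 splits precede the bucket-1 split, and within bucket 0
        // the delta_1 split precedes the delta_2 split -- so one TezGroupedSplit is built per bucket.
        splits.forEach(System.out::println);
      }
    }
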
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5dbf634..9b51847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1553,9 +1553,13 @@ public class AcidUtils {
   }
 
   public static boolean isFullAcidScan(Configuration conf) {
-    if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) return false;
+    if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) {
+      return false;
+    }
     int propInt = conf.getInt(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1);
-    if (propInt == -1) return true;
+    if (propInt == -1) {
+      return true;
+    }
     AcidOperationalProperties props = AcidOperationalProperties.parseInt(propInt);
     return !props.isInsertOnly();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fbb931c..7c4bc4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1230,9 +1230,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           else {
             AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
               parent.getFileSystem(conf));
-            assert pd.getMinWriteId() == pd.getMaxWriteId() :
-              "This a delta with raw non acid schema, must be result of single write, no compaction: "
-                + splitPath;
             return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId());
           }
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 6d4578e..2255f8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -634,7 +634,7 @@ public class OrcRecordUpdater implements RecordUpdater {
     } catch (CharacterCodingException e) {
       throw new IllegalArgumentException("Bad string encoding for " +
           OrcRecordUpdater.ACID_KEY_INDEX_NAME, e);
-    }
+    } 
     RecordIdentifier[] result = new RecordIdentifier[stripes.length];
     for(int i=0; i < stripes.length; ++i) {
       if (stripes[i].length() != 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 4d55592..61e7558 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
@@ -64,6 +65,9 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   private long projColsUncompressedSize;
   private transient Object fileKey;
   private long fileLen;
+  private transient long writeId = 0;
+  private transient int bucketId = 0;
+  private transient int stmtId = 0;
 
   /**
    * This contains the synthetic ROW__ID offset and bucket properties for original file splits in an ACID table.
@@ -306,7 +310,7 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   /**
    * Used for generating synthetic ROW__IDs for reading "original" files.
    */
-  static final class OffsetAndBucketProperty {
+  public static final class OffsetAndBucketProperty {
     private final long rowIdOffset;
     private final int bucketProperty;
     private final long syntheticWriteId;
@@ -328,6 +332,38 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       return syntheticWriteId;
     }
   }
+  
+  /**
+   * Note: this is the write id as seen in the name of the file that contains this split.
+   * For files that have a min/max writeId, this is the starting one.
+   * @return the write id parsed from the file path
+   */
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public int getStatementId() {
+    return stmtId;
+  }
+
+  /**
+   * Note: this is the bucket number as seen in the file name that contains this split.
+   * Hive 3.0 encodes a bunch of info in the Acid schema's bucketId attribute.
+   * See {@link org.apache.hadoop.hive.ql.io.BucketCodec#V1} for details.
+   * @return the bucket number parsed from the file path
+   */
+  public int getBucketId() {
+    return bucketId;
+  }
+
+  public void parse(Configuration conf) throws IOException {
+    OrcRawRecordMerger.TransactionMetaData tmd =
+        OrcRawRecordMerger.TransactionMetaData.findWriteIDForSynthetcRowIDs(getPath(), rootDir, conf);
+    writeId = tmd.syntheticWriteId;
+    stmtId = tmd.statementId;
+    AcidOutputFormat.Options opt = AcidUtils.parseBaseOrDeltaBucketFilename(getPath(), conf);
+    bucketId = opt.getBucketId();
+  }
 
   @Override
   public String toString() {
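
The new OrcSplit.parse() above derives writeId, stmtId and bucketId from the ACID directory and bucket file names of the split's path. A rough illustration of the naming convention it relies on, using the delta layout seen in the tests earlier (delta_<minWriteId>_<maxWriteId>_<stmtId>/bucket_<bucketId>); the regex here is only illustrative and is not the real AcidUtils parsing logic:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class AcidPathNamingSketch {
      // delta_<minWriteId>_<maxWriteId>_<stmtId>/bucket_<bucketId>[_<attempt>]
      private static final Pattern DELTA_BUCKET =
          Pattern.compile(".*/delta_(\\d+)_(\\d+)_(\\d+)/bucket_(\\d+)(_\\d+)?");

      public static void main(String[] args) {
        String path = "/warehouse/db/tbl/delta_0000002_0000002_0000/bucket_00001";
        Matcher m = DELTA_BUCKET.matcher(path);
        if (m.matches()) {
          long writeId = Long.parseLong(m.group(1));    // min write id of the delta
          int stmtId = Integer.parseInt(m.group(3));    // statement id suffix
          int bucketId = Integer.parseInt(m.group(4));  // physical bucket number
          System.out.println("writeId=" + writeId + " stmtId=" + stmtId + " bucketId=" + bucketId);
        }
      }
    }
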
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index db3b427..adfa431 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -2197,6 +2197,12 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     String type = unescapeSQLString(ast.getChild(0).getText()).toLowerCase();
 
+    if (type.equalsIgnoreCase("minor") && HiveConf.getBoolVar(conf, ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+      throw new SemanticException(
+          "Minor compaction is not currently supported for query based compaction (enabled by setting: "
+              + ConfVars.COMPACTOR_CRUD_QUERY_BASED + " to true).");
+    }
+
     if (!type.equals("minor") && !type.equals("major")) {
       throw new SemanticException(ErrorMsg.INVALID_COMPACTION_TYPE.getMsg());
     }
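
Taken together with the CompactorMR change below, the user-visible behavior when hive.compactor.crud.query.based is enabled is roughly the following, sketched in the same embedded-driver style as the tests in this patch (the table name is illustrative and the metastore/session setup is assumed):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.DriverFactory;
    import org.apache.hadoop.hive.ql.IDriver;
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class QueryBasedCompactionUsageSketch {
      public static void demo(HiveConf conf) {
        conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
        SessionState.start(conf);
        IDriver driver = DriverFactory.newDriver(conf);
        // Queues a major compaction for an (assumed, already created) full CRUD table;
        // the Worker later executes it as a HiveQL query via runCrudCompaction.
        CommandProcessorResponse major = driver.run("alter table acid_tbl compact 'major'");
        // A minor compaction request now fails semantic analysis, which the driver
        // surfaces as a non-zero response code.
        CommandProcessorResponse minor = driver.run("alter table acid_tbl compact 'minor'");
        System.out.println("major rc=" + major.getResponseCode() + ", minor rc=" + minor.getResponseCode());
      }
    }
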
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index dc05e19..cde47da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.DriverUtils;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -232,6 +235,23 @@ public class CompactorMR {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
+    
+    /**
+     * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method).
+     * TODO: 
+     * 1. A good way to run minor compaction (currently disabled when this config is enabled)
+     * 2. More generic approach to collecting files in the same logical bucket to compact within the same task
+     * (currently we're using Tez split grouping).
+     */
+    if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf,
+        ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+      if (ci.isMajorCompaction()) {
+        runCrudCompaction(conf, t, p, sd, writeIds, ci);
+        return;
+      } else {
+        throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
+      }
+    }
 
     if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
@@ -318,6 +338,85 @@ public class CompactorMR {
     su.gatherStats();
   }
 
+  /**
+   * Runs major compaction for a full CRUD table (or one of its partitions) as a HiveQL query.
+   * @param hiveConf hive configuration used to set up the compaction session
+   * @param t metastore table being compacted
+   * @param p metastore partition being compacted, or null for an unpartitioned table
+   * @param sd the resolved StorageDescriptor, i.e. resolved to the table or partition
+   * @param writeIds valid write ids used to filter rows while they're being read for compaction
+   * @param ci compaction request metadata
+   * @throws IOException
+   */
+  private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
+      CompactionInfo ci) throws IOException {
+    AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+        t.getParameters());
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
+      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
+          .getBaseDirectory(), deltaCount, origCount);
+      return;
+    }
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, false);
+    // Set up the session for driver.
+    HiveConf conf = new HiveConf(hiveConf);
+    conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+    /**
+     * For now, we group splits on Tez so that all bucket files with the same
+     * bucket number end up in one map task.
+     */
+    conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+    String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_";
+    String tmpTableName = tmpPrefix + System.currentTimeMillis();
+    long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
+    try {
+      // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
+      String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd);
+      LOG.info("Running major compaction query into temp table with create definition: {}", query);
+      try {
+        DriverUtils.runOnDriver(conf, user, sessionState, query);
+      } catch (Exception ex) {
+        Throwable cause = ex;
+        while (cause != null && !(cause instanceof AlreadyExistsException)) {
+          cause = cause.getCause();
+        }
+        if (cause == null) {
+          throw new IOException(ex);
+        }
+      }
+      query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName);
+      LOG.info("Running major compaction via query: {}", query);
+      /**
+       * This will create bucket files like:
+       * db/db_tmp_compactor_tbl_1234/00000_0
+       * db/db_tmp_compactor_tbl_1234/00001_0
+       */
+      DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+      /**
+       * This achieves a final layout like (wid is the highest valid write id for this major compaction):
+       * db/tbl/ptn/base_wid/bucket_00000
+       * db/tbl/ptn/base_wid/bucket_00001
+       */
+      org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+      String tmpLocation = tempTable.getSd().getLocation();
+      commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId);
+    } catch (HiveException e) {
+      LOG.error("Error doing query based major compaction", e);
+      throw new IOException(e);
+    } finally {
+      try {
+        DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
+      } catch (HiveException e) {
+        LOG.error("Unable to drop temp table {} which was created for running major compaction", tmpTableName);
+        LOG.error(ExceptionUtils.getStackTrace(e));
+      }
+    }
+  }
+
   private void runMmCompaction(HiveConf conf, Table t, Partition p,
       StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
@@ -376,8 +475,7 @@ public class CompactorMR {
           }
         }
       }
-
-      String query = buildMmCompactionQuery(driverConf, t, p, tmpTableName);
+      String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
       LOG.info("Compacting a MM table via " + query);
       long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
       DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
@@ -393,6 +491,103 @@ public class CompactorMR {
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
+  
+  /**
+   * Note on ordering of rows in the temp table:
+   * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending).
+   * (The current write id will be the same as the original write id.)
+   * We achieve this ordering via a custom split grouper for the compactor.
+   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars.SPLIT_GROUPING_MODE} for the config description.
+   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups} for details on the mechanism.
+   */
+  private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) {
+    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
+    // Acid virtual columns
+    query.append(
+        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<");
+    List<FieldSchema> cols = t.getSd().getCols();
+    boolean isFirst = true;
+    // Actual columns
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+    }
+    query.append(">)");
+    query.append(" stored as orc");
+    query.append(" tblproperties ('transactional'='false')");
+    return query.toString();
+  }
+
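
For illustration, given a hypothetical table default.t1 with columns (a int, b string), the statement built
here comes out roughly as below (table name and timestamp are made up; line breaks added for readability):

    create temporary table default_tmp_compactor_t1_1549000000000 (
      `operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
      `currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)
    stored as orc tblproperties ('transactional'='false')
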
+  private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
+    String fullName = t.getDbName() + "." + t.getTableName();
+    String query = "insert into table " + tmpName + " ";
+    String filter = "";
+    if (p != null) {
+      filter = filter + " where ";
+      List<String> vals = p.getValues();
+      List<FieldSchema> keys = t.getPartitionKeys();
+      assert keys.size() == vals.size();
+      for (int i = 0; i < keys.size(); ++i) {
+        filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
+      }
+    }
+    query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
+        + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(";
+    List<FieldSchema> cols = t.getSd().getCols();
+    for (int i = 0; i < cols.size(); ++i) {
+      query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName();
+    }
+    query += ") from " + fullName + filter;
+    return query;
+  }
+
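
Continuing the same hypothetical default.t1 (a int, b string) with a partition column p, the insert built here
reads roughly as below; the validate_acid_sort_order(...) result fills the `operation` column and fails the
query if rows arrive out of Acid sort order, while the remaining projections fill the Acid virtual columns and
the `row` struct:

    insert into table default_tmp_compactor_t1_1549000000000
    select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId),
           ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId,
           NAMED_STRUCT('a', a, 'b', b)
    from default.t1 where `p`='1'
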
+  /**
+   * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
+   * Since the temp table is a non-transactional table, it has file names in the "original" format.
+   * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}, 
+   * we will end up with one file per bucket.
+   */
+  private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
+    Path fromPath = new Path(from);
+    Path toPath = new Path(to);
+    Path tmpTablePath = new Path(fromPath, tmpTableName);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    // Assume the write-id high watermark can be used as the maximum write ID.
+    long maxTxn = actualWriteIds.getHighWatermark();
+    // Get a base_wid path which will be the new compacted base
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
+        .maximumWriteId(maxTxn).bucket(0).statementId(-1);
+    Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info("{} not found.  Assuming 0 splits. Creating {}", from, newBaseDir);
+      fs.mkdirs(newBaseDir);
+      return;
+    }
+    LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+    /**
+     * Currently maps the file named 0000_0 to bucket_00000, 0000_1 to bucket_00001, and so on.
+     * TODO/ToThink:
+     * Q. Can the file named 0000_0 under the temp table be deterministically renamed to bucket_00000 in the destination?
+     */
+    //    List<String> buckCols = t.getSd().getBucketCols();
+    FileStatus[] children = fs.listStatus(fromPath);
+    for (FileStatus filestatus : children) {
+      String originalFileName = filestatus.getPath().getName();
+      // This if() may not be required, since the non-transactional temp table should only contain original-format file names.
+      if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+        int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+        options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+            .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
+        Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+        fs.rename(filestatus.getPath(), finalBucketFile);
+      }
+    }
+    fs.delete(fromPath, true);
+  }
 
   private String buildMmCompactionCtQuery(
       String fullName, Table t, StorageDescriptor sd, String location) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a0df82c..a37c983 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -96,6 +96,12 @@ public class Initiator extends MetaStoreCompactorThread {
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
           for (CompactionInfo ci : potentials) {
+            // Disable minor compaction for query based compactor
+            if (!ci.isMajorCompaction() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+              LOG.debug("Not compacting: " + ci.getFullPartitionName()
+                  + ", as query based compaction currently does not " + "support minor compactions.");
+              continue;
+            }
             LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
             try {
               Table t = resolveTable(ci);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
new file mode 100644
index 0000000..757a366
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFValidateAcidSortOrder.java
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFValidateAcidSortOrder.
+ */
+@Description(name = "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId)", 
+  value = "_FUNC_(writeId, bucketId, rowId) - returns 0 if the current row is in the right acid sort order "
+    + "compared to the previous row")
+public class GenericUDFValidateAcidSortOrder extends GenericUDF {
+  public static final String UDF_NAME = "validate_acid_sort_order";
+  private transient PrimitiveCategory[] inputTypes = new PrimitiveCategory[3];
+  private transient Converter[] converters = new Converter[3];
+  private final LongWritable output = new LongWritable();
+  // See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}
+  // Each writer is handling only one logical bucket (i.e. all files with same bucket number end up in one writer)
+  private int bucketNumForWriter = -1;
+  private WriteIdRowId previousWriteIdRowId = null;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    checkArgsSize(arguments, 3, 3);
+    checkArgPrimitive(arguments, 0);
+    checkArgPrimitive(arguments, 1);
+    checkArgPrimitive(arguments, 2);
+    obtainLongConverter(arguments, 0, inputTypes, converters);
+    obtainIntConverter(arguments, 1, inputTypes, converters);
+    obtainLongConverter(arguments, 2, inputTypes, converters);
+    ObjectInspector outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+    return outputOI;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    long writeId = getLongValue(arguments, 0, converters);
+    int bucketProperty = getIntValue(arguments, 1, converters);
+    int bucketNum = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+    long rowId = getLongValue(arguments, 2, converters);
+    if (bucketNumForWriter < 0) {
+      bucketNumForWriter = bucketNum;
+    } else {
+      if (bucketNumForWriter != bucketNum) {
+        throw new HiveException("One writer is supposed to handle only one bucket. We saw these 2 different buckets: "
+            + bucketNumForWriter + " and " + bucketNum);
+      }
+    }
+    WriteIdRowId current = new WriteIdRowId(bucketProperty, writeId, rowId);
+    if (previousWriteIdRowId != null) {
+      // Verify sort order for this new row
+      if (current.compareTo(previousWriteIdRowId) <= 0) {
+        throw new HiveException("Wrong sort order of Acid rows detected for the rows: " + previousWriteIdRowId + " and "
+            + current);
+      }
+    }
+    previousWriteIdRowId = current;
+    output.set(0);
+    return output;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("validate_acid_sort_order", children);
+  }
+
+  final static class WriteIdRowId implements Comparable<WriteIdRowId> {
+    final int bucketProperty;
+    final long writeId;
+    final long rowId;
+
+    WriteIdRowId(int bucketProperty, long writeId, long rowId) {
+      this.bucketProperty = bucketProperty;
+      this.writeId = writeId;
+      this.rowId = rowId;
+    }
+
+    @Override
+    public int compareTo(WriteIdRowId other) {
+      if (this.bucketProperty != other.bucketProperty) {
+        return this.bucketProperty < other.bucketProperty ? -1 : 1;
+      }
+      if (this.writeId != other.writeId) {
+        return this.writeId < other.writeId ? -1 : 1;
+      }
+      if (this.rowId != other.rowId) {
+        return this.rowId < other.rowId ? -1 : 1;
+      }
+      return 0;
+    }
+  }
+}
\ No newline at end of file
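
The UDF is registered as a built-in, so it shows up in SHOW FUNCTIONS as the .q.out change below reflects. A
quick way to see its signature is sketched here; note the evaluator is stateful per writer (one logical bucket
per task), so it is meant to be invoked from the compactor's insert query rather than from ad hoc queries over
multi-bucket data:

    DESCRIBE FUNCTION validate_acid_sort_order;
    -- prints, roughly: validate_acid_sort_order(writeId, bucketId, rowId) - returns 0 if the current
    -- row is in the right acid sort order compared to the previous row
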
diff --git a/ql/src/test/results/clientpositive/show_functions.q.out b/ql/src/test/results/clientpositive/show_functions.q.out
index c9716e9..1d2cb1c 100644
--- a/ql/src/test/results/clientpositive/show_functions.q.out
+++ b/ql/src/test/results/clientpositive/show_functions.q.out
@@ -279,6 +279,7 @@ unhex
 unix_timestamp
 upper
 uuid
+validate_acid_sort_order
 var_pop
 var_samp
 variance
@@ -401,6 +402,7 @@ date_format
 date_sub
 datediff
 to_date
+validate_acid_sort_order
 PREHOOK: query: SHOW FUNCTIONS '***'
 PREHOOK: type: SHOWFUNCTIONS
 POSTHOOK: query: SHOW FUNCTIONS '***'