Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/26 03:20:39 UTC

[3/3] hive git commit: HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)

HIVE-17205 - add functional support for unbucketed tables (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6be50b76
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6be50b76
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6be50b76

Branch: refs/heads/master
Commit: 6be50b76be5956b3c52ed6024fd7b4a3dee65fb6
Parents: 262d8f9
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Aug 25 20:14:57 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Aug 25 20:15:26 2017 -0700

----------------------------------------------------------------------
 .../streaming/AbstractRecordWriter.java         |   44 +-
 .../hive/hcatalog/streaming/HiveEndPoint.java   |    4 +-
 .../apache/hive/hcatalog/streaming/package.html |   21 +-
 .../hive/hcatalog/streaming/TestStreaming.java  |   86 +-
 .../hive/metastore/TestHiveMetaStore.java       |    8 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  367 +++-
 .../hive/ql/TestAcidOnTezWithSplitUpdate.java   |   28 -
 .../test/resources/testconfiguration.properties |    4 +-
 .../TransactionalValidationListener.java        |   42 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |    4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   22 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   60 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  225 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |   14 +-
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   10 +-
 .../io/orc/VectorizedOrcAcidRowBatchReader.java |   26 +-
 .../optimizer/SortedDynPartitionOptimizer.java  |    1 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    6 -
 .../hive/ql/txn/compactor/CompactorMR.java      |    3 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  108 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   12 +-
 .../hadoop/hive/ql/TestTxnCommandsBase.java     |  162 ++
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  297 +++
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   28 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   86 +-
 .../queries/clientnegative/create_not_acid.q    |    2 +-
 .../queries/clientpositive/acid_no_buckets.q    |  210 ++
 .../clientnegative/create_not_acid.q.out        |    4 +-
 .../clientnegative/delete_non_acid_table.q.out  |    2 +-
 .../clientnegative/update_non_acid_table.q.out  |    2 +-
 .../clientpositive/llap/acid_no_buckets.q.out   | 1976 ++++++++++++++++++
 31 files changed, 3534 insertions(+), 330 deletions(-)
----------------------------------------------------------------------
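
For context before the per-file diffs: a minimal sketch of streaming rows into an unbucketed transactional table, which is what this patch makes possible. It mirrors the calls exercised by the new TestStreaming#testNoBuckets below; the metastore URI, agent name and DDL comment are placeholders for illustration, not part of the patch.

  import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
  import org.apache.hive.hcatalog.streaming.HiveEndPoint;
  import org.apache.hive.hcatalog.streaming.StreamingConnection;
  import org.apache.hive.hcatalog.streaming.TransactionBatch;

  public class UnbucketedStreamingSketch {
    public static void main(String[] args) throws Exception {
      // target table (note: no CLUSTERED BY clause):
      //   create table default.streamingnobuckets (a string, b string) stored as orc
      //   TBLPROPERTIES('transactional'='true')
      HiveEndPoint endPt =
          new HiveEndPoint("thrift://localhost:9083", "default", "streamingnobuckets", null);
      StreamingConnection conn = endPt.newConnection(false, "sketch-agent");
      DelimitedInputWriter writer =
          new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt, conn);
      TransactionBatch batch = conn.fetchTransactionBatch(2, writer);
      batch.beginNextTransaction();
      batch.write("a1,b2".getBytes());   // all rows land in bucket_00000 of the delta dir
      batch.commit();
      batch.close();
      conn.close();
    }
  }
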


http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index e409e75..4ec10ad 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -54,19 +55,23 @@ import java.util.Properties;
 public abstract class AbstractRecordWriter implements RecordWriter {
   static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
 
-  final HiveConf conf;
-  final HiveEndPoint endPoint;
+  private final HiveConf conf;
+  private final HiveEndPoint endPoint;
   final Table tbl;
 
-  final IMetaStoreClient msClient;
-  protected final List<Integer> bucketIds;
-  ArrayList<RecordUpdater> updaters = null;
+  private final IMetaStoreClient msClient;
+  final List<Integer> bucketIds;
+  private ArrayList<RecordUpdater> updaters = null;
 
-  public final int totalBuckets;
+  private final int totalBuckets;
+  /**
+   * Indicates whether target table is bucketed
+   */
+  private final boolean isBucketed;
 
   private final Path partitionPath;
 
-  final AcidOutputFormat<?,?> outf;
+  private final AcidOutputFormat<?,?> outf;
   private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
   private Long curBatchMinTxnId;
   private Long curBatchMaxTxnId;
@@ -109,16 +114,22 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         this.tbl = twp.tbl;
         this.partitionPath = twp.partitionPath;
       }
-      this.totalBuckets = tbl.getSd().getNumBuckets();
-      if (totalBuckets <= 0) {
-        throw new StreamingException("Cannot stream to table that has not been bucketed : "
-          + endPoint);
+      this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+      /**
+   *  For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter, which
+   *  ends up writing to a single file, bucket_00000.
+       * See also {@link #getBucket(Object)}
+       */
+      this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+      if(isBucketed) {
+        this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+        this.bucketFieldData = new Object[bucketIds.size()];
+      }
+      else {
+        bucketIds = Collections.emptyList();
       }
-      this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
-      this.bucketFieldData = new Object[bucketIds.size()];
       String outFormatName = this.tbl.getSd().getOutputFormat();
       outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
-      bucketFieldData = new Object[bucketIds.size()];
     } catch(InterruptedException e) {
       throw new StreamingException(endPoint2.toString(), e);
     } catch (MetaException | NoSuchObjectException e) {
@@ -169,6 +180,9 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   // returns the bucket number to which the record belongs to
   protected int getBucket(Object row) throws SerializationError {
+    if(!isBucketed) {
+      return 0;
+    }
     ObjectInspector[] inspectors = getBucketObjectInspectors();
     Object[] bucketFields = getBucketFields(row);
     return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
@@ -204,7 +218,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     curBatchMaxTxnId = maxTxnID;
     updaters = new ArrayList<RecordUpdater>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
-      updaters.add(bucket, null);
+      updaters.add(bucket, null);//so that get(i) returns null rather than throwing IndexOutOfBoundsException
     }
   }
 

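Gathered from the AbstractRecordWriter hunks above, the routing rule for unbucketed tables can be summarized in a small self-contained sketch. This is an illustration, not a drop-in class; the hash/modulo step is the standard Hive bucketing rule (hashCode & Integer.MAX_VALUE % numBuckets), simplified here to take a precomputed hash.

  /** Illustrates the writer-side routing after this patch; not part of the patch itself. */
  class BucketRouting {
    private final boolean isBucketed;
    private final int totalBuckets;

    BucketRouting(int numBucketsFromTableSd) {
      this.isBucketed = numBucketsFromTableSd > 0;
      // an unbucketed table is treated as having exactly one logical bucket
      this.totalBuckets = isBucketed ? numBucketsFromTableSd : 1;
    }

    /** Mirrors AbstractRecordWriter#getBucket: unbucketed tables always write bucket 0. */
    int bucketFor(int hashOfBucketColumns) {
      if (!isBucketed) {
        return 0;   // single RecordUpdater, single bucket_00000 file per delta directory
      }
      return (hashOfBucketColumns & Integer.MAX_VALUE) % totalBuckets;
    }
  }
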
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 81f6155..28c98bd 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -338,7 +340,7 @@ public class HiveEndPoint {
       // 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
       Map<String, String> params = t.getParameters();
       if (params != null) {
-        String transactionalProp = params.get("transactional");
+        String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
         if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
           LOG.error("'transactional' property is not set on Table " + endPoint);
           throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
index ed4d307..a879b97 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
@@ -30,7 +30,7 @@ partition. Once data is committed it becomes immediately visible to
 all Hive queries initiated subsequently.</p>
 
 <p>
-This API is intended for streaming clients such as Flume and Storm,
+This API is intended for streaming clients such as NiFi, Flume and Storm,
 which continuously generate data. Streaming support is built on top of
 ACID based insert/update support in Hive.</p>
 
@@ -56,10 +56,7 @@ A few things are currently required to use streaming.
 <ol>
   <li> Currently, only ORC storage format is supported. So 
     '<b>stored as orc</b>' must be specified during table creation.</li>
-  <li> The hive table must be bucketed, but not sorted. So something like 
-    '<b>clustered by (<i>colName</i>) into <i>10</i> buckets</b>' must 
-    be specified during table creation. The number of buckets
-    is ideally the same as the number of streaming writers.</li>
+  <li> The hive table may be bucketed but must not be sorted. </li>
   <li> User of the client streaming process must have the necessary 
     permissions to write to the table or partition and create partitions in
     the table.</li>
@@ -67,7 +64,6 @@ A few things are currently required to use streaming.
     <ol>
       <li><b>hive.input.format =
              org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
-      <li><b>hive.vectorized.execution.enabled = false</b></li>
     </ol></li>
   The above client settings are a temporary requirement and the intention is to
   drop the need for them in the near future.
@@ -165,8 +161,21 @@ additional implementations of the <b>RecordWriter</b> interface.
 - Delimited text input.</li>
 <li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> 
 - JSON text input.</li>
+  <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+    - text input with regex.</li>
 </ul></p>
 
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+  Each StreamingConnection writes data at the rate the underlying
+  FileSystem can accept it.  If that is not sufficient, multiple StreamingConnection objects can
+  be created concurrently.
+</p>
+<p>
+  Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+  may have at most 2 threads operating on it.
+  See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>
+</p>
 </body>
 
 </html>

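The new "Performance, Concurrency, Etc." section boils down to one connection (and one outstanding TransactionBatch) per writer thread. A minimal sketch of that pattern; the metastore URI, table name "alerts", thread count and payload are placeholders.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
  import org.apache.hive.hcatalog.streaming.HiveEndPoint;
  import org.apache.hive.hcatalog.streaming.StreamingConnection;
  import org.apache.hive.hcatalog.streaming.TransactionBatch;

  public class ConcurrentWritersSketch {
    public static void main(String[] args) throws Exception {
      HiveEndPoint endPt = new HiveEndPoint("thrift://localhost:9083", "default", "alerts", null);
      ExecutorService pool = Executors.newFixedThreadPool(4);
      for (int i = 0; i < 4; i++) {
        final String agent = "writer-" + i;
        pool.submit(() -> {
          // each thread owns its own connection, writer and single outstanding TransactionBatch
          StreamingConnection conn = endPt.newConnection(false, agent);
          DelimitedInputWriter w = new DelimitedInputWriter(new String[]{"a", "b"}, ",", endPt, conn);
          TransactionBatch batch = conn.fetchTransactionBatch(10, w);
          try {
            batch.beginNextTransaction();
            batch.write((agent + ",payload").getBytes());
            batch.commit();
          } finally {
            batch.close();
            conn.close();
          }
          return null;   // Callable<Void>, so checked exceptions propagate to the Future
        });
      }
      pool.shutdown();
    }
  }
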
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index f3ef92b..49520ef 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -193,7 +197,6 @@ public class TestStreaming {
 
     conf = new HiveConf(this.getClass());
     conf.set("fs.raw.impl", RawFileSystem.class.getName());
-    conf.set("hive.enforce.bucketing", "true");
     conf
     .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
@@ -339,6 +342,83 @@ public class TestStreaming {
     }
   }
 
+  /**
+   * Test that streaming can write to unbucketed table.
+   */
+  @Test
+  public void testNoBuckets() throws Exception {
+    queryTable(driver, "drop table if exists default.streamingnobuckets");
+    //todo: why does it need transactional_properties?
+    queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+    queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+    List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    String[] colNames1 = new String[] { "a", "b" };
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a1,b2".getBytes());
+    txnBatch.write("a3,b4".getBytes());
+    txnBatch.commit();
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a5,b6".getBytes());
+    txnBatch.write("a7,b8".getBytes());
+    txnBatch.commit();
+    txnBatch.close();
+
+    Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000016_0000016_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000018_0000019/bucket_00000"));
+
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+    int row = 0;
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+    queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+    runWorker(conf);
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":16,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000022/bucket_00000"));
+  }
+
+  /**
+   * This is a clone of the runWorker() helper in TestTxnCommands2.
+   */
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
 
   // stream data into streaming table with N buckets, then copy the data into another bucketed table
   // check if bucketing in both was done in the same way
@@ -453,8 +533,8 @@ public class TestStreaming {
   }
 
   /**
-   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} - there is
-   * little value in using InputFormat directly
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+   * there is little value in using InputFormat directly
    */
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {

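The constant 536870912 asserted above is the bucket property stored in ROW__ID ("bucketid":536870912); for an unbucketed table it encodes writer id 0. A small decoding sketch using the same BucketCodec calls as the test:

  import org.apache.hadoop.hive.ql.io.BucketCodec;

  public class DecodeBucketProperty {
    public static void main(String[] args) {
      int bucketProperty = 536870912;   // value seen in ROW__ID.bucketid in the assertions above
      BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
      // decodes to 0: all rows written by the single writer end up in bucket_00000
      System.out.println("writerId = " + codec.decodeWriterId(bucketProperty));
    }
  }
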
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 8bd23cc..50e5274 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -3018,7 +3018,7 @@ public abstract class TestHiveMetaStore extends TestCase {
       Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Fail - "transactional" is set to true, and the table is bucketed, but doesn't use ORC
@@ -3031,7 +3031,7 @@ public abstract class TestHiveMetaStore extends TestCase {
       Table t = createTable(dbName, tblName, owner, params, null, sd, 0);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Succeed - "transactional" is set to true, and the table is bucketed, and uses ORC
@@ -3064,13 +3064,14 @@ public abstract class TestHiveMetaStore extends TestCase {
       tblName += "1";
       params.clear();
       sd.unsetBucketCols();
+      sd.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
       t = createTable(dbName, tblName, owner, params, null, sd, 0);
       params.put("transactional", "true");
       t.setParameters(params);
       client.alter_table(dbName, tblName, t);
       Assert.assertTrue("Expected exception", false);
     } catch (MetaException e) {
-      Assert.assertEquals("The table must be bucketed and stored using an ACID compliant format (such as ORC)", e.getMessage());
+      Assert.assertEquals("The table must be stored using an ACID compliant format (such as ORC)", e.getMessage());
     }
 
     // Succeed - trying to set "transactional" to "true", and satisfies bucketing and Input/OutputFormat requirement
@@ -3078,6 +3079,7 @@ public abstract class TestHiveMetaStore extends TestCase {
     params.clear();
     sd.setNumBuckets(1);
     sd.setBucketCols(bucketCols);
+    sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
     t = createTable(dbName, tblName, owner, params, null, sd, 0);
     params.put("transactional", "true");
     t.setParameters(params);

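As the updated assertions show, the metastore now accepts a transactional table whose StorageDescriptor has no bucket columns, provided the I/O formats are ACID-capable. A sketch of the relevant StorageDescriptor/parameter setup; the surrounding table and client creation (done by the test's createTable helper, including columns and serde info) is omitted.

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

  public class UnbucketedAcidSdSketch {
    static StorageDescriptor acidSd() {
      StorageDescriptor sd = new StorageDescriptor();
      sd.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
      sd.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
      // no sd.setNumBuckets(...) / sd.setBucketCols(...) required any more
      return sd;
    }

    static Map<String, String> acidParams() {
      Map<String, String> params = new HashMap<>();
      params.put("transactional", "true");   // hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
      return params;
    }
  }
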
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 2bf9871..d0b5cf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -22,7 +22,10 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -30,23 +33,26 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class resides in itests to facilitate running query using Tez engine, since the jars are
  * fully loaded here, which is not the case if it stays in ql.
  */
 public class TestAcidOnTez {
+  static final private Logger LOG = LoggerFactory.getLogger(TestAcidOnTez.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
       File.separator + TestAcidOnTez.class.getCanonicalName()
       + "-" + System.currentTimeMillis()
@@ -61,8 +67,10 @@ public class TestAcidOnTez {
   private static enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
+    ACIDNOBUCKET("acidNoBucket"),
     NONACIDORCTBL("nonAcidOrcTbl"),
-    NONACIDPART("nonAcidPart");
+    NONACIDPART("nonAcidPart"),
+    NONACIDNONBUCKET("nonAcidNonBucket");
 
     private final String name;
     @Override
@@ -159,6 +167,359 @@ public class TestAcidOnTez {
     testJoin("tez", "MapJoin");
   }
 
+  /**
+   * Tests non-acid to acid conversion where the starting table has a non-standard layout, i.e.
+   * where "original" files are not immediate children of the partition dir.
+   */
+  @Test
+  public void testNonStandardConversion01() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    setupTez(confForTez);
+    //CTAS with non-ACID target table
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME");
+    String expected0[][] = {
+      {"1\t2", "/1/000000_0"},
+      {"3\t4", "/1/000000_0"},
+      {"5\t6", "/1/000000_0"},
+      {"5\t6", "/2/000000_0"},
+      {"7\t8", "/2/000000_0"},
+      {"9\t10", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected0.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+    }
+    //make the table ACID
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')");
+
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after ctas:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    /*
+    * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction. */
+    String expected[][] = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //perform some Update/Delete
+    runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b  = 80 where a = 7");
+    runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after update/delete:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //now make sure delete deltas are present
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta.length; i++) {
+        if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+          expectedDelDelta[i] = null;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+    }
+    //run Minor compaction
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after compact minor:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify the data is the same
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+      //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //check we have right delete delta files after minor compaction
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta2.length; i++) {
+        if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+          expectedDelDelta2[i] = null;
+          break;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta2.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+    }
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID");
+    LOG.warn("after compact major:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //everything is now in base/
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000"));
+    }
+  }
+  /**
+   * Tests non-acid to acid conversion where the starting table has a non-standard layout, i.e.
+   * where "original" files are not immediate children of the partition dir - partitioned table.
+   *
+   * How to do this?  CTAS is the only way to create data files which are not immediate children
+   * of the partition dir.  CTAS/Union/Tez doesn't support partitioned tables.  The only way is to copy
+   * data files in directly.
+   */
+  @Ignore("HIVE-17214")
+  @Test
+  public void testNonStandardConversion02() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    confForTez.setBoolean("mapred.input.dir.recursive", true);
+    setupTez(confForTez);
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC " +
+      "TBLPROPERTIES('transactional'='false') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 3 union all " +
+      "select a, b from " + Table.NONACIDORCTBL + " where a >= 7 " +
+      "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez);
+
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " +
+      Table.NONACIDNONBUCKET + " order by a, b");
+    String expected0[][] = {
+      {"1\t2", "/1/000000_0"},
+      {"3\t4", "/1/000000_0"},
+      {"5\t6", "/3/000000_0"},
+      {"7\t8", "/2/000000_0"},
+      {"9\t10", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected0.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected0[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1]));
+    }
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    //ensure there is partition dir
+    runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)");
+    //creates more files in that partition
+    for(FileStatus stat : status) {
+      int limit = 5;
+      Path p = stat.getPath();//dirs 1/, 2/, 3/
+      Path to = new Path(TEST_WAREHOUSE_DIR + "/" +  Table.NONACIDPART+ "/p=1/" + p.getName());
+      while(limit-- > 0 && !fs.rename(p, to)) {
+        Thread.sleep(200);
+      }
+      if(limit <= 0) {
+        throw new IllegalStateException("Could not rename " + p + " to " + to);
+      }
+    }
+    /*
+    This is what we expect on disk
+    ekoifman:warehouse ekoifman$ tree nonacidpart/
+    nonacidpart/
+    └── p=1
+    ├── 000000_0
+    ├── 1
+    │   └── 000000_0
+    ├── 2
+    │   └── 000000_0
+    └── 3
+        └── 000000_0
+
+4 directories, 4 files
+    **/
+    //make the table ACID
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')");
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+    LOG.warn("after acid conversion:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"}
+    };
+    Assert.assertEquals("Wrong row count", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID");
+    LOG.warn("after major compaction:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " +
+        rs.get(i), rs.get(i).endsWith("nonacidpart/p=1/base_-9223372036854775808/bucket_00000"));
+    }
+
+  }
+  /**
+   * CTAS + Tez + Union creates a non-standard layout in table dir
+   * Each leg of the union places data into a subdir of the table/partition.  Subdirs are named 1/, 2/, etc.
+   * The way this currently works is that CTAS creates an Acid table but the insert statement writes
+   * the data in non-acid layout.  Then on read, it's treated like a non-acid to acid conversion.
+   * Longer term CTAS should create acid layout from the get-go.
+   */
+  @Test
+  public void testCtasTezUnion() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf
+    setupTez(confForTez);
+    //CTAS with ACID target table
+    runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " +
+      "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after ctas:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    /*
+    * Expected result: 0th entry is the RecordIdentifier + data; 1st entry is the file before compaction. */
+    String expected[][] = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"},
+    };
+    Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+    //perform some Update/Delete
+    runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b  = 80 where a = 7");
+    runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after update/delete:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //now make sure delete deltas are present
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta.length; i++) {
+        if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) {
+          expectedDelDelta[i] = null;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]);
+    }
+    //run Minor compaction
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after compact minor:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
+    //verify the data is the same
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //todo: since HIVE-16669 is not done, Minor compact compacts insert delta as well - it should not
+      //Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected2[i][1]));
+    }
+    //check we have right delete delta files after minor compaction
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"};
+    for(FileStatus stat : status) {
+      for(int i = 0; i < expectedDelDelta2.length; i++) {
+        if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
+          expectedDelDelta2[i] = null;
+          break;
+        }
+      }
+    }
+    for(int i = 0; i < expectedDelDelta2.length; i++) {
+      Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]);
+    }
+    //run Major compaction
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    LOG.warn("after compact major:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0]));
+      //everything is now in base/
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000"));
+    }
+  }
   // Ideally test like this should be a qfile test. However, the explain output from qfile is always
   // slightly different depending on where the test is run, specifically due to file size estimation
   private void testJoin(String engine, String joinType) throws Exception {

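Both new tests trigger compaction synchronously rather than waiting for the metastore background threads: they queue a request via ALTER TABLE ... COMPACT and then run one pass of the compactor Worker in-process. Condensed from the test code above (runStatementOnDriver and hiveConf come from the test harness; the Worker steps are the same ones TestStreaming.runWorker() and TestTxnCommands2.runWorker() perform):

  // queue a compaction request for the unbucketed table
  runStatementOnDriver("alter table acidNoBucket compact 'major'");

  // run a single compactor Worker pass in-process
  AtomicBoolean stop = new AtomicBoolean(true);   // true => exit after one loop
  Worker worker = new Worker();
  worker.setThreadId((int) worker.getId());
  worker.setHiveConf(hiveConf);
  worker.init(stop, new AtomicBoolean());
  worker.run();
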
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
deleted file mode 100644
index 3dacf08..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTezWithSplitUpdate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql;
-
-/**
- * Same as parent class but covers Acid 2.0 tables
- */
-public class TestAcidOnTezWithSplitUpdate extends TestAcidOnTez {
-  @Override
-  String getTblProperties() {
-    return "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 37a3757..fa6a2aa 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -456,7 +456,9 @@ minillap.query.files=acid_bucket_pruning.q,\
   llap_stats.q,\
   multi_count_distinct_null.q
 
-minillaplocal.query.files=acid_globallimit.q,\
+minillaplocal.query.files=\
+  acid_no_buckets.q, \
+  acid_globallimit.q,\
   acid_vectorization_missing_cols.q,\
   alter_merge_stats_orc.q,\
   auto_join30.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 023d703..3a3d184 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -98,14 +98,26 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
         // that will use it down below.
       }
     }
+    Table oldTable = context.getOldTable();
+    String oldTransactionalValue = null;
+    String oldTransactionalPropertiesValue = null;
+    for (String key : oldTable.getParameters().keySet()) {
+      if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
+        oldTransactionalValue = oldTable.getParameters().get(key);
+      }
+      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
+        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
+      }
+    }
+
     if (transactionalValuePresent) {
       //normalize prop name
       parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, transactionalValue);
     }
-    if ("true".equalsIgnoreCase(transactionalValue)) {
+    if ("true".equalsIgnoreCase(transactionalValue) && !"true".equalsIgnoreCase(oldTransactionalValue)) {
+      //only need to check conformance if alter table enabled acid
       if (!conformToAcid(newTable)) {
-        throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-            " format (such as ORC)");
+        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
       }
 
       if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -115,17 +127,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
       hasValidTransactionalValue = true;
     }
 
-    Table oldTable = context.getOldTable();
-    String oldTransactionalValue = null;
-    String oldTransactionalPropertiesValue = null;
-    for (String key : oldTable.getParameters().keySet()) {
-      if (hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.equalsIgnoreCase(key)) {
-        oldTransactionalValue = oldTable.getParameters().get(key);
-      }
-      if (hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES.equalsIgnoreCase(key)) {
-        oldTransactionalPropertiesValue = oldTable.getParameters().get(key);
-      }
-    }
 
 
     if (oldTransactionalValue == null ? transactionalValue == null
@@ -195,8 +196,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
 
     if ("true".equalsIgnoreCase(transactionalValue)) {
       if (!conformToAcid(newTable)) {
-        throw new MetaException("The table must be bucketed and stored using an ACID compliant" +
-            " format (such as ORC)");
+        throw new MetaException("The table must be stored using an ACID compliant format (such as ORC)");
       }
 
       if (newTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
@@ -214,14 +214,12 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
     throw new MetaException("'transactional' property of TBLPROPERTIES may only have value 'true'");
   }
 
-  // Check if table is bucketed and InputFormatClass/OutputFormatClass should implement
-  // AcidInputFormat/AcidOutputFormat
+  /**
+   * Check that InputFormatClass/OutputFormatClass implement
+   * AcidInputFormat/AcidOutputFormat.
+   */
   private boolean conformToAcid(Table table) throws MetaException {
     StorageDescriptor sd = table.getSd();
-    if (sd.getBucketColsSize() < 1) {
-      return false;
-    }
-
     try {
       Class inputFormatClass = Class.forName(sd.getInputFormat());
       Class outputFormatClass = Class.forName(sd.getOutputFormat());

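With the bucket-count check removed, conformToAcid() is left verifying only that the table's I/O formats are ACID-capable. A hedged sketch of what the surviving body of the method amounts to; the reflective, name-based class checks are shown schematically and the exact error handling is an assumption of this sketch, not taken from the patch.

  private boolean conformToAcid(Table table) throws MetaException {
    StorageDescriptor sd = table.getSd();
    try {
      Class<?> inputFormatClass = Class.forName(sd.getInputFormat());
      Class<?> outputFormatClass = Class.forName(sd.getOutputFormat());
      // bucketing is no longer a precondition; only an ACID-capable format (e.g. ORC) is required
      return Class.forName("org.apache.hadoop.hive.ql.io.AcidInputFormat")
                 .isAssignableFrom(inputFormatClass)
          && Class.forName("org.apache.hadoop.hive.ql.io.AcidOutputFormat")
                 .isAssignableFrom(outputFormatClass);
    } catch (ClassNotFoundException e) {
      // assumption for this sketch: unloadable formats are treated as non-conforming
      return false;
    }
  }
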
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9c9d4e7..b3ef916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -418,8 +418,8 @@ public enum ErrorMsg {
       " does not support these operations."),
   VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
       "Values clause with table constructor not yet supported"),
-  ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that does not use " +
-      "an AcidOutputFormat or is not bucketed", true),
+  ACID_OP_ON_NONACID_TABLE(10297, "Attempt to do update or delete on table {0} that is " +
+    "not transactional", true),
   ACID_NO_SORTED_BUCKETS(10298, "ACID insert, update, delete not supported on tables that are " +
       "sorted, table {0}", true),
   ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED(10299,

http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 25ad1e9..bc265eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
@@ -285,6 +287,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   private transient int numFiles;
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+  private transient boolean isBucketed = false;
 
   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -345,6 +348,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       isNativeTable = !conf.getTableInfo().isNonNative();
       isTemporary = conf.isTemporary();
       multiFileSpray = conf.isMultiFileSpray();
+      this.isBucketed = hconf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
       totalFiles = conf.getTotalFiles();
       numFiles = conf.getNumFiles();
       dpCtx = conf.getDynPartCtx();
@@ -791,9 +795,23 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           * Hive.copyFiles() will make one of them bucket_N_copy_M in the final location.  The
           * reset of acid (read path) doesn't know how to handle copy_N files except for 'original'
           * files (HIVE-16177)*/
+          int writerId = -1;
+          if(!isBucketed) {
+            assert !multiFileSpray;
+            assert writerOffset == 0;
+            /**For un-bucketed tables, Deletes with ROW__IDs with different 'bucketNum' values can
+            * be written to the same bucketN file.
+            * N in this case is writerId and there is no relationship
+            * between the file name and any property of the data in it.  Inserts will be written
+            * to the bucketN file such that all {@link RecordIdentifier#getBucketProperty()} values
+            * indeed contain writerId=N.
+            * Since taskId is unique (at least per statementId and thus
+            * per [delete_]delta_x_y_stmtId/) there will not be any copy_N files.*/
+            writerId = Integer.parseInt(Utilities.getTaskIdFromFilename(taskId));
+          }
           fpaths.updaters[writerOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
-            jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[writerOffset],
-            rowInspector, reporter, 0);
+            jc, conf.getTableInfo(), writerId >= 0 ? writerId : bucketNum, conf,
+            fpaths.outPaths[writerOffset], rowInspector, reporter, 0);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
               fpaths.outPaths[writerOffset]);

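The writerId logic in the FileSinkOperator hunk above boils down to reusing the numeric task id embedded in the output file name for unbucketed tables. A minimal standalone sketch of that idea (the class name and regex below are illustrative assumptions, not Hive's Utilities.getTaskIdFromFilename):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class WriterIdSketch {
      // assume file names of the form <taskId>_<attempt>[_copy_<n>], e.g. 000012_0
      private static final Pattern TASK_ID = Pattern.compile("^([0-9]+)_[0-9]+.*");

      static int writerIdFromFileName(String fileName) {
        Matcher m = TASK_ID.matcher(fileName);
        if (!m.matches()) {
          throw new IllegalArgumentException("Unexpected file name: " + fileName);
        }
        // for unbucketed tables the numeric task id doubles as the writer id
        return Integer.parseInt(m.group(1));
      }

      public static void main(String[] args) {
        System.out.println(writerIdFromFileName("000012_0"));        // 12
        System.out.println(writerIdFromFileName("000012_0_copy_1")); // 12
      }
    }
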
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 751fca8..69a9f9f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -964,6 +964,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean allowSyntheticFileIds;
     private final boolean isDefaultFs;
 
+    /**
+     * @param dir - root of partition dir
+     */
     public BISplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, List<DeltaMetaData> deltas,
         boolean[] covered, boolean allowSyntheticFileIds, boolean isDefaultFs) {
@@ -996,7 +999,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             }
             OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
                 entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1, logicalLen);
+                deltas, -1, logicalLen, dir);
             splits.add(orcSplit);
           }
         }
@@ -1017,18 +1020,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * ACID split strategy is used when there is no base directory (when transactions are enabled).
    */
   static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
-    private Path dir;
+    Path dir;
     private List<DeltaMetaData> deltas;
-    private boolean[] covered;
-    private int numBuckets;
     private AcidOperationalProperties acidOperationalProperties;
-
+    /**
+     * @param dir root of partition dir
+     */
     ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
         AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
-      this.numBuckets = numBuckets;
       this.deltas = deltas;
-      this.covered = covered;
       this.acidOperationalProperties = acidOperationalProperties;
     }
 
@@ -1234,6 +1235,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
     private SchemaEvolution evolution;
+    //this is the root of the partition in which the 'file' is located
+    private final Path rootDir;
 
     public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
         boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1250,6 +1253,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.isOriginal = splitInfo.isOriginal;
       this.deltas = splitInfo.deltas;
       this.hasBase = splitInfo.hasBase;
+      this.rootDir = splitInfo.dir;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
       this.allowSyntheticFileIds = allowSyntheticFileIds;
@@ -1361,7 +1365,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         fileKey = new SyntheticFileId(file);
       }
       return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
-          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen);
+          orcTail, isOriginal, hasBase, deltas, scaledProjSize, fileLen, rootDir);
     }
 
     private static final class OffsetAndLength { // Java cruft; pair of long.
@@ -1641,7 +1645,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
-    boolean isTransactionalTableScan =
-    boolean isTransactionalTableScan =
+    boolean isTransactionalTableScan = //this never seems to be set correctly
         HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
     boolean isSchemaEvolution = HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION);
     TypeDescription readerSchema =
@@ -1932,16 +1936,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     final OrcSplit split = (OrcSplit) inputSplit;
     final Path path = split.getPath();
-
     Path root;
     if (split.hasBase()) {
       if (split.isOriginal()) {
-        root = path.getParent();
+        root = split.getRootDir();
       } else {
         root = path.getParent().getParent();
+        assert root.equals(split.getRootDir()) : "root mismatch: baseDir=" + split.getRootDir() +
+          " path.p.p=" + root;
       }
-    } else {//here path is a delta/ but above it's a partition/
-      root = path;
+    } else {
+      throw new IllegalStateException("Split w/o base: " + path);
     }
 
     // Retrieve the acidOperationalProperties for the table, initialized in HiveInputFormat.
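
The root-directory change above relies on a layout assumption worth spelling out: an acid bucket file sits at <partitionRoot>/delta_x_y/bucket_N (two levels below the partition), while an 'original' file may be nested deeper (e.g. in Tez union subdirs), so the split has to carry the root explicitly. A small sketch of that assumption using java.nio.file; the paths are made up for illustration:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class RootDirSketch {
      static Path partitionRoot(Path file, boolean isOriginal, Path rootDirFromSplit) {
        if (isOriginal) {
          return rootDirFromSplit;             // original files may be nested, trust the split
        }
        return file.getParent().getParent();   // bucket_N -> delta_x_y -> partition root
      }

      public static void main(String[] args) {
        Path acid = Paths.get("/warehouse/t/p=1/delta_0000005_0000005_0000/bucket_00000");
        Path orig = Paths.get("/warehouse/t/p=1/1/000000_0");   // union leg subdir "1/"
        System.out.println(partitionRoot(acid, false, null));                          // /warehouse/t/p=1
        System.out.println(partitionRoot(orig, true, Paths.get("/warehouse/t/p=1")));  // /warehouse/t/p=1
      }
    }
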
@@ -2037,21 +2042,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     };
   }
-  private static Path findOriginalBucket(FileSystem fs,
-                                 Path directory,
-                                 int bucket) throws IOException {
-    for(FileStatus stat: fs.listStatus(directory)) {
-      if(stat.getLen() <= 0) {
-        continue;
-      }
-      AcidOutputFormat.Options bucketInfo =
-        AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
-      if(bucketInfo.getBucketId() == bucket) {
-        return stat.getPath();
-      }
-    }
-    throw new IllegalArgumentException("Can't find bucket " + bucket + " in " + directory);
-  }
 
   static Reader.Options createOptionsForReader(Configuration conf) {
     /**
@@ -2275,20 +2265,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                                            ) throws IOException {
     Reader reader = null;
     boolean isOriginal = false;
-    if (baseDirectory != null) {
-      Path bucketFile;
+    if (baseDirectory != null) {//this is NULL for minor compaction
+      Path bucketFile = null;
       if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
         bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
       } else {
+        /**we don't know which file to start reading -
+         * {@link OrcRawRecordMerger.OriginalReaderPairToCompact} does*/
         isOriginal = true;
-        bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
-            baseDirectory, bucket);
       }
-      reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+      if(bucketFile != null) {
+        reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+      }
     }
     OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options()
       .isCompacting(true)
-      .rootPath(baseDirectory);
+      .rootPath(baseDirectory).isMajorCompaction(baseDirectory != null);
     return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
         bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions);
   }

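The getRawReader() change above can be read as: minor compaction gets no base reader at all, major compaction over an acid base_N/ opens bucket_N directly, and major compaction over 'original' files defers file selection to OrcRawRecordMerger.OriginalReaderPairToCompact. A rough sketch of just that branching (the class and method names below are hypothetical, not Hive API):

    import java.nio.file.Path;
    import java.util.Optional;

    class CompactionReaderChoiceSketch {
      static Optional<Path> bucketFileToOpen(Path baseDirectory, int bucket) {
        if (baseDirectory == null) {
          return Optional.empty();   // minor compaction: only deltas, no base reader
        }
        if (baseDirectory.getFileName().toString().startsWith("base_")) {
          return Optional.of(baseDirectory.resolve(String.format("bucket_%05d", bucket)));
        }
        // 'original' layout: which file(s) to read is decided later, nothing is opened here
        return Optional.empty();
      }
    }
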
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 97c4e3d..cbbb4c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
@@ -29,7 +31,6 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   private final RecordIdentifier maxKey;
   // an extra value so that we can return it while reading ahead
   private OrcStruct extraValue;
-
   /**
    * A RecordIdentifier extended with the current transaction id. This is the
    * key of our merge sort with the originalTransaction, bucket, and rowId
@@ -294,9 +294,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * Running multiple Insert statements on the same partition (of non acid table) creates files
    * like so: 00000_0, 00000_0_copy1, 00000_0_copy2, etc.  So the OriginalReaderPair must treat all
    * of these files as part of a single logical bucket file.
+   *
+   * Also, for unbucketed (non acid) tables, there are no guarantees about where data files are placed.
+   * For example, CTAS+Tez+Union creates subdirs 1/, 2/, etc. for each leg of the Union.  Thus a
+   * data file need not be an immediate child of the partition dir.  All files for a given writerId are
+   * treated as one logical unit to assign {@link RecordIdentifier}s to them consistently.
    * 
    * For Compaction, where each split includes the whole bucket, this means reading over all the
    * files in order to assign ROW__ID.rowid in one sequence for the entire logical bucket.
+   * For unbucketed tables, a Compaction split is all files written by a given writerId.
    *
    * For a read after the table is marked transactional but before it's rewritten into a base/
    * by compaction, each of the original files may be split into many pieces.  For each split we
@@ -305,7 +311,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * split of the original file and used to filter rows from all the deltas.  The ROW__ID.rowid for
    * the rows of the 'original' file of course, must be assigned from the beginning of logical
    * bucket.  The last split of the logical bucket, i.e. the split that has the end of last file,
-   * should include all insert events from deltas.
+   * should include all insert events from deltas (last sentence is obsolete for Acid 2: HIVE-17320)
    */
   private static abstract class OriginalReaderPair implements ReaderPair {
     OrcStruct nextRecord;
@@ -407,18 +413,18 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       RecordIdentifier newMaxKey = maxKey;
       recordReader = reader.rowsOptions(options);
       /**
-       * Logically each bucket consists of 0000_0, 0000_0_copy_1... 0000_0_copyN. etc  We don't
-       * know N a priori so if this is true, then the current split is from 0000_0_copyN file.
+       * Logically each bucket consists of 0000_0, 0000_0_copy_1, ..., 0000_0_copy_N, etc.  We don't
+       * know N a priori, so if this is true, the current split is from a 0000_0_copy_N file.
        * It's needed to correctly set maxKey.  In particular, set maxKey==null if this split
        * is the tail of the last file for this logical bucket to include all deltas written after
-       * non-acid to acid table conversion.
+       * non-acid to acid table conversion (todo: HIVE-17320).
+       * Also, see comments at {@link OriginalReaderPair} about unbucketed tables.
        */
-      boolean isLastFileForThisBucket = false;
+      boolean isLastFileForThisBucket = true;
       boolean haveSeenCurrentFile = false;
       long rowIdOffsetTmp = 0;
-      if (mergerOptions.getCopyIndex() > 0) {
+      {
         //the split is from something other than the 1st file of the logical bucket - compute offset
-
         AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
           conf, validTxnList, false, true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
@@ -467,23 +473,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
           }
         }
-      } else {
-        rowIdOffset = 0;
-        isLastFileForThisBucket = true;
-        AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(),
-          conf, validTxnList, false, true);
-        int numFilesInBucket = 0;
-        for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
-          AcidOutputFormat.Options bucketOptions =
-            AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-          if (bucketOptions.getBucketId() == bucketId) {
-            numFilesInBucket++;
-            if (numFilesInBucket > 1) {
-              isLastFileForThisBucket = false;
-              break;
-            }
-          }
-        }
       }
       if (!isLastFileForThisBucket && maxKey == null) {
           /*
@@ -651,6 +640,12 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   }
   /**
    * Find the key range for original bucket files.
+   * For unbucketed tables the insert event data is still written to bucket_N file except that
+   * N is just a writer ID - it still matches {@link RecordIdentifier#getBucketProperty()}.  For
+   * 'original' files (unbucketed) the same applies.  A file 000000_0 encodes a taskId/writerId and
+   * at read time we synthesize {@link RecordIdentifier#getBucketProperty()} to match the file name,
+   * so the same bucketProperty is used here to create minKey/maxKey, i.e. these keys are valid
+   * to filter data from delete_delta files even for unbucketed tables.
    * @param reader the reader
    * @param bucket the bucket number we are reading
    * @param options the options for reading with
@@ -740,7 +735,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    */
   static Reader.Options createEventOptions(Reader.Options options) {
     Reader.Options result = options.clone();
-    //result.range(options.getOffset(), Long.MAX_VALUE);WTF?
     result.include(options.getInclude());
 
     // slide the column names down by 6 for the name array
@@ -755,11 +749,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     return result;
   }
 
+  /**
+   * {@link OrcRawRecordMerger} Acid reader is used slightly differently in various contexts.
+   * This makes the "context" explicit.
+   */
   static class Options {
     private int copyIndex = 0;
     private boolean isCompacting = false;
     private Path bucketPath;
     private Path rootPath;
+    private boolean isMajorCompaction = false;
+    private boolean isDeleteReader = false;
     Options copyIndex(int copyIndex) {
       assert copyIndex >= 0;
       this.copyIndex = copyIndex;
@@ -767,6 +767,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
     Options isCompacting(boolean isCompacting) {
       this.isCompacting = isCompacting;
+      assert !isDeleteReader;
       return this;
     }
     Options bucketPath(Path bucketPath) {
@@ -777,6 +778,16 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       this.rootPath = rootPath;
       return this;
     }
+    Options isMajorCompaction(boolean isMajor) {
+      this.isMajorCompaction = isMajor;
+      assert !isDeleteReader;
+      return this;
+    }
+    Options isDeleteReader(boolean isDeleteReader) {
+      this.isDeleteReader = isDeleteReader;
+      assert !isCompacting;
+      return this;
+    }
     /**
      * 0 means it's the original file, without {@link Utilities#COPY_KEYWORD} suffix
      */
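
The two new flags added to Options above are easiest to see as mutually exclusive "contexts" for the merger. A trimmed-down mimic (not the Hive class itself) of how they are expected to combine:

    class MergerContextSketch {
      private boolean compacting;
      private boolean major;
      private boolean deleteReader;

      MergerContextSketch compacting(boolean isMajor) {
        assert !deleteReader : "delete-reader and compaction are mutually exclusive";
        this.compacting = true;
        this.major = isMajor;
        return this;
      }
      MergerContextSketch deleteEventsOnly() {
        assert !compacting : "delete-reader and compaction are mutually exclusive";
        this.deleteReader = true;
        return this;
      }
      boolean isMajorCompaction() { return compacting && major; }
      boolean isMinorCompaction() { return compacting && !major; }
      boolean isDeleteReader()    { return deleteReader; }
    }
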
@@ -788,7 +799,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
     /**
      * Full path to the data file
-     * @return
      */
     Path getBucketPath() {
       return bucketPath;
@@ -797,6 +807,22 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      * Partition folder (Table folder if not partitioned)
      */
     Path getRootPath()  { return rootPath; }
+    /**
+     * @return true if major compaction, false if minor
+     */
+    boolean isMajorCompaction() {
+      return isMajorCompaction && isCompacting;
+    }
+    boolean isMinorCompaction() {
+      return !isMajorCompaction && isCompacting;
+    }
+    /**
+     * true if this is only processing delete deltas to load in-memory table for
+     * vectorized reader
+     */
+    boolean isDeleteReader() {
+      return isDeleteReader;
+    }
   }
   /**
    * Create a reader that merge sorts the ACID events together.
@@ -820,6 +846,39 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     this.offset = options.getOffset();
     this.length = options.getLength();
     this.validTxnList = validTxnList;
+    /**
+     * @since Hive 3.0
+     * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only
+     * has Delete events and the others only have Insert events.  Thus {@link #baseReader} is
+     * a split of a file in base/ or delta/.
+     *
+     * For Compaction, each split (for now) is a logical bucket, i.e. all files from base/ + delta(s)/
+     * for a given bucket ID and delete_delta(s)/
+     *
+     * For bucketed tables, the data files are named bucket_N and all rows in this file are such
+     * that {@link org.apache.hadoop.hive.ql.io.BucketCodec#decodeWriterId(int)} of
+     * {@link RecordIdentifier#getBucketProperty()} is N.  This is currently true for all types of
+     * files but may not be true for delete_delta/ files in the future.
+     *
+     * For un-bucketed tables, the system is designed so that it works when there is no relationship
+     * between delete_delta file name (bucket_N) and the value of {@link RecordIdentifier#getBucketProperty()}.
+     * (Later this may be optimized to take advantage of situations where it is known that
+     * bucket_N matches bucketProperty().)  This implies that for a given {@link baseReader} all
+     * files in delete_delta/ have to be opened ({@link ReaderPair} created).  Insert events are
+     * still written such that N in file name (writerId) matches what's in bucketProperty().
+     *
+     * Compactor for un-bucketed tables works exactly the same as for bucketed ones though it
+     * should be optimized (see HIVE-17206).  In particular, each split is a set of files
+     * created by a writer with the same writerId, i.e. all bucket_N files across base/ &
+     * delta/ for the same N. Unlike bucketed tables, there is no relationship between
+     * any values in the user columns and the file name.
+     * The maximum N is determined by the number of writers the system chose for the "largest"
+     * write into a given partition.
+     *
+     * In both cases, Compactor should be changed so that Minor compaction is run very often and
+     * only compacts delete_delta/.  Major compaction can do what it does now.
+     */
+    boolean isBucketed = conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0) > 0;
 
     TypeDescription typeDescr =
         OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
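
The long comment above leans on the invariant that, for insert events, the N in bucket_N matches the writerId packed into RecordIdentifier.getBucketProperty(). The packing below is a deliberately simplified stand-in (it is NOT the real BucketCodec bit layout) just to make that invariant concrete:

    class BucketPropertySketch {
      // hypothetical packing: writerId in the high 16 bits, statementId in the low 16
      static int encode(int writerId, int statementId) {
        return (writerId << 16) | (statementId & 0xFFFF);
      }
      static int decodeWriterId(int bucketProperty) {
        return bucketProperty >>> 16;
      }

      public static void main(String[] args) {
        int prop = encode(3, 0);                    // a row written by writer 3 -> bucket_00003
        System.out.println(decodeWriterId(prop));   // 3
      }
    }
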
@@ -829,16 +888,26 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     // modify the options to reflect the event instead of the base row
     Reader.Options eventOptions = createEventOptions(options);
-    if (reader == null) {
+    if((mergerOptions.isCompacting() && mergerOptions.isMinorCompaction()) ||
+      mergerOptions.isDeleteReader()) {
+      //for minor compaction, there is no progress report and we don't filter deltas
       baseReader = null;
       minKey = maxKey = null;
+      assert reader == null : "unexpected input reader during minor compaction: " +
+        mergerOptions.getRootPath();
     } else {
       KeyInterval keyInterval;
-      // find the min/max based on the offset and length (and more for 'original')
-      if (isOriginal) {
-        keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+      if (mergerOptions.isCompacting()) {
+        assert mergerOptions.isMajorCompaction();
+        //compaction doesn't filter deltas but *may* have a reader for 'base'
+        keyInterval = new KeyInterval(null, null);
       } else {
-        keyInterval = discoverKeyBounds(reader, options);
+        // find the min/max based on the offset and length (and more for 'original')
+        if (isOriginal) {
+          keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
+        } else {
+          keyInterval = discoverKeyBounds(reader, options);
+        }
       }
       LOG.info("min key = " + keyInterval.getMinKey() + ", max key = " + keyInterval.getMaxKey());
       // use the min/max instead of the byte range
@@ -849,8 +918,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if(mergerOptions.isCompacting()) {
           pair = new OriginalReaderPairToCompact(key, bucket, options, mergerOptions,
             conf, validTxnList);
-        }
-        else {
+        } else {
           pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(),
             keyInterval.getMaxKey(), options, mergerOptions, conf, validTxnList);
         }
@@ -868,35 +936,31 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       baseReader = pair.getRecordReader();
     }
 
-    // we always want to read all of the deltas
-    eventOptions.range(0, Long.MAX_VALUE);
     if (deltaDirectory != null) {
+      /* whatever SARG may be applicable to the base, it's not applicable to delete_delta since it has no
+      * user columns
+      * HIVE-17320: we should compute a SARG to push down min/max key to delete_delta*/
+      Reader.Options deltaEventOptions = eventOptions.clone()
+        .searchArgument(null, null).range(0, Long.MAX_VALUE);
       for(Path delta: deltaDirectory) {
         if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
           //all inserts should be in baseReader for normal read so this should always be delete delta if not compacting
           throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
         }
         ReaderKey key = new ReaderKey();
-        Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
         AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);
-        FileSystem fs = deltaFile.getFileSystem(conf);
-        long length = OrcAcidUtils.getLastFlushLength(fs, deltaFile);
-        if (length != -1 && fs.exists(deltaFile)) {
-          Reader deltaReader = OrcFile.createReader(deltaFile,
-              OrcFile.readerOptions(conf).maxLength(length));
-          Reader.Options deltaEventOptions = null;
-          if(eventOptions.getSearchArgument() != null) {
-            // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
-            // it can produce wrong results (if the latest valid version of the record is filtered out by
-            // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
-            // unless the delta only has insert events
-            AcidStats acidStats = OrcAcidUtils.parseAcidStats(deltaReader);
-            if(acidStats.deletes > 0 || acidStats.updates > 0) {
-              deltaEventOptions = eventOptions.clone().searchArgument(null, null);
-            }
+        for (Path deltaFile : getDeltaFiles(delta, bucket, conf, mergerOptions, isBucketed)) {
+          FileSystem fs = deltaFile.getFileSystem(conf);
+          if(!fs.exists(deltaFile)) {
+            continue;
           }
+          /* side files are only created by streaming ingest.  If this is a compaction, we may
+          * have an insert delta/ here with side files because the original writer died. */
+          long length = AcidUtils.getLogicalLength(fs, fs.getFileStatus(deltaFile));
+          assert length >= 0;
+          Reader deltaReader = OrcFile.createReader(deltaFile, OrcFile.readerOptions(conf).maxLength(length));
           ReaderPairAcid deltaPair = new ReaderPairAcid(key, deltaReader, minKey, maxKey,
-            deltaEventOptions != null ? deltaEventOptions : eventOptions, deltaDir.getStatementId());
+            deltaEventOptions, deltaDir.getStatementId());
           if (deltaPair.nextRecord() != null) {
             readers.put(key, deltaPair);
           }
@@ -921,6 +985,59 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     }
   }
 
+  /**
+   * This determines the set of {@link ReaderPairAcid} to create for a given delta/.
+   * For unbucketed tables {@code bucket} can be thought of as a write tranche.
+   */
+  static Path[] getDeltaFiles(Path deltaDirectory, int bucket, Configuration conf,
+                              Options mergerOptions, boolean isBucketed) throws IOException {
+    if(isBucketed) {
+      /**
+       * for bucketed tables (for now) we always trust that the N in bucketN file name means that
+       * all records have {@link RecordIdentifier#getBucketProperty()} encoding bucketId = N.  This
+       * means that a delete event in bucketN can only modify an insert in another bucketN file for
+       * the same N. (Down the road we may trust it only in certain delta dirs)
+       *
+       * Compactor takes all types of deltas for a given bucket.  For regular read, any file that
+       * contains (only) insert events is treated as base and only
+       * delete_delta/ are treated as deltas.
+       */
+        assert (!mergerOptions.isCompacting &&
+          deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
+        ) || mergerOptions.isCompacting : "Unexpected delta: " + deltaDirectory;
+      Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+      return new Path[]{deltaFile};
+    }
+    /**
+     * For unbucketed tables insert events are also stored in bucketN files but here N is
+     * the writer ID.  We can trust that N matches the info in {@link RecordIdentifier#getBucketProperty()}
+     * in delta_x_y, but it's not required since we can't trust N for delete_delta_x_x/bucketN.
+     * Thus we always have to take all files in a delete_delta.
+     * For regular read, any file that has (only) insert events is treated as base so
+     * {@link deltaDirectory} can only be delete_delta and so we take all files in it.
+     * For compacting, every split contains base/bN + delta(s)/bN + delete_delta(s){all buckets} for
+     * a given N.
+     */
+    if(deltaDirectory.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+      //it's not wrong to take all delete events for bucketed tables but it's more efficient
+      //to only take those that belong to the 'bucket' assuming we trust the file name
+      //un-bucketed table - get all files
+      FileSystem fs = deltaDirectory.getFileSystem(conf);
+      FileStatus[] dataFiles = fs.listStatus(deltaDirectory, AcidUtils.bucketFileFilter);
+      Path[] deltaFiles = new Path[dataFiles.length];
+      int i = 0;
+      for (FileStatus stat : dataFiles) {
+        deltaFiles[i++] = stat.getPath();
+      }//todo: need a test where we actually have more than 1 file
+      return deltaFiles;
+    }
+    //if here it must be delta_x_y - insert events only, so we must be compacting
+    assert mergerOptions.isCompacting() : "Expected to be called as part of compaction";
+    Path deltaFile = AcidUtils.createBucketFile(deltaDirectory, bucket);
+    return new Path[] {deltaFile};
+
+  }
+  
   @VisibleForTesting
   RecordIdentifier getMinKey() {
     return minKey;

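The getDeltaFiles() branching above reduces to: trust bucket_N whenever the table is bucketed (and for insert deltas, which are only seen while compacting), but for an unbucketed delete_delta take every bucket file, since the N in its file names carries no meaning. An illustrative restatement using plain java.io.File; the real code goes through AcidUtils and FileSystem:

    import java.io.File;

    class DeltaFileChoiceSketch {
      static File[] deltaFiles(File deltaDir, int bucket, boolean isBucketed, boolean compacting) {
        boolean isDeleteDelta = deltaDir.getName().startsWith("delete_delta_");
        if (!isBucketed && isDeleteDelta) {
          // unbucketed delete_delta: bucket_N names are not trusted, take every bucket file
          File[] all = deltaDir.listFiles(f -> f.getName().startsWith("bucket_"));
          return all == null ? new File[0] : all;
        }
        // bucketed tables, and (for unbucketed tables) insert deltas seen only while compacting
        assert isBucketed || compacting : "unexpected insert delta outside compaction";
        return new File[] { new File(deltaDir, String.format("bucket_%05d", bucket)) };
      }
    }
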
http://git-wip-us.apache.org/repos/asf/hive/blob/6be50b76/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 429960b..1e19a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -243,7 +243,8 @@ public class OrcRecordUpdater implements RecordUpdater {
     }
     if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
         && !options.isWritingBase()){
-      flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
+      //throw if file already exists as that should never happen
+      flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
           options.getReporter());
       flushLengths.writeLong(0);
       OrcInputFormat.SHIMS.hflush(flushLengths);
@@ -349,6 +350,12 @@ public class OrcRecordUpdater implements RecordUpdater {
       return newInspector;
     }
   }
+
+  /**
+   * The INSERT event always uses the {@link #bucket} that this {@link RecordUpdater} was created with;
+   * thus, even for unbucketed tables, the N in the bucket_N file name matches the writerId/bucketId,
+   * even for a late split.
+   */
   private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
@@ -394,6 +401,11 @@ public class OrcRecordUpdater implements RecordUpdater {
     Integer currentBucket = null;
 
     if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      /**
+       * make sure bucketProperty in the delete event is from the {@link row} rather than whatever
+       * {@link this#bucket} is.  For bucketed tables, the two must agree on the bucketId encoded in them,
+       * though not necessarily on the whole value.  For unbucketed tables there is no such relationship.
+       */
       currentBucket = setBucket(bucketInspector.get(
         recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)