Posted to commits@hive.apache.org by ek...@apache.org on 2017/08/16 01:14:45 UTC

[3/3] hive git commit: HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)

HIVE-17089 - make acid 2.0 the default (Eugene Koifman, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b0e07a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b0e07a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b0e07a

Branch: refs/heads/master
Commit: 34b0e07a3bd5002b197b749e3e5a7992e196c237
Parents: 5061683
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Aug 15 18:13:44 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Aug 15 18:13:44 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   9 +-
 .../hive/hcatalog/streaming/TestStreaming.java  | 147 ++++-
 .../streaming/mutate/StreamingAssert.java       |  45 +-
 .../streaming/mutate/TestMutations.java         |  17 +-
 .../TransactionalValidationListener.java        |   2 -
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   2 -
 .../hadoop/hive/ql/io/AcidInputFormat.java      |   6 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  90 ++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  41 +-
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java  |   2 +-
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |  41 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java |  60 +-
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  20 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 186 ++++---
 .../ql/TestTxnCommands2WithSplitUpdate.java     | 545 -------------------
 ...ommands2WithSplitUpdateAndVectorization.java |   4 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  31 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 191 ++++---
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  | 509 ++++++++++++-----
 .../hive/ql/io/orc/TestOrcRecordUpdater.java    |  36 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |   4 -
 .../hive/ql/txn/compactor/CompactorTest.java    |   8 +
 .../hive/ql/txn/compactor/TestWorker.java       | 125 +++--
 .../dynamic_semijoin_reduction_3.q              |   2 +-
 .../llap/dynamic_semijoin_reduction_3.q.out     |   4 +-
 .../test/results/clientpositive/row__id.q.out   |  18 +-
 26 files changed, 1086 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b154544..3c158a6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1841,12 +1841,9 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
-    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
-        "Sets the operational properties that control the appropriate behavior for various\n"
-        + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
-        + "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
-        + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
-        + "for future versions of ACID. (See HIVE-14035 for details.)"),
+    HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1,
+        "This is intended to be used as an internal property for future versions of ACID. (See\n" +
+        "HIVE-14035 for details.)"),
 
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
         "current open transactions reach this limit, future open transaction requests will be \n" +

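The hunk above flips the default of hive.txn.operational.properties from 0 (the removed legacy mode) to 1, i.e. split-update. Below is a minimal sketch, not part of this commit, of how a client could confirm which mode a plain Configuration resolves to, using only APIs referenced elsewhere in this patch (AcidUtils.getAcidOperationalProperties and isSplitUpdate); the class name is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidDefaultSketch {
      public static void main(String[] args) {
        // A fresh HiveConf now resolves hive.txn.operational.properties to 1,
        // which AcidUtils maps to the split-update ("acid 2.0") behavior.
        Configuration conf = new HiveConf();
        AcidUtils.AcidOperationalProperties props =
            AcidUtils.getAcidOperationalProperties(conf);
        System.out.println("split-update enabled: " + props.isSplitUpdate()); // expected: true
      }
    }
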
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 5e8fe62..f3ef92b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -27,6 +27,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -44,7 +45,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.Validator;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -97,6 +99,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
 
 public class TestStreaming {
   private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
@@ -448,7 +452,10 @@ public class TestStreaming {
     }
   }
 
-
+  /**
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} - there is
+   * little value in using the InputFormat directly
+   */
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -473,13 +480,14 @@ public class TestStreaming {
     InputFormat inf = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionPath.toString());
-    job.set("bucket_count", Integer.toString(buckets));
+    job.set(BUCKET_COUNT, Integer.toString(buckets));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inf.getSplits(job, buckets);
-    Assert.assertEquals(buckets, splits.length);
+    Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
             inf.getRecordReader(splits[0], job, Reporter.NULL);
 
@@ -491,6 +499,48 @@ public class TestStreaming {
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
+  /**
+   * @param validationQuery query to read from table to compare data against {@code records}
+   * @param records expected data.  each row is a CSV list of values
+   */
+  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+                                String validationQuery, boolean vectorize, String... records) throws Exception {
+    ValidTxnList txns = msClient.getValidTxns();
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
+    Assert.assertEquals(numExpectedFiles, current.size());
+
+    // find the absolute min/max transaction ids across the current deltas
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction();
+      if (pd.getMinTransaction() < min) min = pd.getMinTransaction();
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+    boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+    if(vectorize) {
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    }
+
+    String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+    for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+      //run it with each split strategy - make sure there are differences
+      conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+      List<String> actualResult = queryTable(driver, validationQuery);
+      for (int i = 0; i < actualResult.size(); i++) {
+        Assert.assertEquals("diff at [" + i + "].  actual=" + actualResult + " expected=" +
+          Arrays.toString(records), records[i], actualResult.get(i));
+      }
+    }
+    conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+  }
 
   private void checkNothingWritten(Path partitionPath) throws Exception {
     ValidTxnList txns = msClient.getValidTxns();
@@ -1016,15 +1066,15 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("1,Hello streaming".getBytes());
     txnBatch.commit();
-
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 15, 24, 1, validationQuery, false, "1\tHello streaming");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("2,Welcome to streaming".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
-            "{2, Welcome to streaming}");
+    checkDataWritten2(partLoc, 15, 24,  1, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming");
 
     txnBatch.close();
 
@@ -1034,16 +1084,16 @@ public class TestStreaming {
     txnBatch.write("3,Hello streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
-            "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, false, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again");
 
     txnBatch.beginNextTransaction();
     txnBatch.write("4,Welcome to streaming - once again".getBytes());
     txnBatch.commit();
 
-    checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
-            "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
-            "{4, Welcome to streaming - once again}");
+    checkDataWritten2(partLoc, 15, 40,  2, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again",
+            "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch.getCurrentTransactionState());
@@ -1053,7 +1103,6 @@ public class TestStreaming {
     connection.close();
   }
 
-
   @Test
   public void testInterleavedTransactionBatchCommits() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
@@ -1078,32 +1127,69 @@ public class TestStreaming {
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 24, 33, 1,
+      validationQuery, true, "3\tHello streaming - once again");
 
     txnBatch1.commit();
-
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    /*now both batches have committed (but not closed) so for each primary file we expect a side
+    file to exist and indicate the true length of the primary file*/
+    FileSystem fs = partLoc.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength == actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.beginNextTransaction();
     txnBatch1.write("2,Welcome to streaming".getBytes());
 
     txnBatch2.beginNextTransaction();
     txnBatch2.write("4,Welcome to streaming - once again".getBytes());
-
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+    //here each batch has written data and committed (to bucket0 since table only has 1 bucket)
+    //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
+    //has now received more data (logically - it's buffered) but it is not yet committed.
+    //let's check that side files exist, etc.
+    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns());
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength <= actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
 
     txnBatch1.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}",
-        "{3, Hello streaming - once again}");
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, false, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
 
     txnBatch2.commit();
 
-    checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
-        "{2, Welcome to streaming}",
-        "{3, Hello streaming - once again}",
-        "{4, Welcome to streaming - once again}");
+    checkDataWritten2(partLoc, 14, 33, 2,
+      validationQuery, true, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
 
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
             , txnBatch1.getCurrentTransactionState());
@@ -2035,11 +2121,12 @@ public class TestStreaming {
 
   public static ArrayList<String> queryTable(Driver driver, String query)
           throws CommandNeedRetryException, IOException {
-    driver.run(query);
+    CommandProcessorResponse cpr = driver.run(query);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(query + " failed: " + cpr);
+    }
     ArrayList<String> res = new ArrayList<String>();
     driver.getResults(res);
-    if(res.isEmpty())
-      System.err.println(driver.getErrorMsg());
     return res;
   }
 

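The new assertions in testInterleavedTransactionBatchCommits rely on the side-file contract: while a TransactionBatch is open, each bucket file has a *_flush_length companion recording the length as of the last commit, and the logical length never exceeds the physical one (the side file is removed once the writer closes). Below is a standalone sketch of the same check, assuming the caller already has a partition path, a Configuration and a ValidTxnList; the class and method names are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.orc.impl.OrcAcidUtils;

    public class SideFileCheckSketch {
      /** Verifies every bucket file in the current deltas has a sane logical length. */
      static void checkSideFiles(Path partitionPath, Configuration conf, ValidTxnList txns)
          throws Exception {
        FileSystem fs = partitionPath.getFileSystem(conf);
        AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
          for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
            Path sideFile = OrcAcidUtils.getSideFile(stat.getPath());
            // While a TransactionBatch is open the side file records the length as of
            // the last commit; once the writer closes, the side file is deleted.
            long logicalLen = AcidUtils.getLogicalLength(fs, stat);
            if (logicalLen > stat.getLen()) {
              throw new IllegalStateException(sideFile + " claims more data than exists in "
                  + stat.getPath());
            }
          }
        }
      }
    }
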
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index de41d34..d5429fb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -27,12 +27,12 @@ import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -123,35 +123,50 @@ public class StreamingAssert {
   }
 
   List<Record> readRecords() throws Exception {
+    return readRecords(1);
+  }
+
+  /**
+   * TODO: this would be more flexible if it issued a SQL select statement rather than using the InputFormat directly;
+   * see {@link org.apache.hive.hcatalog.streaming.TestStreaming#checkDataWritten2(Path, long, long, int, String, boolean, String...)}
+   * @param numSplitsExpected
+   * @return
+   * @throws Exception
+   */
+  List<Record> readRecords(int numSplitsExpected) throws Exception {
     if (currentDeltas.isEmpty()) {
       throw new AssertionError("No data");
     }
     InputFormat<NullWritable, OrcStruct> inputFormat = new OrcInputFormat();
     JobConf job = new JobConf();
     job.set("mapred.input.dir", partitionLocation.toString());
-    job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets()));
+    job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets()));
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
     job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
-    job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true");
+    AcidUtils.setTransactionalTableScan(job,true);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
-    assertEquals(1, splits.length);
-
-    final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
-        .getRecordReader(splits[0], job, Reporter.NULL);
+    assertEquals(numSplitsExpected, splits.length);
 
-    NullWritable key = recordReader.createKey();
-    OrcStruct value = recordReader.createValue();
 
     List<Record> records = new ArrayList<>();
-    while (recordReader.next(key, value)) {
-      RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
-      Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
+    for(InputSplit is : splits) {
+      final AcidRecordReader<NullWritable, OrcStruct> recordReader = (AcidRecordReader<NullWritable, OrcStruct>) inputFormat
+        .getRecordReader(is, job, Reporter.NULL);
+
+      NullWritable key = recordReader.createKey();
+      OrcStruct value = recordReader.createValue();
+
+      while (recordReader.next(key, value)) {
+        RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
+        Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
           recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
-      System.out.println(record);
-      records.add(record);
+        System.out.println(record);
+        records.add(record);
+      }
+      recordReader.close();
     }
-    recordReader.close();
     return records;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
index ab9f313..5bfa04d 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
@@ -529,22 +529,23 @@ public class TestMutations {
     StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA);
     indiaAssertions.assertMinTransactionId(1L);
     indiaAssertions.assertMaxTransactionId(2L);
-    List<Record> indiaRecords = indiaAssertions.readRecords();
+    List<Record> indiaRecords = indiaAssertions.readRecords(2);
     assertThat(indiaRecords.size(), is(3));
     assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
     assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
       encodeBucket(0), 0L)));
     assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
-    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
-      encodeBucket(0), 1L)));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(2L,
+      encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
     assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
     assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L,
-      encodeBucket(0), 0L)));
+      encodeBucket(0), 1L)));
 
     StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
     ukAssertions.assertMinTransactionId(1L);
     ukAssertions.assertMaxTransactionId(2L);
-    List<Record> ukRecords = ukAssertions.readRecords();
+    //1 split since mutateTransaction txn just does deletes
+    List<Record> ukRecords = ukAssertions.readRecords(1);
     assertThat(ukRecords.size(), is(1));
     assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
     assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
@@ -553,11 +554,11 @@ public class TestMutations {
     StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
     franceAssertions.assertMinTransactionId(1L);
     franceAssertions.assertMaxTransactionId(2L);
-    List<Record> franceRecords = franceAssertions.readRecords();
+    List<Record> franceRecords = franceAssertions.readRecords(2);
     assertThat(franceRecords.size(), is(1));
     assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
-    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
-      encodeBucket(0), 1L)));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(2L,
+      encodeBucket(0), 0L)));//with split update, new version of the row is a new insert
 
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
index 0f08f43..023d703 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java
@@ -39,7 +39,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
 
   // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils.
   public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default";
-  public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy";
 
   TransactionalValidationListener(Configuration conf) {
     super(conf);
@@ -276,7 +275,6 @@ public final class TransactionalValidationListener extends MetaStorePreEventList
     boolean isValid = false;
     switch (transactionalProperties) {
       case DEFAULT_TRANSACTIONAL_PROPERTY:
-      case LEGACY_TRANSACTIONAL_PROPERTY:
         isValid = true;
         break;
       default:

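With the "legacy" transactional property removed here (and in AcidUtils below), a table whose metadata carries no transactional_properties now resolves to the default, split-update behavior. A small sketch, not part of this commit, illustrating that via AcidUtils.AcidOperationalProperties.parseString:

    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class TransactionalPropertiesSketch {
      public static void main(String[] args) {
        // Previously a missing transactional_properties value fell back to the removed
        // "legacy" mode; after this change it gets the default (split-update) behavior.
        AcidUtils.AcidOperationalProperties props =
            AcidUtils.AcidOperationalProperties.parseString(null);
        System.out.println("split-update: " + props.isSplitUpdate()); // expected: true
        // "legacy" is no longer a recognized value; only "default" or an explicit
        // option string is parsed.
      }
    }
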
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 8999f6f..25ad1e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -148,8 +148,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     RecordWriter[] outWriters;
     RecordUpdater[] updaters;
     Stat stat;
-    int acidLastBucket = -1;
-    int acidFileOffset = -1;
 
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 25177ef..9864c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,6 +112,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     private long minTxnId;
     private long maxTxnId;
     private List<Integer> stmtIds;
+    //would be useful to have enum for Type: insert/delete/load data
 
     public DeltaMetaData() {
       this(0,0,new ArrayList<Integer>());
@@ -155,6 +156,11 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
         stmtIds.add(in.readInt());
       }
     }
+    @Override
+    public String toString() {
+      //? is Type - when implemented
+      return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")";
+    }
   }
   /**
    * Options for controlling the record readers.

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 1e33424..feacdd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -39,11 +39,13 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +69,18 @@ public class AcidUtils {
   };
   public static final String DELTA_PREFIX = "delta_";
   public static final String DELETE_DELTA_PREFIX = "delete_delta_";
+  /**
+   * Acid Streaming Ingest writes multiple transactions to the same file.  It also maintains a
+   * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
+   * the primary file as of the last commit ({@link OrcRecordUpdater#flush()}).  That is the 'logical length'.
+   * Once the primary is closed, the side file is deleted (logical length = actual length) but if
+   * the writer dies or the primary file is being read while it's still being written to, anything
+   * past the logical length should be ignored.
+   *
+   * @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX
+   * @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path)
+   * @see #getLogicalLength(FileSystem, FileStatus)
+   */
   public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
   public static final PathFilter deltaFileFilter = new PathFilter() {
     @Override
@@ -167,7 +181,7 @@ public class AcidUtils {
    * This is format of delete delta dir name prior to Hive 2.2.x
    */
   @VisibleForTesting
-  static String deleteDeltaSubdir(long min, long max) {
+  public static String deleteDeltaSubdir(long min, long max) {
     return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
         String.format(DELTA_DIGITS, max);
   }
@@ -178,7 +192,7 @@ public class AcidUtils {
    * @since 2.2.x
    */
   @VisibleForTesting
-  static String deleteDeltaSubdir(long min, long max, int statementId) {
+  public static String deleteDeltaSubdir(long min, long max, int statementId) {
     return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
   }
 
@@ -371,21 +385,10 @@ public class AcidUtils {
     public static final int HASH_BASED_MERGE_BIT = 0x02;
     public static final String HASH_BASED_MERGE_STRING = "hash_merge";
     public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
-    public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY;
 
     private AcidOperationalProperties() {
     }
 
-    /**
-     * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables
-     * that were created before ACID type system using operational properties was put in place.
-     * @return the acidOperationalProperties object
-     */
-    public static AcidOperationalProperties getLegacy() {
-      AcidOperationalProperties obj = new AcidOperationalProperties();
-      // In legacy mode, none of these properties are turned on.
-      return obj;
-    }
 
     /**
      * Returns an acidOperationalProperties object that represents default ACID behavior for tables
@@ -406,14 +409,11 @@ public class AcidUtils {
      */
     public static AcidOperationalProperties parseString(String propertiesStr) {
       if (propertiesStr == null) {
-        return AcidOperationalProperties.getLegacy();
+        return AcidOperationalProperties.getDefault();
       }
       if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
         return AcidOperationalProperties.getDefault();
       }
-      if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) {
-        return AcidOperationalProperties.getLegacy();
-      }
       AcidOperationalProperties obj = new AcidOperationalProperties();
       String[] options = propertiesStr.split("\\|");
       for (String option : options) {
@@ -1119,7 +1119,12 @@ public class AcidUtils {
   public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) {
     HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
-
+  /**
+   * @param p - not null
+   */
+  public static boolean isDeleteDelta(Path p) {
+    return p.getName().startsWith(DELETE_DELTA_PREFIX);
+  }
   /** Checks if a table is a valid ACID table.
    * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
@@ -1171,8 +1176,8 @@ public class AcidUtils {
     String transactionalProperties = table.getProperty(
             hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (transactionalProperties == null) {
-      // If the table does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the table does not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(transactionalProperties);
   }
@@ -1184,7 +1189,7 @@ public class AcidUtils {
    */
   public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
     // If the conf does not define any transactional properties, the parseInt() should receive
-    // a value of zero, which will set AcidOperationalProperties to a legacy type and return that.
+    // a value of 1, which will set AcidOperationalProperties to a default type and return that.
     return AcidOperationalProperties.parseInt(
             HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
   }
@@ -1197,8 +1202,8 @@ public class AcidUtils {
   public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
     String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (resultStr == null) {
-      // If the properties does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the properties does not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
@@ -1212,9 +1217,44 @@ public class AcidUtils {
           Map<String, String> parameters) {
     String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
     if (resultStr == null) {
-      // If the parameters does not define any transactional properties, we return a legacy type.
-      return AcidOperationalProperties.getLegacy();
+      // If the parameters does not define any transactional properties, we return a default type.
+      return AcidOperationalProperties.getDefault();
     }
     return AcidOperationalProperties.parseString(resultStr);
   }
+  /**
+   * See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}.
+   *
+   * Returns the logical end of file for an acid data file.
+   *
+   * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
+   * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all.
+   * @param file - data file to read/compute splits on
+   */
+  public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+    Path lengths = OrcAcidUtils.getSideFile(file.getPath());
+    if(!fs.exists(lengths)) {
+      /**
+       * If we get here for delta_x_y, it means txn y is resolved and all files in this delta are closed, so
+       * they should all have a valid ORC footer and the length reported by the NameNode is good.
+       */
+      return file.getLen();
+    }
+    long len = OrcAcidUtils.getLastFlushLength(fs, file.getPath());
+    if(len >= 0) {
+      /**
+       * If we get here, something is still writing to delta_x_y, so read only as far as the last commit,
+       * i.e. where the last footer was written.  The file may contain more data after the 'len' position
+       * belonging to an open txn.
+       */
+      return len;
+    }
+    /**
+     * If we get here, the side file exists but we couldn't read it.  We want to avoid a read where
+     * file.getLen() < 'value from side file' (which may happen if the file is not closed) because that
+     * would mean some committed data from 'file' is skipped.
+     * This should be very unusual.
+     */
+    throw new IOException(lengths + " found but is not readable.  Consider waiting or orcfiledump --recover");
+  }
 }

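getLogicalLength above is the one place readers consult to decide how far into an acid data file it is safe to read: the side file's recorded length while a writer may still be active, the physical length once the file is closed. A hedged usage sketch, assuming the caller already holds a FileStatus for a delta bucket file, mirroring the maxLength pattern OrcInputFormat uses in this patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;

    public class BoundedOrcReadSketch {
      /** Opens an ORC reader that will not look past the last committed byte of an acid file. */
      static Reader openCommittedPortion(FileSystem fs, FileStatus file, Configuration conf)
          throws IOException {
        long logicalLen = AcidUtils.getLogicalLength(fs, file);
        // maxLength keeps the reader from interpreting bytes written by a still-open
        // transaction (or by a dead writer) as part of the file's footer/stripes.
        return OrcFile.createReader(file.getPath(),
            OrcFile.readerOptions(conf).filesystem(fs).maxLength(logicalLen));
      }
    }
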
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index de49fc8..17f3d02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -641,6 +642,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(),
       // & therefore we should be able to retrieve them here and determine appropriate behavior.
       // Note that this will be meaningless for non-acid tables & will be set to null.
+      //this is set by Utilities.copyTablePropertiesToConf()
       boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
       String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
       this.acidOperationalProperties = isTableTransactional ?
@@ -972,16 +974,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       List<OrcSplit> splits = Lists.newArrayList();
       for (HdfsFileStatusWithId file : fileStatuses) {
         FileStatus fileStatus = file.getFileStatus();
-        if (fileStatus.getLen() != 0) {
+        long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus);
+        if (logicalLen != 0) {
           Object fileKey = file.getFileId();
           if (fileKey == null && allowSyntheticFileIds) {
             fileKey = new SyntheticFileId(fileStatus);
           }
           TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
           for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
+            if(entry.getKey() + entry.getValue().getLength() > logicalLen) {
+              //don't create splits for anything past logical EOF
+              continue;
+            }
             OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
                 entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-                deltas, -1, fileStatus.getLen());
+                deltas, -1, logicalLen);
             splits.add(orcSplit);
           }
         }
@@ -1002,13 +1009,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * ACID split strategy is used when there is no base directory (when transactions are enabled).
    */
   static class ACIDSplitStrategy implements SplitStrategy<OrcSplit> {
-    Path dir;
-    List<DeltaMetaData> deltas;
-    boolean[] covered;
-    int numBuckets;
-    AcidOperationalProperties acidOperationalProperties;
+    private Path dir;
+    private List<DeltaMetaData> deltas;
+    private boolean[] covered;
+    private int numBuckets;
+    private AcidOperationalProperties acidOperationalProperties;
 
-    public ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
+    ACIDSplitStrategy(Path dir, int numBuckets, List<DeltaMetaData> deltas, boolean[] covered,
         AcidOperationalProperties acidOperationalProperties) {
       this.dir = dir;
       this.numBuckets = numBuckets;
@@ -1027,18 +1034,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       // with valid user payload data has already been considered as base for the covered buckets.
       // Hence, the uncovered buckets do not have any relevant data and we can just ignore them.
       if (acidOperationalProperties != null && acidOperationalProperties.isSplitUpdate()) {
-        return splits; // return an empty list.
+        return Collections.emptyList();
       }
 
       // Generate a split for any buckets that weren't covered.
       // This happens in the case where a bucket just has deltas and no
       // base.
       if (!deltas.isEmpty()) {
-        for (int b = 0; b < numBuckets; ++b) {
-          if (!covered[b]) {
-            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1));
-          }
-        }
+        //since HIVE-17089 if here, then it's not an acid table so there should never be any deltas
+        throw new IllegalStateException("Found unexpected deltas: " + deltas + " in " + dir);
       }
       return splits;
     }
@@ -1133,7 +1137,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
                   baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
                 }
@@ -1149,7 +1153,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               }
             }
             // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter);
             for (FileStatus child : children) {
               HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
               baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
@@ -1402,7 +1406,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         populateAndCacheStripeDetails();
         boolean[] includeStripe = null;
         // We can't eliminate stripes if there are deltas because the
-        // deltas may change the rows making them match the predicate.
+        // deltas may change the rows making them match the predicate. todo: See HIVE-14516.
         if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
           String[] colNames =
               extractNeededColNames((readerTypes == null ? fileTypes : readerTypes),
@@ -1516,7 +1520,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         Reader orcReader = OrcFile.createReader(file.getPath(),
             OrcFile.readerOptions(context.conf)
                 .filesystem(fs)
-                .maxLength(file.getLen()));
+                .maxLength(AcidUtils.getLogicalLength(fs, file)));
         orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(),
             file.getModificationTime());
         if (context.cacheStripeDetails) {
@@ -2210,7 +2214,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         totalFileSize += child.getFileStatus().getLen();
         AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
-        opts.writingBase(true);
         int b = opts.getBucketId();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.

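Split generation above now measures files by their logical length and skips any block range extending past the logical EOF, so no split covers bytes written by an open or abandoned transaction. A simplified sketch of that filtering rule, not part of this commit (class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.fs.BlockLocation;

    public class LogicalEofSplitSketch {
      /** Keeps only block ranges that end at or before the logical EOF of the acid file. */
      static List<long[]> splitRanges(TreeMap<Long, BlockLocation> blockOffsets, long logicalLen) {
        List<long[]> ranges = new ArrayList<>();
        for (Map.Entry<Long, BlockLocation> e : blockOffsets.entrySet()) {
          long start = e.getKey();
          long len = e.getValue().getLength();
          if (start + len > logicalLen) {
            // data past the logical EOF is uncommitted or from a dead writer - skip it
            continue;
          }
          ranges.add(new long[] {start, len});
        }
        return ranges;
      }
    }
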
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index a179300..214f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -296,7 +296,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
           .rowIndexStride(0);
     }
     final OrcRecordUpdater.KeyIndexBuilder watcher =
-        new OrcRecordUpdater.KeyIndexBuilder();
+        new OrcRecordUpdater.KeyIndexBuilder("compactor");
     opts.inspector(options.getInspector())
         .callback(watcher);
     final Writer writer = OrcFile.createWriter(filename, opts);

http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 814782a..97c4e3d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
@@ -310,11 +311,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OrcStruct nextRecord;
     private final ReaderKey key;
     final int bucketId;
+    final int bucketProperty;
 
-    OriginalReaderPair(ReaderKey key, int bucketId) throws IOException {
+    OriginalReaderPair(ReaderKey key, int bucketId, Configuration conf) throws IOException {
       this.key = key;
       this.bucketId = bucketId;
       assert bucketId >= 0 : "don't support non-bucketed tables yet";
+      this.bucketProperty = encodeBucketId(conf, bucketId);
     }
     @Override public final OrcStruct nextRecord() {
       return nextRecord;
@@ -348,7 +351,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
               new LongWritable(0));
           nextRecord().setFieldValue(OrcRecordUpdater.BUCKET,
-              new IntWritable(bucketId));
+              new IntWritable(bucketProperty));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID,
               new LongWritable(nextRowId));
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
@@ -360,7 +363,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
               .set(0);
           ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
-              .set(bucketId);
+              .set(bucketProperty);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
               .set(0);
           ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
@@ -368,7 +371,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           nextRecord().setFieldValue(OrcRecordUpdater.ROW,
               getRecordReader().next(OrcRecordUpdater.getRow(next)));
         }
-        key.setValues(0L, bucketId, nextRowId, 0L, 0);
+        key.setValues(0L, bucketProperty, nextRowId, 0L, 0);
         if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("key " + key + " > maxkey " + getMaxKey());
@@ -380,6 +383,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       return false;//reached EndOfFile
     }
   }
+  static int encodeBucketId(Configuration conf, int bucketId) {
+    return BucketCodec.V1.encode(new AcidOutputFormat.Options(conf).bucket(bucketId));
+  }
   @VisibleForTesting
   final static class OriginalReaderPairToRead extends OriginalReaderPair {
     private final long rowIdOffset;
@@ -392,7 +398,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                              final RecordIdentifier minKey, final RecordIdentifier maxKey,
                              Reader.Options options, Options mergerOptions, Configuration conf,
                              ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId);
+      super(key, bucketId, conf);
       this.reader = reader;
       assert !mergerOptions.isCompacting();
       assert mergerOptions.getRootPath() != null : "Since we have original files";
@@ -444,8 +450,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (rowIdOffset > 0) {
           //rowIdOffset could be 0 if all files before current one are empty
           /**
-           * Since we already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader,
-           * int, Reader.Options)} need to fix min/max key since these are used by
+           * Since we have already done {@link OrcRawRecordMerger#discoverOriginalKeyBounds(Reader, int, Reader.Options, Configuration)}
+           * we need to fix the min/max key since these are used by
            * {@link #next(OrcStruct)} which uses {@link #rowIdOffset} to generate rowId for
            * the key.  Clear?  */
           if (minKey != null) {
@@ -455,7 +461,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
              *  If this is not the 1st file, set minKey 1 less than the start of current file
              * (Would not need to set minKey if we knew that there are no delta files)
              * {@link #advanceToMinKey()} needs this */
-            newMinKey = new RecordIdentifier(0, bucketId, rowIdOffset - 1);
+            newMinKey = new RecordIdentifier(0, bucketProperty,rowIdOffset - 1);
           }
           if (maxKey != null) {
             maxKey.setRowId(maxKey.getRowId() + rowIdOffset);
@@ -485,7 +491,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
            * of the file so we want to leave it blank to make sure any insert events in delta
            * files are included; Conversely, if it's not the last file, set the maxKey so that
            * events from deltas that don't modify anything in the current split are excluded*/
-        newMaxKey = new RecordIdentifier(0, bucketId,
+        newMaxKey = new RecordIdentifier(0, bucketProperty,
           rowIdOffset + reader.getNumberOfRows() - 1);
       }
       this.minKey = newMinKey;
@@ -536,7 +542,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     OriginalReaderPairToCompact(ReaderKey key, int bucketId,
                        Reader.Options options, Options mergerOptions, Configuration conf,
                        ValidTxnList validTxnList) throws IOException {
-      super(key, bucketId);
+      super(key, bucketId, conf);
       assert mergerOptions.isCompacting() : "Should only be used for Compaction";
       this.conf = conf;
       this.options = options;
@@ -651,8 +657,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
    * @throws IOException
    */
   private KeyInterval discoverOriginalKeyBounds(Reader reader, int bucket,
-                                         Reader.Options options
-                                         ) throws IOException {
+                                         Reader.Options options,
+                                         Configuration conf) throws IOException {
     long rowLength = 0;
     long rowOffset = 0;
     long offset = options.getOffset();//this would usually be at block boundary
@@ -660,6 +666,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     boolean isTail = true;
     RecordIdentifier minKey = null;
     RecordIdentifier maxKey = null;
+    int bucketProperty = encodeBucketId(conf, bucket);
    /**
     * options.getOffset() and getMaxOffset() would usually be at block boundary which doesn't
     * necessarily match stripe boundary.  So we want to come up with minKey to be one before the 1st
@@ -679,10 +686,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       }
     }
     if (rowOffset > 0) {
-      minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+      minKey = new RecordIdentifier(0, bucketProperty, rowOffset - 1);
     }
     if (!isTail) {
-      maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+      maxKey = new RecordIdentifier(0, bucketProperty, rowOffset + rowLength - 1);
     }
     return new KeyInterval(minKey, maxKey);
   }
@@ -829,7 +836,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       KeyInterval keyInterval;
       // find the min/max based on the offset and length (and more for 'original')
       if (isOriginal) {
-        keyInterval = discoverOriginalKeyBounds(reader, bucket, options);
+        keyInterval = discoverOriginalKeyBounds(reader, bucket, options, conf);
       } else {
         keyInterval = discoverKeyBounds(reader, options);
       }
@@ -865,6 +872,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     eventOptions.range(0, Long.MAX_VALUE);
     if (deltaDirectory != null) {
       for(Path delta: deltaDirectory) {
+        if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) {
+          //for a normal read all inserts go through the baseReader, so when not compacting every delta here must be a delete delta
+          throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
+        }
         ReaderKey key = new ReaderKey();
         Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
         AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta);

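The merger now stamps synthesized keys for 'original' files with an encoded bucket property rather than the raw bucket id, so they order consistently with ROW__IDs produced by the acid 2.0 writer. A small sketch, not part of this commit, of the encoding performed by the new encodeBucketId helper:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.BucketCodec;

    public class BucketPropertySketch {
      public static void main(String[] args) {
        int bucketId = 0;
        // Same expression as OrcRawRecordMerger.encodeBucketId(conf, bucketId): the raw
        // bucket number is packed into the versioned bucket property used in ROW__ID.
        int bucketProperty =
            BucketCodec.V1.encode(new AcidOutputFormat.Options(new Configuration()).bucket(bucketId));
        System.out.printf("bucketId=%d -> bucketProperty=0x%08X%n", bucketId, bucketProperty);
      }
    }
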
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index c30e8fe..429960b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -90,6 +90,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   private Path deleteEventPath;
   private final FileSystem fs;
   private OrcFile.WriterOptions writerOptions;
+  private OrcFile.WriterOptions deleteWriterOptions;
   private Writer writer = null;
   private boolean writerClosed = false;
   private Writer deleteEventWriter = null;
@@ -104,7 +105,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   // This records how many rows have been inserted or deleted.  It is separate from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
-  private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+  private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder("insert");
   private KeyIndexBuilder deleteEventIndexBuilder;
   private StructField recIdField = null; // field to look for the record identifier in
   private StructField rowIdField = null; // field inside recId to look for row id in
@@ -148,20 +149,23 @@ public class OrcRecordUpdater implements RecordUpdater {
   /**
    * An extension to AcidOutputFormat that allows users to add additional
    * options.
+   *
+   * TODO: since this is only used for testing, could the writer be controlled some other way
+   * so as to simplify {@link #OrcRecordUpdater(Path, AcidOutputFormat.Options)}?
    */
-  public static class OrcOptions extends AcidOutputFormat.Options {
+  static final class OrcOptions extends AcidOutputFormat.Options {
     OrcFile.WriterOptions orcOptions = null;
 
-    public OrcOptions(Configuration conf) {
+    OrcOptions(Configuration conf) {
       super(conf);
     }
 
-    public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
+    OrcOptions orcOptions(OrcFile.WriterOptions opts) {
       this.orcOptions = opts;
       return this;
     }
 
-    public OrcFile.WriterOptions getOrcOptions() {
+    OrcFile.WriterOptions getOrcOptions() {
       return orcOptions;
     }
   }
@@ -205,6 +209,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       this.acidOperationalProperties =
           AcidUtils.getAcidOperationalProperties(options.getConfiguration());
     }
+    assert this.acidOperationalProperties.isSplitUpdate() : "split-update expected to be on by default (HIVE-17089)";
     BucketCodec bucketCodec = BucketCodec.V1;
     if(options.getConfiguration() != null) {
       //so that we can test "old" files
@@ -240,6 +245,8 @@ public class OrcRecordUpdater implements RecordUpdater {
         && !options.isWritingBase()){
       flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8,
           options.getReporter());
+      flushLengths.writeLong(0);
+      OrcInputFormat.SHIMS.hflush(flushLengths);
     } else {
       flushLengths = null;
     }
@@ -265,12 +272,26 @@ public class OrcRecordUpdater implements RecordUpdater {
             optionsCloneForDelta.getConfiguration());
       }
       if (this.acidOperationalProperties.isSplitUpdate()) {
+        AcidOutputFormat.Options deleteOptions = options.clone().writingDeleteDelta(true);
         // If this is a split-update, we initialize a delete delta file path in anticipation that
         // they would write update/delete events to that separate file.
         // This writes to a file in directory which starts with "delete_delta_..."
-        // The actual initialization of a writer only happens if any delete events are written.
-        this.deleteEventPath = AcidUtils.createFilename(path,
-            optionsCloneForDelta.writingDeleteDelta(true));
+        // The actual initialization of a writer only happens if any delete events are written,
+        // to avoid creating empty files.
+        this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions);
+        /**
+         * HIVE-14514 is not yet done, so writerOptions cannot be cloned.  Instead a new options
+         * object is created here to make sure the insert and delete writers don't share state
+         * (the callback object, for example).
+         * In any case the insert and delete writers have very different characteristics - the
+         * delete writer only writes a tiny amount of data.  Once early update split is done,
+         * each {@link OrcRecordUpdater} will have only one writer (except for the Mutate API);
+         * it may then make sense to take writerOptions as an input - but how?
+         */
+        this.deleteWriterOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
+          optionsCloneForDelta.getConfiguration());
+        this.deleteWriterOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+          options.getRecordIdColumn())));
       }
 
       // get buffer size and stripe size for base writer
@@ -377,19 +398,10 @@ public class OrcRecordUpdater implements RecordUpdater {
         recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
       if (deleteEventWriter == null) {
-        // Initialize an indexBuilder for deleteEvents.
-        deleteEventIndexBuilder = new KeyIndexBuilder();
-        // Change the indexBuilder callback too for the deleteEvent file, the remaining writer
-        // options remain the same.
-
-        // TODO: When we change the callback, we are essentially mutating the writerOptions.
-        // This works but perhaps is not a good thing. The proper way to do this would be
-        // to clone the writerOptions, however it requires that the parent OrcFile.writerOptions
-        // implements a clone() method (which it does not for now). HIVE-14514 is currently an open
-        // JIRA to fix this.
-
+        // Initialize an indexBuilder for deleteEvents. (HIVE-17284)
+        deleteEventIndexBuilder = new KeyIndexBuilder("delete");
         this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
-                                                      writerOptions.callback(deleteEventIndexBuilder));
+                                                      deleteWriterOptions.callback(deleteEventIndexBuilder));
       }
 
       // A delete/update generates a delete event for the original row.
@@ -461,6 +473,8 @@ public class OrcRecordUpdater implements RecordUpdater {
     long len = writer.writeIntermediateFooter();
     flushLengths.writeLong(len);
     OrcInputFormat.SHIMS.hflush(flushLengths);
+    // multiple transactions only happen for streaming ingest, which only allows inserts
+    assert deleteEventWriter == null : "unexpected delete writer for " + path;
   }
 
   @Override
@@ -539,12 +553,16 @@ public class OrcRecordUpdater implements RecordUpdater {
   }
 
   static class KeyIndexBuilder implements OrcFile.WriterCallback {
-    StringBuilder lastKey = new StringBuilder();
+    private final String builderName;
+    StringBuilder lastKey = new StringBuilder(); // accumulates the last key written in each stripe
     long lastTransaction;
     int lastBucket;
     long lastRowId;
     AcidStats acidStats = new AcidStats();
 
+    KeyIndexBuilder(String name) {
+      this.builderName = name;
+    }
     @Override
     public void preStripeWrite(OrcFile.WriterContext context
     ) throws IOException {

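To make the split-update layout set up above easier to follow (insert events go to the delta writer, delete events to a lazily created writer under delete_delta_*), here is a small sketch, not part of this patch, that uses only the Options and AcidUtils calls visible in this diff. The table location and the printed directory shapes are illustrative; the real names may carry a statement-id suffix such as _0000, as in the TestTxnCommands expectations below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class SplitUpdatePathsSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    Path partitionDir = new Path("/warehouse/nonacidorctbl");   // hypothetical location
    AcidOutputFormat.Options insertOptions = new AcidOutputFormat.Options(conf)
        .minimumTransactionId(14)
        .maximumTransactionId(14)
        .bucket(1);
    // Deletes get their own options object so the insert and delete writers share no state.
    AcidOutputFormat.Options deleteOptions = insertOptions.clone().writingDeleteDelta(true);
    // Expected shape: .../nonacidorctbl/delta_0000014_0000014/bucket_00001
    System.out.println(AcidUtils.createFilename(partitionDir, insertOptions));
    // Expected shape: .../nonacidorctbl/delete_delta_0000014_0000014/bucket_00001
    System.out.println(AcidUtils.createFilename(partitionDir, deleteOptions));
  }
}

Because the delete writer is only created when the first delete event arrives, an insert-only transaction never produces an empty bucket file under delete_delta_*, which is the point of the "to avoid creating empty files" comment above.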
http://git-wip-us.apache.org/repos/asf/hive/blob/34b0e07a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c50c1a8..bff9884 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -859,15 +861,19 @@ public class TestTxnCommands {
     for(String s : rs) {
       LOG.warn(s);
     }
+    Assert.assertEquals(536870912,
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
+    Assert.assertEquals(536936448,
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(1)));
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/000000_0_copy_1"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
@@ -877,11 +883,11 @@ public class TestTxnCommands {
       LOG.warn(s);
     }
     Assert.assertEquals("", 4, rs.size());
-    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t12"));
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t0\t12"));
     Assert.assertTrue(rs.get(0), rs.get(0).endsWith("nonacidorctbl/base_0000014/bucket_00000"));
-    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":0}\t1\t2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2"));
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
-    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));