You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/30 05:05:40 UTC

[impala] branch master updated: IMPALA-9515: Full ACID Milestone 3: Read support for "original files"

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 930264a  IMPALA-9515: Full ACID Milestone 3: Read support for "original files"
930264a is described below

commit 930264afbdc6d309a30e2c7e1eef9fd7129ef29b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue May 19 11:47:08 2020 +0200

    IMPALA-9515: Full ACID Milestone 3: Read support for "original files"
    
    "Original files" are files that don't have full ACID schema. We can see
    such files if we upgrade a non-ACID table to full ACID. Also, the LOAD
    DATA statement can load non-ACID files into full ACID tables. So such
    files don't store special ACID columns, that means we need
    to auto-generate their values. These are (operation,
    originalTransaction, bucket, rowid, and currentTransaction).
    
    With the exception of 'rowid', all of them can be calculated based on
    the file path, so I add their values to the scanner's template tuple.
    
    'rowid' is the ordinal number of the row inside a bucket inside a
    directory. For now Impala only allows one file per bucket per
    directory. Therefore we can generate row ids for each file
    independently.
    
    Multiple files in a single bucket in a directory can only be present if
    the table was non-transactional earlier and we upgraded it to full ACID
    table. After the first compaction we should only see one original file
    per bucket per directory.
    
    In HdfsOrcScanner we calculate the first row id for our split then
    the OrcStructReader fills the rowid slot with the proper values.
    
    Testing:
     * added e2e tests to check if the generated values are correct
     * added e2e test to reject tables that have multiple files per bucket
     * added unit tests to the new auxiliary functions
    
    Change-Id: I176497ef9873ed7589bd3dee07d048a42dfad953
    Reviewed-on: http://gerrit.cloudera.org:8080/16001
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/acid-metadata-utils-test.cc            |  29 +++
 be/src/exec/acid-metadata-utils.cc                 |  95 ++++++--
 be/src/exec/acid-metadata-utils.h                  |   7 +-
 be/src/exec/hdfs-orc-scanner.cc                    |  75 ++++++-
 be/src/exec/hdfs-orc-scanner.h                     |  12 +
 be/src/exec/orc-column-readers.cc                  |  37 ++-
 be/src/exec/orc-column-readers.h                   |   9 +
 be/src/exec/orc-metadata-utils.cc                  | 161 ++++++++++----
 be/src/exec/orc-metadata-utils.h                   |  41 ++++
 testdata/data/README                               |   5 +
 testdata/data/alltypes_non_acid.orc                | Bin 0 -> 34176 bytes
 .../functional/functional_schema_template.sql      |  25 +++
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/QueryTest/acid-negative.test           |  20 ++
 .../queries/QueryTest/full-acid-original-file.test | 247 +++++++++++++++++++++
 tests/query_test/test_acid.py                      |  23 ++
 16 files changed, 711 insertions(+), 76 deletions(-)

diff --git a/be/src/exec/acid-metadata-utils-test.cc b/be/src/exec/acid-metadata-utils-test.cc
index 7db3e57..e2c4266 100644
--- a/be/src/exec/acid-metadata-utils-test.cc
+++ b/be/src/exec/acid-metadata-utils-test.cc
@@ -207,3 +207,32 @@ TEST(ValidWriteIdListTest, IsCompacted) {
   EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/000"));
   EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/p=1/000"));
 }
+
+TEST(ValidWriteIdListTest, GetWriteIdRange) {
+  EXPECT_EQ((make_pair<int64_t, int64_t>(0, 0)),
+      ValidWriteIdList::GetWriteIdRange("/foo/00000_0"));
+  EXPECT_EQ((make_pair<int64_t, int64_t>(5, 5)),
+      ValidWriteIdList::GetWriteIdRange("/foo/base_00005/000"));
+  EXPECT_EQ((make_pair<int64_t, int64_t>(5, 5)),
+      ValidWriteIdList::GetWriteIdRange("/foo/base_00005_v123/000"));
+  EXPECT_EQ((make_pair<int64_t, int64_t>(5 ,10)),
+      ValidWriteIdList::GetWriteIdRange("/foo/delta_00005_00010/000"));
+  EXPECT_EQ((make_pair<int64_t, int64_t>(5 ,10)),
+      ValidWriteIdList::GetWriteIdRange("/foo/delta_00005_00010_0006/000"));
+  EXPECT_EQ((make_pair<int64_t, int64_t>(5 ,10)),
+      ValidWriteIdList::GetWriteIdRange("/foo/delta_00005_00010_v123/000"));
+}
+
+TEST(ValidWriteIdListTest, GetBucketProperty) {
+  EXPECT_EQ(536870912, ValidWriteIdList::GetBucketProperty("/foo/0000000_0"));
+  EXPECT_EQ(536936448, ValidWriteIdList::GetBucketProperty("/foo/0000001_1"));
+  EXPECT_EQ(537001984, ValidWriteIdList::GetBucketProperty("/foo/bucket_00002"));
+  EXPECT_EQ(537067520, ValidWriteIdList::GetBucketProperty(
+      "/foo/base_0001_v1/bucket_000003_0"));
+  EXPECT_EQ(537133056, ValidWriteIdList::GetBucketProperty(
+      "/foo/delta_1_5/bucket_0000004_1"));
+  EXPECT_EQ(537198592, ValidWriteIdList::GetBucketProperty(
+      "/foo/delta_1_1_v1/000005_0_copy_1"));
+  EXPECT_EQ(536870913, ValidWriteIdList::GetBucketProperty(
+      "/foo/delta_1_1_1/00000_0"));
+}
diff --git a/be/src/exec/acid-metadata-utils.cc b/be/src/exec/acid-metadata-utils.cc
index fe4691c..4438ad7 100644
--- a/be/src/exec/acid-metadata-utils.cc
+++ b/be/src/exec/acid-metadata-utils.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <regex>
+
 #include "exec/acid-metadata-utils.h"
 
 #include "common/logging.h"
@@ -33,36 +35,96 @@ const string DELETE_DELTA_PREFIX = "delete_delta_";
 
 string GetParentDirName(const string& filepath) {
   int slash_before_file = filepath.rfind('/');
-  if (slash_before_file <= 0) return "";
+  if (slash_before_file == string::npos) return "";
   int slash_before_dirname = filepath.rfind('/', slash_before_file - 1);
-  if (slash_before_dirname <= 0) return "";
+  if (slash_before_dirname == string::npos) return "";
   return filepath.substr(
       slash_before_dirname + 1, slash_before_file - slash_before_dirname - 1);
 }
 
+inline string GetFileName(const string& filepath) {
+  std::size_t slash_before_file = filepath.rfind('/');
+  if (slash_before_file == string::npos) return filepath;
+  return filepath.substr(slash_before_file + 1);
+}
+
 inline bool StrStartsWith(const string& str, const string& prefix) {
   return str.rfind(prefix, 0) == 0;
 }
 
-std::pair<int64_t, int64_t> GetWriteIdRangeOfDeltaDir(const string& delta_dir) {
+} // unnamed namespace
+
+ValidWriteIdList::ValidWriteIdList(const TValidWriteIdList& valid_write_ids) {
+  InitFrom(valid_write_ids);
+}
+
+std::pair<int64_t, int64_t> ValidWriteIdList::GetWriteIdRange(const string& file_path) {
+  string dir_name = GetParentDirName(file_path);
+  if (!(StrStartsWith(dir_name, DELTA_PREFIX) ||
+        StrStartsWith(dir_name, DELETE_DELTA_PREFIX) ||
+        StrStartsWith(dir_name, BASE_PREFIX))) {
+    // Write ids of original files are 0.
+    return {0, 0};
+  }
   int min_write_id_pos = 0;
-  if (StrStartsWith(delta_dir, DELTA_PREFIX)) {
+  if (StrStartsWith(dir_name, DELTA_PREFIX)) {
     min_write_id_pos = DELTA_PREFIX.size();
   }
-  else if (StrStartsWith(delta_dir, DELETE_DELTA_PREFIX)) {
+  else if (StrStartsWith(dir_name, DELETE_DELTA_PREFIX)) {
     min_write_id_pos = DELETE_DELTA_PREFIX.size();
   } else {
-    DCHECK(false) << delta_dir + " is not a delta directory";
+    StrStartsWith(dir_name, BASE_PREFIX);
+    int write_id_pos = BASE_PREFIX.size();
+    int64_t write_id = std::atoll(dir_name.c_str() + write_id_pos);
+    return {write_id, write_id};
   }
-  int max_write_id_pos = delta_dir.find('_', min_write_id_pos) + 1;
-  return {std::atoll(delta_dir.c_str() + min_write_id_pos),
-          std::atoll(delta_dir.c_str() + max_write_id_pos)};
+  int max_write_id_pos = dir_name.find('_', min_write_id_pos) + 1;
+  return {std::atoll(dir_name.c_str() + min_write_id_pos),
+          std::atoll(dir_name.c_str() + max_write_id_pos)};
 }
 
-} // unnamed namespace
+int ValidWriteIdList::GetStatementId(const std::string& file_path) {
+  string dir_name = GetParentDirName(file_path);
+  // Only delta directories can have a statement id.
+  if (StrStartsWith(dir_name, BASE_PREFIX)) return 0;
+  // Expected number of '_' if statement id is present.
+  int expected_underscores = 0;
+  if (StrStartsWith(dir_name, DELTA_PREFIX)) {
+    expected_underscores = 3;
+  } else if (StrStartsWith(dir_name, DELETE_DELTA_PREFIX)) {
+    expected_underscores = 4;
+  } else {
+    return 0;
+  }
+  int count_underscores = std::count(dir_name.begin(), dir_name.end(), '_');
+  if (count_underscores != expected_underscores || dir_name.find("_v") != string::npos) {
+    return 0;
+  }
+  int last_underscore_pos = dir_name.rfind('_');
+  return std::atoi(dir_name.c_str() + last_underscore_pos + 1);
+}
 
-ValidWriteIdList::ValidWriteIdList(const TValidWriteIdList& valid_write_ids) {
-  InitFrom(valid_write_ids);
+int ValidWriteIdList::GetBucketProperty(const std::string& file_path) {
+  static const std::regex ORIGINAL_PATTERN("[0-9]+_[0-9]+(_copy_[0-9]+)?");
+  static const std::regex BUCKET_PATTERN("bucket_([0-9]+)(_[0-9]+)?");
+
+  string filename = GetFileName(file_path);
+  int bucket_id = 0;
+  if (std::regex_match(filename, ORIGINAL_PATTERN)) {
+    bucket_id = std::atoi(filename.c_str());
+  } else if (std::regex_match(filename, BUCKET_PATTERN)) {
+    bucket_id = std::atoi(filename.c_str() + sizeof("bucket_"));
+  } else {
+    return -1;
+  }
+  int statement_id = GetStatementId(file_path);
+
+  constexpr int BUCKET_CODEC_VERSION = 1;
+  constexpr int NUM_BUCKET_ID_BITS = 12;
+  constexpr int NUM_STATEMENT_ID_BITS = 12;
+  return BUCKET_CODEC_VERSION << (1 + NUM_BUCKET_ID_BITS + 4 + NUM_STATEMENT_ID_BITS) |
+         bucket_id << (4 + NUM_STATEMENT_ID_BITS) |
+         statement_id;
 }
 
 void ValidWriteIdList::InitFrom(const TValidWriteIdList& valid_write_ids) {
@@ -104,12 +166,9 @@ ValidWriteIdList::RangeResponse ValidWriteIdList::IsWriteIdRangeValid(
 
 ValidWriteIdList::RangeResponse ValidWriteIdList::IsFileRangeValid(
     const std::string& file_path) const {
-  string dir_name = GetParentDirName(file_path);
-  if (!(StrStartsWith(dir_name, DELTA_PREFIX) ||
-        StrStartsWith(dir_name, DELETE_DELTA_PREFIX))) {
-    return ALL;
-  }
-  std::pair<int64_t, int64_t> write_id_range = GetWriteIdRangeOfDeltaDir(dir_name);
+  std::pair<int64_t, int64_t> write_id_range = GetWriteIdRange(file_path);
+  // In base and original directories everything is valid.
+  if (write_id_range == std::make_pair<int64_t, int64_t>(0, 0)) return ALL;
   return IsWriteIdRangeValid(write_id_range.first, write_id_range.second);
 }
 
diff --git a/be/src/exec/acid-metadata-utils.h b/be/src/exec/acid-metadata-utils.h
index 2bd5b28..15f6df2 100644
--- a/be/src/exec/acid-metadata-utils.h
+++ b/be/src/exec/acid-metadata-utils.h
@@ -31,6 +31,11 @@ public:
     NONE, SOME, ALL
   };
 
+  static bool IsCompacted(const std::string& file_path);
+  static std::pair<int64_t, int64_t> GetWriteIdRange(const std::string& file_path);
+  static int GetBucketProperty(const std::string& file_path);
+  static int GetStatementId(const std::string& file_path);
+
   ValidWriteIdList() {}
   ValidWriteIdList(const TValidWriteIdList& valid_write_ids);
 
@@ -39,8 +44,6 @@ public:
   bool IsWriteIdValid(int64_t write_id) const;
   RangeResponse IsWriteIdRangeValid(int64_t min_write_id, int64_t max_write_id) const;
   RangeResponse IsFileRangeValid(const std::string& file_path) const;
-
-  static bool IsCompacted(const std::string& file_path);
 private:
   void AddInvalidWriteIds(const std::string& invalid_ids_str);
   int64_t high_water_mark_ = std::numeric_limits<int64_t>::max();
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 5e0ceb6..23e5b7c 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -193,11 +193,31 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   bool is_table_full_acid = scan_node_->hdfs_table()->IsTableFullAcid();
   bool is_file_full_acid = reader_->hasMetadataValue(HIVE_ACID_VERSION_KEY) &&
                            reader_->getMetadataValue(HIVE_ACID_VERSION_KEY) == "2";
-  // TODO: Remove the following constraint once IMPALA-9515 is resolved.
-  if (is_table_full_acid && !is_file_full_acid) {
-    return Status(Substitute("Error: Table is in full ACID format, but "
-        "'hive.acid.version' = '2' is missing from file metadata: table=$0, file=$1",
-        scan_node_->hdfs_table()->name(), filename()));
+  acid_original_file_ = is_table_full_acid && !is_file_full_acid;
+  if (is_table_full_acid) {
+    acid_write_id_range_ = valid_write_ids_.GetWriteIdRange(filename());
+    if (acid_original_file_ &&
+        acid_write_id_range_.first != acid_write_id_range_.second) {
+      return Status(Substitute("Found non-ACID file in directory that can only contain "
+          "files with full ACID schema: $0", filename()));
+    }
+  }
+  if (acid_original_file_) {
+    int32_t filename_len = strlen(filename());
+    if (filename_len >= 2 && strcmp(filename() + filename_len - 2, "_0") != 0) {
+      // It's an original file that should be included in the result.
+      // If it doesn't end with "_0" it means that it belongs to a bucket with other
+      // files. Impala rejects such files and tables.
+      // These files should only exist at table/partition root directory level.
+      // Original files in delta directories are created via the LOAD DATA
+      // statement. LOAD DATA assigns virtual bucket ids to files in non-bucketed
+      // tables, so we will have one file per (virtual) bucket (all of them having "_0"
+      // ending). For bucketed tables LOAD DATA will write ACID files. So after the first
+      // major compaction the table should never get into this state ever again.
+      return Status(Substitute("Found original file with unexpected name: $0 "
+          "Please run a major compaction on the partition/table to overcome this.",
+          filename()));
+    }
   }
   schema_resolver_.reset(new OrcSchemaResolver(*scan_node_->hdfs_table(),
       &reader_->getType(), filename(), is_table_full_acid, is_file_full_acid));
@@ -210,8 +230,8 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   // validate the write ids of the row batches.
   if (is_table_full_acid && !ValidWriteIdList::IsCompacted(filename())) {
     valid_write_ids_.InitFrom(scan_node_->hdfs_table()->ValidWriteIdList());
-    ValidWriteIdList::RangeResponse rows_valid =
-        valid_write_ids_.IsFileRangeValid(filename());
+    ValidWriteIdList::RangeResponse rows_valid = valid_write_ids_.IsWriteIdRangeValid(
+        acid_write_id_range_.first, acid_write_id_range_.second);
     DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
     row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
   }
@@ -411,7 +431,11 @@ Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
         *template_tuple =
             Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
       }
-      (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
+      if (acid_original_file_ && schema_resolver_->IsAcidColumn(slot_desc->col_path())) {
+        SetSyntheticAcidFieldForOriginalFile(slot_desc, *template_tuple);
+      } else {
+        (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
+      }
       missing_field_slots_.insert(slot_desc);
       continue;
     }
@@ -440,6 +464,31 @@ Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
   return Status::OK();
 }
 
+void HdfsOrcScanner::SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
+    Tuple* template_tuple) {
+  DCHECK_EQ(1, slot_desc->col_path().size());
+  int field_idx = slot_desc->col_path().front() - scan_node_->num_partition_keys();
+  switch(field_idx) {
+    case ACID_FIELD_OPERATION_INDEX:
+      *template_tuple->GetIntSlot(slot_desc->tuple_offset()) = 0;
+      break;
+    case ACID_FIELD_ORIGINAL_TRANSACTION_INDEX:
+    case ACID_FIELD_CURRENT_TRANSACTION_INDEX:
+      DCHECK_EQ(acid_write_id_range_.first, acid_write_id_range_.second);
+      *template_tuple->GetBigIntSlot(slot_desc->tuple_offset()) =
+          acid_write_id_range_.first;
+      break;
+    case ACID_FIELD_BUCKET_INDEX:
+      *template_tuple->GetBigIntSlot(slot_desc->tuple_offset()) =
+          ValidWriteIdList::GetBucketProperty(filename());
+      break;
+    case ACID_FIELD_ROWID_INDEX:
+      acid_synthetic_rowid_ = slot_desc;
+    default:
+      break;
+  }
+}
+
 /// Whether 'selected_type_ids' contains the id of any children of 'node'
 bool HasChildrenSelected(const orc::Type& node,
     const list<uint64_t>& selected_type_ids) {
@@ -632,6 +681,9 @@ Status HdfsOrcScanner::NextStripe() {
   advance_stripe_ = false;
   stripe_rows_read_ = 0;
 
+  bool first_invocation = stripe_idx_ == -1;
+  int64_t skipped_rows = 0;
+
   // Loop until we have found a non-empty stripe.
   while (true) {
     // Reset the parse status for the next stripe.
@@ -661,12 +713,17 @@ Status HdfsOrcScanner::NextStripe() {
         stripe_mid_pos < split_offset + split_length)) {
       // Middle pos not in split, this stripe will be handled by a different scanner.
       // Mark if the stripe overlaps with the split.
+      if (first_invocation) skipped_rows += stripe->getNumberOfRows();
       misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset,
           stripe_offset + stripe_len, split_offset, split_offset + split_length);
       continue;
     }
 
-    // TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505
+    // TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505 In that case,
+    // set the file row index in 'orc_root_reader_' accordingly.
+    if (first_invocation && acid_synthetic_rowid_ != nullptr) {
+      orc_root_reader_->SetFileRowIndex(skipped_rows);
+    }
 
     COUNTER_ADD(num_stripes_counter_, 1);
     row_reader_options_.range(stripe->getOffset(), stripe_len);
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 688dcce..a2c88ff 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -220,6 +220,15 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// With the help of it we can check the validity of ACID write ids.
   ValidWriteIdList valid_write_ids_;
 
+  /// The write id range for ACID files.
+  std::pair<int64_t, int64_t> acid_write_id_range_;
+
+  /// Non-ACID file in full ACID table.
+  bool acid_original_file_ = false;
+
+  /// Slot descriptor of synthetic rowid of original files.
+  const SlotDescriptor* acid_synthetic_rowid_ = nullptr;
+
   /// True if we need to validate the row batches against the valid write id list. This
   /// only needs to be done for Hive Streaming Ingestion. The 'write id' will be the same
   /// within a stripe, but we still need to read the row batches for validation because
@@ -299,6 +308,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   bool IsPartitionKeySlot(const SlotDescriptor* slot);
 
   bool IsMissingField(const SlotDescriptor* slot);
+
+  void SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor* slot_desc,
+      Tuple* template_tuple);
 };
 
 } // namespace impala
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 331f424..2e6f664 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -24,6 +24,7 @@
 #include "runtime/decimal-value.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.inline.h"
+#include "util/mem-util.h"
 #include "common/names.h"
 
 using namespace impala;
@@ -415,18 +416,36 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
           item_count, orc_column_id_, scratch_batch->num_tuples));
     }
   }
-  row_idx_ += scratch_batch->num_tuples - scratch_batch_idx;
-  if (row_validator_ && scanner_->scan_node_->IsZeroSlotTableScan()) {
-    DCHECK_EQ(1, batch_->fields.size()); // We should only select 'currentTransaction'.
-    DCHECK_EQ(scratch_batch_idx, scratch_batch->num_tuples);
-    int num_to_fake_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
-                                    NumElements() - row_idx_);
-    scratch_batch->num_tuples += num_to_fake_read;
-    row_idx_ += num_to_fake_read;
-  }
+  int num_rows_read = scratch_batch->num_tuples - scratch_batch_idx;
+  if (children_.empty()) {
+    DCHECK((scanner_->row_batches_need_validation_ &&
+            scanner_->scan_node_->IsZeroSlotTableScan()) ||
+            scanner_->acid_original_file_);
+    DCHECK_EQ(0, num_rows_read);
+    num_rows_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
+                             NumElements() - row_idx_);
+    scratch_batch->num_tuples += num_rows_read;
+  }
+  if (scanner_->acid_synthetic_rowid_ != nullptr) {
+    FillSyntheticRowId(scratch_batch, scratch_batch_idx, num_rows_read);
+  }
+  row_idx_ += num_rows_read;
   return Status::OK();
 }
 
+void OrcStructReader::FillSyntheticRowId(ScratchTupleBatch* scratch_batch,
+    int scratch_batch_idx, int num_rows) {
+    DCHECK(scanner_->acid_synthetic_rowid_ != nullptr);
+    int tuple_size = OrcColumnReader::scanner_->tuple_byte_size();
+    uint8_t* first_tuple = scratch_batch->tuple_mem + scratch_batch_idx * tuple_size;
+    int64_t* first_slot = reinterpret_cast<Tuple*>(first_tuple)->GetBigIntSlot(
+        scanner_->acid_synthetic_rowid_->tuple_offset());
+    StrideWriter<int64_t> out{first_slot, tuple_size};
+    for (int i = 0; i < num_rows; ++i) {
+      *out.Advance() = file_row_idx_++;
+    }
+}
+
 Status OrcStructReader::ReadValueBatch(int row_idx, ScratchTupleBatch* scratch_batch,
     MemPool* pool, int scratch_batch_idx) {
   for (OrcColumnReader* child : children_) {
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 54f3a94..e45b0a2 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -577,7 +577,12 @@ class OrcStructReader : public OrcComplexColumnReader {
     return child->NumElements();
   }
 
+  void SetFileRowIndex(int64_t file_row_idx) { file_row_idx_ = file_row_idx; }
+
  private:
+  void FillSyntheticRowId(ScratchTupleBatch* scratch_batch, int scratch_batch_idx,
+      int num_rows);
+
   orc::StructVectorBatch* batch_ = nullptr;
 
   /// Field ids of the children reader
@@ -586,6 +591,10 @@ class OrcStructReader : public OrcComplexColumnReader {
   /// Keep row index if we're top level readers
   int row_idx_;
 
+  /// File-level row index. Only set for original files, and only when ACID field 'rowid'
+  /// is needed.
+  int64_t file_row_idx_ = -1;
+
   int current_write_id_field_index_ = -1;
   std::unique_ptr<OrcRowValidator> row_validator_;
 
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index dbb0d53..aa81d7d 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -28,11 +28,17 @@ Status OrcSchemaResolver::BuildSchemaPaths(int num_partition_keys,
     return Status(TErrorCode::ORC_TYPE_NOT_ROOT_AT_STRUCT, "file", root_->toString(),
         filename_);
   }
+  bool synthetic_acid_schema = is_table_full_acid_ && !is_file_full_acid_;
   SchemaPath path;
   col_id_path_map->push_back(path);
   int num_columns = root_->getSubtypeCount();
+  // Original files don't have "row" field, let's add it manually.
+  if (synthetic_acid_schema) path.push_back(ACID_FIELD_ROW + num_partition_keys);
   for (int i = 0; i < num_columns; ++i) {
-    path.push_back(i + num_partition_keys);
+    // For synthetic ACID schema these columns are not top-level, so don't need to
+    // adjust them by 'num_part_keys'.
+    int field_offset = synthetic_acid_schema ? 0 : num_partition_keys;
+    path.push_back(i + field_offset);
     BuildSchemaPathHelper(*root_->getSubtype(i), &path, col_id_path_map);
     path.pop_back();
   }
@@ -85,42 +91,24 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
   *pos_field = false;
   *missing_field = false;
   DCHECK_OK(ValidateFullAcidFileSchema()); // Should have already been validated.
-  bool translate_acid_path = is_table_full_acid_ && is_file_full_acid_;
-  int num_part_cols = tbl_desc_.num_clustering_cols();
-  for (int i = 0; i < col_path.size(); ++i) {
-    int table_idx = col_path[i];
-    int file_idx = table_idx;
-    if (i == 0) {
-      if (translate_acid_path) {
-        constexpr int FILE_INDEX_OF_FIELD_ROW = 5;
-        if (table_idx == num_part_cols + FILE_INDEX_OF_FIELD_ROW) {
-          // Refers to "row" column. Table definition doesn't have "row" column
-          // so here we just step into the file's "row" column to get in sync
-          // with the table schema.
-          *node = (*node)->getSubtype(FILE_INDEX_OF_FIELD_ROW);
-          continue;
-        }
-        DCHECK_GE(table_idx, num_part_cols);
-        // 'col_path' refers to the ACID columns. In table schema they are nested
-        // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
-        table_col_type = &tbl_desc_.col_descs()[num_part_cols].type();
-        // The ACID column is under 'row__id' at index 'table_idx - num_part_cols'.
-        int acid_col_idx = table_idx - num_part_cols;
-        DCHECK_GE(acid_col_idx, 0);
-        DCHECK_LT(acid_col_idx, table_col_type->children.size());
-        table_col_type = &table_col_type->children[acid_col_idx];
+  if (col_path.empty()) return Status::OK();
+  SchemaPath table_path, file_path;
+  TranslateColPaths(col_path, &table_path, &file_path);
+  for (int i = 0; i < table_path.size(); ++i) {
+    int table_idx = table_path[i];
+    int file_idx = file_path[i];
+    if (table_idx == -1 || file_idx == -1) {
+      DCHECK_NE(table_idx, file_idx);
+      if (table_idx == -1) {
+        DCHECK_EQ(*node, root_);
+        *node = (*node)->getSubtype(file_idx);
       } else {
+        DCHECK(table_col_type == nullptr);
         table_col_type = &tbl_desc_.col_descs()[table_idx].type();
       }
-      // For top-level columns, the first index in a path includes the table's partition
-      // keys.
-      file_idx -= num_part_cols;
-    } else if (i == 1 && table_col_type == nullptr && translate_acid_path) {
-      // Here we are referring to a table column from the viewpoint of the user.
-      // Hence, in the table metadata this is a top-level column, i.e. it is offsetted
-      // with 'num_part_cols' in the table schema. We also need to add '1', because in the
-      // FeTable we added a synthetic struct typed column 'row__id'.
-      table_idx += 1 + num_part_cols;
+      continue;
+    }
+    if (table_col_type == nullptr) {
       table_col_type = &tbl_desc_.col_descs()[table_idx].type();
     } else if (table_col_type->type == TYPE_ARRAY &&
         table_idx == SchemaPathConstants::ARRAY_POS) {
@@ -141,29 +129,118 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
       DCHECK_EQ(table_col_type->children.size(), 1);
       if ((*node)->getKind() != orc::TypeKind::LIST) {
         return Status(TErrorCode::ORC_NESTED_TYPE_MISMATCH, filename_,
-            PrintSubPath(tbl_desc_, col_path, i), "array", (*node)->toString());
+            PrintPath(tbl_desc_, GetCanonicalSchemaPath(table_path, i)), "array",
+            (*node)->toString());
       }
     } else if (table_col_type->type == TYPE_MAP) {
       DCHECK_EQ(table_col_type->children.size(), 2);
       if ((*node)->getKind() != orc::TypeKind::MAP) {
         return Status(TErrorCode::ORC_NESTED_TYPE_MISMATCH, filename_,
-            PrintSubPath(tbl_desc_, col_path, i), "map", (*node)->toString());
+            PrintPath(tbl_desc_, GetCanonicalSchemaPath(table_path, i)), "map",
+            (*node)->toString());
       }
     } else if (table_col_type->type == TYPE_STRUCT) {
       DCHECK_GT(table_col_type->children.size(), 0);
       if ((*node)->getKind() != orc::TypeKind::STRUCT) {
         return Status(TErrorCode::ORC_NESTED_TYPE_MISMATCH, filename_,
-            PrintSubPath(tbl_desc_, col_path, i), "struct", (*node)->toString());
+            PrintPath(tbl_desc_, GetCanonicalSchemaPath(table_path, i)), "struct",
+            (*node)->toString());
       }
     } else {
       DCHECK(!table_col_type->IsComplexType());
-      DCHECK_EQ(i, col_path.size() - 1);
+      DCHECK_EQ(i, table_path.size() - 1);
       RETURN_IF_ERROR(ValidateType(*table_col_type, **node));
     }
   }
   return Status::OK();
 }
 
+SchemaPath OrcSchemaResolver::GetCanonicalSchemaPath(const SchemaPath& col_path,
+    int last_idx) const {
+  DCHECK_LT(last_idx, col_path.size());
+  SchemaPath ret;
+  ret.reserve(col_path.size());
+  std::copy_if(col_path.begin(),
+               col_path.begin() + last_idx + 1,
+               std::back_inserter(ret),
+               [](int i) { return i >= 0; });
+  return ret;
+}
+
+void OrcSchemaResolver::TranslateColPaths(const SchemaPath& col_path,
+    SchemaPath* table_col_path, SchemaPath* file_col_path) const {
+  DCHECK(!col_path.empty());
+  DCHECK(table_col_path != nullptr);
+  DCHECK(file_col_path != nullptr);
+  table_col_path->reserve(col_path.size() + 1);
+  file_col_path->reserve(col_path.size() + 1);
+  int first_idx = col_path[0];
+  int num_part_cols = tbl_desc_.num_clustering_cols();
+  int remaining_idx = 0;
+  if (!is_table_full_acid_) {
+    // Table is not full ACID. Only need to adjust partitioning columns.
+    table_col_path->push_back(first_idx);
+    file_col_path->push_back(first_idx - num_part_cols);
+    remaining_idx = 1;
+  } else if (is_file_full_acid_) {
+    DCHECK(is_table_full_acid_);
+    // Table is full ACID, and file is in full ACID format too. We need to do some
+    // conversions since the Frontend table schema and file schema differs. See the
+    // comment at the declaration of this function.
+    if (first_idx == num_part_cols + ACID_FIELD_ROW) {
+      // 'first_idx' refers to "row" column. Table definition doesn't have "row" column.
+      table_col_path->push_back(-1);
+      file_col_path->push_back(first_idx - num_part_cols);
+      if (col_path.size() == 1 ) return;
+      int second_idx = col_path[1];
+      // Adjust table with num partitioning colums and the synthetic 'row__id' column.
+      table_col_path->push_back(num_part_cols + 1 + second_idx);
+      file_col_path->push_back(second_idx);
+    } else {
+      DCHECK_GE(first_idx, num_part_cols);
+      // 'col_path' refers to the ACID columns. In table schema they are nested
+      // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
+      table_col_path->push_back(num_part_cols);
+      file_col_path->push_back(-1);
+      // The ACID column is under 'row__id' at index 'table_idx - num_part_cols'.
+      int acid_col_idx = first_idx - num_part_cols;
+      table_col_path->push_back(acid_col_idx);
+      file_col_path->push_back(acid_col_idx);
+    }
+    remaining_idx = 2;
+  } else if (!is_file_full_acid_) {
+    DCHECK(is_table_full_acid_);
+    // Table is full ACID, but file is in non-ACID format.
+    if (first_idx == num_part_cols + ACID_FIELD_ROW) {
+      if (col_path.size() == 1 ) return;
+      // 'first_idx' refers to "row" column. Table definition doesn't have "row" column,
+      // but neither the file schema here. We don't include it in the output paths.
+      int second_idx = col_path[1];
+      // Adjust table with num partitioning colums and the synthetic 'row__id' column.
+      table_col_path->push_back(num_part_cols + 1 + second_idx);
+      file_col_path->push_back(second_idx);
+    } else {
+      DCHECK_GE(first_idx, num_part_cols);
+      // 'col_path' refers to the ACID columns. In table schema they are nested
+      // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
+      table_col_path->push_back(num_part_cols);
+      file_col_path->push_back(-1);
+      // The ACID column is under 'row__id' at index 'table_idx - num_part_cols'.
+      int acid_col_idx = first_idx - num_part_cols;
+      table_col_path->push_back(acid_col_idx);
+      // ACID columns in original files should be considered as missing colums.
+      file_col_path->push_back(std::numeric_limits<int>::max());
+    }
+    remaining_idx = 2;
+  }
+  // The rest of the path is unchanged.
+  for (int i = remaining_idx; i < col_path.size(); ++i) {
+    table_col_path->push_back(col_path[i]);
+    file_col_path->push_back(col_path[i]);
+  }
+  DCHECK_EQ(table_col_path->size(), file_col_path->size());
+}
+
 Status OrcSchemaResolver::ValidateType(const ColumnType& type,
     const orc::Type& orc_type) const {
   switch (orc_type.getKind()) {
@@ -233,6 +310,14 @@ Status OrcSchemaResolver::ValidateType(const ColumnType& type,
       type.DebugString(), orc_type.toString(), filename_));
 }
 
+bool OrcSchemaResolver::IsAcidColumn(const SchemaPath& col_path) const {
+  DCHECK(is_table_full_acid_);
+  DCHECK(!is_file_full_acid_);
+  int num_part_cols = tbl_desc_.num_clustering_cols();
+  return col_path.size() == 1 &&
+         col_path.front() >= num_part_cols && col_path.front() < num_part_cols + 5;
+}
+
 Status OrcSchemaResolver::ValidateFullAcidFileSchema() const {
   if (!is_file_full_acid_) return Status::OK();
   string error_msg = Substitute("File %0 should have full ACID schema.", filename_);
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index 5042092..b83b005 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -27,6 +27,14 @@ namespace impala {
 // Key of Hive ACID version in ORC metadata.
 const string HIVE_ACID_VERSION_KEY = "hive.acid.version";
 
+// Table level indexes of ACID columns.
+constexpr int ACID_FIELD_OPERATION_INDEX = 0;
+constexpr int ACID_FIELD_ORIGINAL_TRANSACTION_INDEX = 1;
+constexpr int ACID_FIELD_BUCKET_INDEX = 2;
+constexpr int ACID_FIELD_ROWID_INDEX = 3;
+constexpr int ACID_FIELD_CURRENT_TRANSACTION_INDEX = 4;
+constexpr int ACID_FIELD_ROW = 5;
+
 // ORC type id of column "currentTransaction" in full ACID ORC files.
 constexpr int CURRENT_TRANSCACTION_TYPE_ID = 5;
 
@@ -54,7 +62,40 @@ class OrcSchemaResolver {
   /// but the actual file schema doesn't conform to it.
   Status ValidateFullAcidFileSchema() const;
 
+  /// Can be only invoked for original files of full transactional tables.
+  /// Returns true if 'col_path' refers to an ACID column.
+  bool IsAcidColumn(const SchemaPath& col_path) const;
+
  private:
+  /// Translates 'col_path' to non-canonical table and file paths. These non-canonical
+  /// paths have the same lengths. To achieve that they might contain -1 values that must
+  /// be ignored. These paths are useful for tables that have different table and file
+  /// schema (ACID tables, partitioned tables).
+  /// E.g. ACID table schema is
+  /// {
+  ///   "row__id" : {...ACID columns...},
+  ///   ...TABLE columns...
+  /// }
+  /// While ACID file schema is
+  /// {
+  ///   ...ACID columns...,
+  ///   "row" : {...TABLE columns...}
+  /// }
+  /// Let's assume we have a non-partitioned ACID table and the first user column is
+  /// called 'id'.
+  /// In that case 'col_path' for 'id' looks like [5, 0]. This function converts it to
+  /// non-canonical 'table_col_path' [-1, 1] and non-canonical 'file_col_path'
+  /// [5, 0] (which is the same as the canonical in this case).
+  /// Another example for ACID column 'rowid':
+  /// 'col_path' is [3], 'table_col_path' is [0, 3], 'file_col_path' is [-1, 3].
+  /// Different conversions are needed for original files and non-transactional tables
+  /// (for the latter it only adjusts first column offsets if the table is partitioned).
+  /// These non-canonical paths are easier to be processed by ResolveColumn().
+  void TranslateColPaths(const SchemaPath& col_path,
+      SchemaPath* table_col_path, SchemaPath* file_col_path) const;
+
+  SchemaPath GetCanonicalSchemaPath(const SchemaPath& col_path, int last_idx) const;
+
   const HdfsTableDescriptor& tbl_desc_;
   const orc::Type* const root_;
   const char* const filename_ = nullptr;
diff --git a/testdata/data/README b/testdata/data/README
index adfb84c..b91bcd5 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -521,3 +521,8 @@ streaming.orc:
 ORC file generated by Hive Streaming Ingestion. I used a slightly altered version of
 TestStreaming.testNoBuckets() from Hive 3.1 to generate this file. It contains
 values coming from two transactions. The file has two stripes (one per transaction).
+
+alltypes_non_acid.orc:
+Non-acid ORC file generated by Hive 3.1 with the following command:
+CREATE TABLE alltypes_clone STORED AS ORC AS SELECT * FROM functional.alltypes.
+It's used as an original file in ACID tests.
diff --git a/testdata/data/alltypes_non_acid.orc b/testdata/data/alltypes_non_acid.orc
new file mode 100644
index 0000000..6669ec8
Binary files /dev/null and b/testdata/data/alltypes_non_acid.orc differ
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 9d3c533..807267f 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -374,6 +374,31 @@ TBLPROPERTIES("hbase.table.name" = "functional_hbase.hbasealltypeserror");
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+alltypes_promoted
+---- PARTITION_COLUMNS
+year int
+month int
+---- COLUMNS
+id int COMMENT 'Add a comment'
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_HIVE
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.alltypes;
+ALTER TABLE {db_name}{db_suffix}.{table_name} SET tblproperties('EXTERNAL'='FALSE','transactional'='true');
+---- TABLE_PROPERTIES
+transactional=false
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 hbasecolumnfamilies
 ---- HBASE_COLUMN_FAMILIES
 0
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 7e0f092..7d5b3d0 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -11,6 +11,7 @@ table_name:hbasealltypeserror, constraint:restrict_to, table_format:hbase/none/n
 table_name:hbasealltypeserrornonulls, constraint:restrict_to, table_format:hbase/none/none
 
 table_name:alltypesinsert, constraint:restrict_to, table_format:text/none/none
+table_name:alltypes_promoted, constraint:restrict_to, table_format:orc/def/block
 table_name:stringpartitionkey, constraint:restrict_to, table_format:text/none/none
 table_name:alltypesnopart_insert, constraint:restrict_to, table_format:text/none/none
 table_name:insert_overwrite_nopart, constraint:restrict_to, table_format:text/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
index 3ad8bc8..9cb7d32 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -85,3 +85,23 @@ select * from acid;
 ---- TYPES
 INT
 ====
+---- QUERY
+# Impala should reject tables that have multiple files in the same
+# bucket in the same directory.
+# Note: This table is clearly not bucketed, but for row ID
+# generation it has virtual buckets based on the file names.
+create table test_promotion_fail (i int) stored as orc;
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into test_promotion_fail values (1);
+insert into test_promotion_fail values (1);
+alter table test_promotion_fail
+set tblproperties('EXTERNAL'='false','transactional'='true');
+====
+---- QUERY
+refresh test_promotion_fail;
+select * from  test_promotion_fail;
+---- CATCH
+Found original file with unexpected name
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-original-file.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-original-file.test
new file mode 100644
index 0000000..2c9c6e9
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-original-file.test
@@ -0,0 +1,247 @@
+====
+---- QUERY
+refresh alltypes_promoted_nopart;
+====
+---- QUERY
+select count(*) from alltypes_promoted_nopart;
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check if the proper ACID field values are generated.
+select row__id.*, id from alltypes_promoted_nopart
+where id < 10;
+---- RESULTS
+0,0,536870912,4030,0,0
+0,0,536870912,4031,0,1
+0,0,536870912,4032,0,2
+0,0,536870912,4033,0,3
+0,0,536870912,4034,0,4
+0,0,536870912,4035,0,5
+0,0,536870912,4036,0,6
+0,0,536870912,4037,0,7
+0,0,536870912,4038,0,8
+0,0,536870912,4039,0,9
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT
+====
+---- QUERY
+select row__id.* from alltypes_promoted_nopart
+where id > 990 and id < 1000;
+---- RESULTS
+0,0,536870912,6531,0
+0,0,536870912,6532,0
+0,0,536870912,6533,0
+0,0,536870912,6534,0
+0,0,536870912,6535,0
+0,0,536870912,6536,0
+0,0,536870912,6537,0
+0,0,536870912,6538,0
+0,0,536870912,6539,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+====
+---- QUERY
+select row__id.* from alltypes_promoted_nopart
+where id > 7200 and id < 7210;
+---- RESULTS
+0,0,536870912,2381,0
+0,0,536870912,2382,0
+0,0,536870912,2383,0
+0,0,536870912,2384,0
+0,0,536870912,2385,0
+0,0,536870912,2386,0
+0,0,536870912,2387,0
+0,0,536870912,2388,0
+0,0,536870912,2389,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+====
+---- QUERY
+select row__id.* from alltypes_promoted_nopart
+where row__id.rowid > 1200 and row__id.rowid < 1210;
+---- RESULTS
+0,0,536870912,1201,0
+0,0,536870912,1202,0
+0,0,536870912,1203,0
+0,0,536870912,1204,0
+0,0,536870912,1205,0
+0,0,536870912,1206,0
+0,0,536870912,1207,0
+0,0,536870912,1208,0
+0,0,536870912,1209,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+====
+---- QUERY
+select row__id.*, id from alltypes_promoted_nopart
+where row__id.rowid > 7200 and row__id.rowid < 7210;
+---- RESULTS
+0,0,536870912,7201,0,491
+0,0,536870912,7202,0,492
+0,0,536870912,7203,0,493
+0,0,536870912,7204,0,494
+0,0,536870912,7205,0,495
+0,0,536870912,7206,0,496
+0,0,536870912,7207,0,497
+0,0,536870912,7208,0,498
+0,0,536870912,7209,0,499
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT
+====
+---- QUERY
+# Check if the proper ACID fields are generated even if the
+# scan range points to the middle of the file.
+set MAX_SCAN_RANGE_LENGTH=1000;
+select row__id.*, id from alltypes_promoted_nopart
+where id < 10;
+---- RESULTS
+0,0,536870912,4030,0,0
+0,0,536870912,4031,0,1
+0,0,536870912,4032,0,2
+0,0,536870912,4033,0,3
+0,0,536870912,4034,0,4
+0,0,536870912,4035,0,5
+0,0,536870912,4036,0,6
+0,0,536870912,4037,0,7
+0,0,536870912,4038,0,8
+0,0,536870912,4039,0,9
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumScannersWithNoReads: [1-9].*
+====
+---- QUERY
+set MAX_SCAN_RANGE_LENGTH=1000;
+select row__id.* from alltypes_promoted_nopart
+where id > 990 and id < 1000;
+---- RESULTS
+0,0,536870912,6531,0
+0,0,536870912,6532,0
+0,0,536870912,6533,0
+0,0,536870912,6534,0
+0,0,536870912,6535,0
+0,0,536870912,6536,0
+0,0,536870912,6537,0
+0,0,536870912,6538,0
+0,0,536870912,6539,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumScannersWithNoReads: [1-9].*
+====
+---- QUERY
+set MAX_SCAN_RANGE_LENGTH=1000;
+select row__id.* from alltypes_promoted_nopart
+where id > 7200 and id < 7210;
+---- RESULTS
+0,0,536870912,2381,0
+0,0,536870912,2382,0
+0,0,536870912,2383,0
+0,0,536870912,2384,0
+0,0,536870912,2385,0
+0,0,536870912,2386,0
+0,0,536870912,2387,0
+0,0,536870912,2388,0
+0,0,536870912,2389,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumScannersWithNoReads: [1-9].*
+====
+---- QUERY
+set MAX_SCAN_RANGE_LENGTH=1000;
+select row__id.* from alltypes_promoted_nopart
+where row__id.rowid > 1200 and row__id.rowid < 1210;
+---- RESULTS
+0,0,536870912,1201,0
+0,0,536870912,1202,0
+0,0,536870912,1203,0
+0,0,536870912,1204,0
+0,0,536870912,1205,0
+0,0,536870912,1206,0
+0,0,536870912,1207,0
+0,0,536870912,1208,0
+0,0,536870912,1209,0
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT
+---- RUNTIME_PROFILE
+row_regex: .*NumScannersWithNoReads: [1-9].*
+====
+---- QUERY
+set MAX_SCAN_RANGE_LENGTH=1000;
+select row__id.*, id from alltypes_promoted_nopart
+where row__id.rowid > 7200 and row__id.rowid < 7210;
+---- RESULTS
+0,0,536870912,7201,0,491
+0,0,536870912,7202,0,492
+0,0,536870912,7203,0,493
+0,0,536870912,7204,0,494
+0,0,536870912,7205,0,495
+0,0,536870912,7206,0,496
+0,0,536870912,7207,0,497
+0,0,536870912,7208,0,498
+0,0,536870912,7209,0,499
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT
+---- RUNTIME_PROFILE
+row_regex: .*NumScannersWithNoReads: [1-9].*
+====
+---- QUERY
+select count(*) from functional_orc_def.alltypes_promoted where id % 2 = 0;
+---- RESULTS
+3650
+---- TYPES
+BIGINT
+====
+---- QUERY
+select * from functional_orc_def.alltypes_promoted where id = 3000;
+---- RESULTS
+3000,true,0,0,0,0,0,0,'10/28/09','0',2009-10-28 04:30:12.150000000,2009,10
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select row__id.originaltransaction, row__id.rowid, id
+from functional_orc_def.alltypes_promoted
+where id < 5;
+---- RESULTS
+0,0,0
+0,1,1
+0,2,2
+0,3,3
+0,4,4
+---- TYPES
+BIGINT, BIGINT, INT
+====
+---- QUERY
+select row__id.originaltransaction, row__id.currenttransaction
+from functional_orc_def.alltypes_promoted
+where id = 0;
+---- RESULTS
+0,0
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+create table orig_part (i int) partitioned by (p int) stored as orc;
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into orig_part partition (p=1) values (1), (2), (3);
+alter table orig_part set tblproperties('EXTERNAL'='FALSE','transactional'='true');
+====
+---- QUERY
+refresh orig_part;
+select row__id.*, * from orig_part;
+---- LABELS
+OPERATION,ORIGINALTRANSACTION,BUCKET,ROWID,CURRENTTRANSACTION,I,P
+---- RESULTS
+0,0,536870912,0,0,1,1
+0,0,536870912,1,0,2,1
+0,0,536870912,2,0,3,1
+---- TYPES
+INT,BIGINT,INT,BIGINT,BIGINT,INT,INT
+====
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index f9e3f02..f851bb7 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -17,6 +17,7 @@
 
 # Functional tests for ACID integration with Hive.
 
+import os
 import pytest
 import time
 
@@ -126,6 +127,28 @@ class TestAcid(ImpalaTestSuite):
   @SkipIfADLS.hive
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
+  def test_full_acid_original_files(self, vector, unique_database):
+    table_name = "alltypes_promoted_nopart"
+    fq_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.client.execute("""CREATE TABLE {0} (
+          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
+          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
+          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
+          year INT, month INT) STORED AS ORC""".format(fq_table_name))
+    table_uri = self._get_table_location(fq_table_name, vector)
+    original_file = os.environ['IMPALA_HOME'] + "/testdata/data/alltypes_non_acid.orc"
+    self.hdfs_client.copy_from_local(original_file, table_uri + "/000000_0")
+    self.run_stmt_in_hive("""alter table {0}.{1}
+        set tblproperties('EXTERNAL'='FALSE','transactional'='true')""".format(
+        unique_database, table_name))
+    self.run_test_case('QueryTest/full-acid-original-file', vector, unique_database)
+
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
   def test_acid_insert_statschg(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-clear-statsaccurate',
         vector, use_db=unique_database)