Posted to commits@impala.apache.org by ta...@apache.org on 2019/07/28 19:24:46 UTC

[impala] branch master updated: IMPALA-8636: Implement INSERT for insert-only ACID tables

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 6360657  IMPALA-8636: Implement INSERT for insert-only ACID tables
6360657 is described below

commit 6360657cb4d3b7655d9ff80958b2694ae4609370
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Jun 7 11:27:31 2019 +0200

    IMPALA-8636: Implement INSERT for insert-only ACID tables
    
    This commit adds INSERT support for insert-only ACID tables.
    
    The Frontend opens a transaction for INSERT statements when the target
    table is transactional. It also allocates a write ID for the target
    table. The Frontend aborts the transaction if an error occurs during
    analysis/planning.
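
    A minimal sketch of that Frontend-side flow, written directly against the
    Hive 3 IMetaStoreClient API that the new MetastoreShim methods wrap (the
    "Impala" user string, class and helper names are illustrative, not the
    committed code):

      import java.util.Collections;
      import org.apache.hadoop.hive.metastore.IMetaStoreClient;

      class AcidInsertTxn {
        /** Opens a transaction and allocates a write id; returns {txnId, writeId}. */
        static long[] open(IMetaStoreClient client, String db, String tbl)
            throws Exception {
          long txnId = client.openTxn("Impala");
          try {
            long writeId = client.allocateTableWriteId(txnId, db, tbl);
            return new long[] {txnId, writeId};
          } catch (Exception e) {
            // Abort if anything goes wrong during analysis/planning.
            client.abortTxns(Collections.singletonList(txnId));
            throw e;
          }
        }
      }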
    
    The Backend gets the transaction id and the write id in TFinalizeParams.
    The write id is also set for the HDFS table sinks. The sinks write the
    files to their final destination, which is an ACID base or delta
    directory, so there is no need to finalize transactional INSERTs.
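
    The directory naming rule used by the sinks (see BuildHdfsFileNames() in
    be/src/exec/hdfs-table-sink.cc below) can be sketched like this; the
    "/warehouse/tbl" path and write id 42 are made-up values for illustration:

      // INSERT OVERWRITE writes a new base, a plain INSERT writes a delta.
      static String acidDir(long writeId, boolean overwrite) {
        return overwrite ? String.format("base_%d", writeId)
                         : String.format("delta_%d_%d", writeId, writeId);
      }
      // "/warehouse/tbl/p=1/" + acidDir(42, false) -> "/warehouse/tbl/p=1/delta_42_42"
      // "/warehouse/tbl/p=1/" + acidDir(42, true)  -> "/warehouse/tbl/p=1/base_42"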
    
    When the sinks have finished writing the data, the Coordinator invokes
    updateCatalog() on catalogd, which also commits the transaction if
    everything went well; otherwise the Coordinator aborts the transaction.
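
    Roughly, the commit/abort split looks like this (illustrative only; the
    real logic is in CatalogOpExecutor.java, client-request-state.cc and
    Frontend.java below, and 'client'/'txnId' are assumed to be in scope):

      // catalogd, at the end of a successful updateCatalog():
      client.commitTxn(txnId);
      // coordinator, if the query fails anywhere before that point:
      client.abortTxns(Collections.singletonList(txnId));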
    
    Testing:
    * added new tables during dataload
    * added acid-insert.test file with INSERT statements against the new
      tables
    * added tests for insertions between ACID and non-ACID tables
    * added tests for error scenarios via debug actions
    * added an integration test with Hive to test_hms_integration.py. The test
      inserts data with Impala and reads it back with Hive. (These integration
      tests only run with the exhaustive exploration strategy.)
    
    TODO in following commits:
    * add locks and heartbeats (without heartbeats, long-running transactions
      might be aborted by HMS)
    * implement TRUNCATE
    * CTAS creates files in the 'root' directory of the table/partition. This
      is handled correctly during SELECT, but it would be better to create a
      base directory from the beginning. Hive creates a delta directory
      for CTAS.
    
    Change-Id: Id6c36fa6902676f06b4e38730f737becfc7c06ad
    Reviewed-on: http://gerrit.cloudera.org:8080/13559
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc                     |  34 ++-
 be/src/exec/hdfs-table-sink.h                      |  19 +-
 be/src/runtime/coordinator.cc                      |  18 +-
 be/src/runtime/coordinator.h                       |   2 +-
 be/src/service/client-request-state.cc             |  54 ++++-
 be/src/service/client-request-state.h              |  22 +-
 be/src/service/frontend.cc                         |   9 +-
 be/src/service/frontend.h                          |   4 +
 be/src/util/jni-util.h                             |   6 +
 common/thrift/CatalogService.thrift                |   3 +
 common/thrift/DataSinks.thrift                     |   3 +
 common/thrift/Frontend.thrift                      |   6 +
 common/thrift/ImpalaInternalService.thrift         |   3 +
 .../org/apache/impala/compat/MetastoreShim.java    |  33 +++
 .../org/apache/impala/compat/MetastoreShim.java    |  71 +++++-
 .../org/apache/impala/analysis/AlterTableStmt.java |   1 +
 .../impala/analysis/DropTableOrViewStmt.java       |   1 +
 .../org/apache/impala/analysis/InsertStmt.java     |   9 +-
 .../org/apache/impala/analysis/LoadDataStmt.java   |   1 +
 .../org/apache/impala/analysis/TruncateStmt.java   |   1 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |   3 +-
 .../apache/impala/common/TransactionException.java |  37 +++
 .../org/apache/impala/planner/HdfsTableSink.java   |   9 +-
 .../java/org/apache/impala/planner/TableSink.java  |  13 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  23 +-
 .../java/org/apache/impala/service/Frontend.java   | 260 ++++++++++++++-------
 .../org/apache/impala/service/JniFrontend.java     |  10 +
 .../org/apache/impala/analysis/AnalyzerTest.java   |   6 +-
 .../functional/functional_schema_template.sql      |  23 ++
 .../datasets/functional/schema_constraints.csv     |   4 +
 .../queries/QueryTest/acid-insert-fail.test        |  85 +++++++
 .../queries/QueryTest/acid-insert.test             | 129 ++++++++++
 .../queries/QueryTest/acid-negative.test           |   5 -
 .../queries/QueryTest/acid-nonacid-insert.test     |  69 ++++++
 tests/metadata/test_hms_integration.py             |  66 +++++-
 tests/query_test/test_insert.py                    |  30 ++-
 36 files changed, 939 insertions(+), 133 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 0367561..ea53d1a 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -68,6 +68,10 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
     sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns),
     current_clustered_partition_(nullptr) {
   DCHECK(tsink.__isset.table_sink);
+  if (tsink.table_sink.hdfs_table_sink.__isset.write_id) {
+    write_id_ = tsink.table_sink.hdfs_table_sink.write_id;
+    DCHECK_GT(write_id_, 0);
+  }
 }
 
 OutputPartition::OutputPartition()
@@ -209,6 +213,10 @@ void HdfsTableSink::BuildHdfsFileNames(
 
   // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix.
   // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str>
+  // Or, for transactional tables:
+  // Path: <hdfs_base_dir>/<partition_values>/<transaction_directory>/<unique_id_str>
+  // Where <transaction_directory> is either a 'base' or a 'delta' directory in Hive ACID
+  // terminology.
 
   // Temporary files are written under the following path which is unique to this sink:
   // <table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/
@@ -229,15 +237,20 @@ void HdfsTableSink::BuildHdfsFileNames(
       query_suffix);
 
   if (partition_descriptor.location().empty()) {
-    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1$2",
-        table_desc_->hdfs_base_dir(), output_partition->partition_name, query_suffix);
+    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1",
+        table_desc_->hdfs_base_dir(), output_partition->partition_name);
   } else {
     // If the partition descriptor has a location (as set by alter table add partition
     // with a location clause), that provides the complete directory path for this
     // partition. No partition key suffix ("p=1/j=foo/") should be added.
     output_partition->final_hdfs_file_name_prefix =
-        Substitute("$0/$1", partition_descriptor.location(), query_suffix);
+        Substitute("$0/", partition_descriptor.location());
+  }
+  if (IsTransactional()) {
+    string acid_dir = Substitute(overwrite_ ? "/base_$0/" : "/delta_$0_$0/", write_id_);
+    output_partition->final_hdfs_file_name_prefix += acid_dir;
   }
+  output_partition->final_hdfs_file_name_prefix += query_suffix;
 
   output_partition->num_files = 0;
 }
@@ -484,7 +497,9 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
 
   // It is incorrect to initialize a writer if there are no rows to feed it. The writer
   // could incorrectly create an empty file or empty partition.
-  if (empty_partition) return Status::OK();
+  // However, for transactional tables we should create a new empty base directory in
+  // case of INSERT OVERWRITEs.
+  if (empty_partition && (!overwrite_ || !IsTransactional())) return Status::OK();
 
   switch (partition_descriptor.file_format()) {
     case THdfsFileFormat::TEXT:
@@ -606,6 +621,7 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
       }
     }
   }
+
   return Status::OK();
 }
 
@@ -658,8 +674,8 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
       ++cur_partition) {
     RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first.get()));
   }
-
-  return Status::OK();
+  // Returns OK if there is no debug action.
+  return DebugAction(state->query_options(), "FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL");
 }
 
 void HdfsTableSink::Close(RuntimeState* state) {
@@ -683,8 +699,9 @@ void HdfsTableSink::Close(RuntimeState* state) {
 }
 
 bool HdfsTableSink::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) {
-  return IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !overwrite_ &&
-      state->query_options().s3_skip_insert_staging;
+  if (IsTransactional()) return true;
+  return (IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !overwrite_ &&
+      state->query_options().s3_skip_insert_staging);
 }
 
 string HdfsTableSink::DebugString() const {
@@ -694,6 +711,7 @@ string HdfsTableSink::DebugString() const {
       << " partition_key_exprs="
       << ScalarExpr::DebugString(partition_key_exprs_)
       << " output_exprs=" << ScalarExpr::DebugString(output_exprs_)
+      << " write_id=" << write_id_
       << ")";
   return out.str();
 }
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 277a212..81d5d40 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -125,6 +125,13 @@ struct OutputPartition {
 /// The temporary directory is <table base dir>/<unique_id.hi>-<unique_id.lo>_data
 /// such that an external tool can easily clean up incomplete inserts.
 /// This is consistent with Hive's behavior.
+//
+/// ACID tables:
+/// In case of ACID tables the sink writes the files into their final destination which
+/// is an ACID base or delta directory. No additional moves are required at the end, only
+/// a commit for the ACID transaction.
+/// The name of the output directory will be
+/// <table base dir>/<partition dirs>/<ACID base or delta directory>
 class HdfsTableSink : public DataSink {
  public:
   HdfsTableSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
@@ -144,9 +151,8 @@ class HdfsTableSink : public DataSink {
   /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to FlushFinal()
   virtual Status FlushFinal(RuntimeState* state);
 
-  /// Move temporary Hdfs files to final locations.
-  /// Remove original Hdfs files if overwrite was specified.
-  /// Closes output_exprs and partition_key_exprs.
+  /// Closes writers, output_exprs and partition_key_exprs and releases resources.
+  /// The temporary files will be moved to their final destination by the Coordinator.
   virtual void Close(RuntimeState* state);
 
   int skip_header_line_count() const { return skip_header_line_count_; }
@@ -239,6 +245,9 @@ class HdfsTableSink : public DataSink {
   // to the final location and need to write to the temporary staging location.
   bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition);
 
+  /// Returns TRUE if the target table is an insert-only ACID table.
+  bool IsTransactional() const { return write_id_ != -1; }
+
   /// Descriptor of target table. Set in Prepare().
   const HdfsTableDescriptor* table_desc_;
 
@@ -257,6 +266,10 @@ class HdfsTableSink : public DataSink {
   /// Indicates whether the existing partitions should be overwritten.
   bool overwrite_;
 
+  /// The allocated write ID for the target table. -1 means that the target table is
+  /// a plain, non-ACID table.
+  int64_t write_id_ = -1;
+
   /// Indicates whether the input is ordered by the partition keys, meaning partitions can
   /// be opened, written, and closed one by one.
   bool input_is_clustered_;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index b2c88da..2224a8a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -568,12 +568,13 @@ void Coordinator::HandleExecStateTransition(
   ComputeQuerySummary();
 }
 
-Status Coordinator::FinalizeHdfsInsert() {
+Status Coordinator::FinalizeHdfsDml() {
   // All instances must have reported their final statuses before finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to clean up the
   // staging directory.
   DCHECK(has_called_wait_);
   DCHECK(finalize_params() != nullptr);
+  bool is_transactional = finalize_params()->__isset.write_id;
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
@@ -587,10 +588,17 @@ Status Coordinator::FinalizeHdfsInsert() {
     DCHECK(hdfs_table != nullptr)
         << "INSERT target table not known in descriptor table: "
         << finalize_params()->table_id;
-    return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
-        query_ctx().client_request.query_options.s3_skip_insert_staging,
-        hdfs_table, query_profile_);
+    // There is no need for finalization for transactional inserts.
+    if (!is_transactional) {
+      // 'write_id' is NOT set, therefore we need to do some finalization, e.g. moving
+      // files or deleting old files in case of INSERT OVERWRITE.
+      return_status = dml_exec_state_.FinalizeHdfsInsert(*finalize_params(),
+          query_ctx().client_request.query_options.s3_skip_insert_staging,
+          hdfs_table, query_profile_);
+    }
     hdfs_table->ReleaseResources();
+  } else if (is_transactional) {
+    parent_request_state_->AbortTransaction();
   }
 
   stringstream staging_dir;
@@ -631,7 +639,7 @@ Status Coordinator::Wait() {
   WaitForBackends();
   if (finalize_params() != nullptr) {
     RETURN_IF_ERROR(UpdateExecState(
-            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
+            FinalizeHdfsDml(), nullptr, FLAGS_hostname));
   }
   // DML requests are finished at this point.
   RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 044bdd8..9098dd0 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -474,7 +474,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// INSERT. Called by Wait() only after all fragment instances have returned, or if
   /// the query has failed, in which case it only cleans up temporary data rather than
   /// finishing the INSERT in flight.
-  Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
+  Status FinalizeHdfsDml() WARN_UNUSED_RESULT;
 
   /// Helper for Exec(). Populates backend_states_, starts query execution at all
   /// backends in parallel, and blocks until startup completes.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 4e848e8..2ba7b9e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -723,6 +723,9 @@ void ClientRequestState::Done() {
     }
   }
 
+  // If the transaction didn't get committed by this point then we should just abort it.
+  if (InTransaction()) AbortTransaction();
+
   UpdateEndTime();
   unique_lock<mutex> l(lock_);
   query_events_->MarkEvent("Unregister query");
@@ -1097,6 +1100,9 @@ Status ClientRequestState::UpdateCatalog() {
       catalog_update.target_table = finalize_params.table_name;
       catalog_update.db_name = finalize_params.table_db;
       catalog_update.is_overwrite = finalize_params.is_overwrite;
+      if (InTransaction()) {
+        catalog_update.__set_transaction_id(finalize_params.transaction_id);
+      }
 
       Status cnxn_status;
       const TNetworkAddress& address =
@@ -1107,12 +1113,22 @@ Status ClientRequestState::UpdateCatalog() {
 
       VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
       TUpdateCatalogResponse resp;
-      RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::UpdateCatalog,
-          catalog_update, &resp));
-
-      Status status(resp.result.status);
-      if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
-      RETURN_IF_ERROR(status);
+      Status status = DebugAction(query_options(), "CLIENT_REQUEST_UPDATE_CATALOG");
+      if (status.ok()) {
+        status = client.DoRpc(
+            &CatalogServiceClientWrapper::UpdateCatalog, catalog_update, &resp);
+      }
+      if (status.ok()) status = Status(resp.result.status);
+      if (!status.ok()) {
+        if (InTransaction()) AbortTransaction();
+        LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
+        return status;
+      }
+      if (InTransaction()) {
+        // UpdateCatalog() succeeded and already committed the transaction for us.
+        ClearTransactionState();
+        query_events_->MarkEvent("Transaction committed");
+      }
       RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
           exec_request_.query_options.sync_ddl));
     }
@@ -1326,4 +1342,30 @@ void ClientRequestState::UpdateEndTime() {
         "End Time", ToStringFromUnixMicros(end_time_us(), TimePrecision::Nanosecond));
   }
 }
+
+int64_t ClientRequestState::GetTransactionId() const {
+  DCHECK(InTransaction());
+  return exec_request_.query_exec_request.finalize_params.transaction_id;
+}
+
+bool ClientRequestState::InTransaction() const {
+  return exec_request_.query_exec_request.finalize_params.__isset.transaction_id &&
+      !transaction_closed_;
+}
+
+void ClientRequestState::AbortTransaction() {
+  DCHECK(InTransaction());
+  if (frontend_->AbortTransaction(GetTransactionId()).ok()) {
+    query_events_->MarkEvent("Transaction aborted");
+  } else {
+    VLOG(1) << Substitute("Unable to abort transaction with id: $0", GetTransactionId());
+  }
+  ClearTransactionState();
+}
+
+void ClientRequestState::ClearTransactionState() {
+  DCHECK(InTransaction());
+  transaction_closed_ = true;
+}
+
 }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index a06442d..4f1d712 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -304,6 +304,9 @@ protected:
   /// True if the query expired by timing out.
   bool is_expired_ = false;
 
+  /// True if there was a transaction and it got committed or aborted.
+  bool transaction_closed_ = false;
+
   /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
   const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
 
@@ -490,7 +493,11 @@ protected:
   Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows)
       WARN_UNUSED_RESULT;
 
-  /// Gather and publish all required updates to the metastore
+  /// Gather and publish all required updates to the metastore.
+  /// For transactional queries:
+  /// If everything goes well the transaction is committed by the Catalogd.
+  /// If an error occurs the transaction gets aborted by this function. Either way
+  /// the transaction will be closed when this function returns.
   Status UpdateCatalog() WARN_UNUSED_RESULT;
 
   /// Copies results into request_result_set_
@@ -533,6 +540,19 @@ protected:
   /// and populates the result set with them. It covers the subset of options for
   /// 'SET' and all of them for 'SET ALL'
   void PopulateResultForSet(bool is_set_all);
+
+  /// Returns the transaction id for this client request. 'InTransaction()' must be
+  /// true when invoked.
+  int64_t GetTransactionId() const;
+
+  /// Returns true if there is an open transaction for this client request.
+  bool InTransaction() const;
+
+  /// Aborts the transaction of this client request.
+  void AbortTransaction();
+
+  /// Invoke this function when the transaction is committed or aborted.
+  void ClearTransactionState();
 };
 
 }
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 757ef92..f9f8f9e 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -109,7 +109,8 @@ Frontend::Frontend() {
     {"getTableFiles", "([B)[B", &get_table_files_id_},
     {"showCreateFunction", "([B)Ljava/lang/String;", &show_create_function_id_},
     {"buildTestDescriptorTable", "([B)[B", &build_test_descriptor_table_id_},
-    {"callQueryCompleteHooks", "([B)V", &call_query_complete_hooks_id_}
+    {"callQueryCompleteHooks", "([B)V", &call_query_complete_hooks_id_},
+    {"abortTransaction", "(J)V", &abort_txn_},
   };
 
   JNIEnv* jni_env = JniUtil::GetJNIEnv();
@@ -265,6 +266,10 @@ Status Frontend::LoadData(const TLoadDataReq& request, TLoadDataResp* response)
   return JniUtil::CallJniMethod(fe_, load_table_data_id_, request, response);
 }
 
+Status Frontend::AbortTransaction(int64_t transaction_id) {
+  return JniUtil::CallJniMethod(fe_, abort_txn_, transaction_id);
+}
+
 bool Frontend::IsAuthorizationError(const Status& status) {
   return !status.ok() && status.GetDetail().find("AuthorizationException") == 0;
 }
@@ -298,4 +303,4 @@ Status Frontend::BuildTestDescriptorTable(const TBuildTestDescriptorTableParams&
 // Call FE post-query execution hook
 Status Frontend::CallQueryCompleteHooks(const TQueryCompleteContext& context) {
   return JniUtil::CallJniMethod(fe_, call_query_complete_hooks_id_, context);
-}
\ No newline at end of file
+}
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index f063fce..2483f50 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -167,6 +167,9 @@ class Frontend {
   /// completed successfully.
   Status LoadData(const TLoadDataReq& load_data_request, TLoadDataResp* response);
 
+  /// Aborts transaction with the given transaction id.
+  Status AbortTransaction(int64_t transaction_id);
+
   /// Returns true if the error returned by the FE was due to an AuthorizationException.
   static bool IsAuthorizationError(const Status& status);
 
@@ -218,6 +221,7 @@ class Frontend {
   jmethodID get_table_files_id_; // JniFrontend.getTableFiles
   jmethodID show_create_function_id_; // JniFrontend.showCreateFunction
   jmethodID call_query_complete_hooks_id_; // JniFrontend.callQueryCompleteHooks
+  jmethodID abort_txn_; // JniFrontend.abortTransaction()
 
   // Only used for testing.
   jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 7c90d33..f56ce40 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -424,6 +424,12 @@ inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method
   return JniCall::instance_method(obj, method).with_thrift_arg(arg).Call();
 }
 
+template <>
+inline Status JniUtil::CallJniMethod<int64_t>(const jobject& obj, const jmethodID& method,
+    const int64_t& arg) {
+  return JniCall::instance_method(obj, method).with_primitive_arg(arg).Call();
+}
+
 template <typename T, typename R>
 inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method,
     const T& arg, R* response) {
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index ccabdc8..eafb79d 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -206,6 +206,9 @@ struct TUpdateCatalogRequest {
 
   // True if the update corresponds to an "insert overwrite" operation
   7: required bool is_overwrite;
+
+  // ACID transaction ID for transactional inserts.
+  8: optional i64 transaction_id;
 }
 
 // Response from a TUpdateCatalogRequest
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 5a42e8c..1cce0eb 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -76,6 +76,9 @@ struct THdfsTableSink {
   // are stored in the 'sort.columns' table property. This is used in the backend to
   // populate the RowGroup::sorting_columns list in parquet files.
   5: optional list<i32> sort_columns
+
+  // Stores the allocated ACID write id if the target table is transactional.
+  6: optional i64 write_id
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f5ba40c..d6e5f70 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -357,6 +357,12 @@ struct TFinalizeParams {
   // Identifier for the target table in the query-wide descriptor table (see
   // TDescriptorTable and TTableDescriptor).
   6: optional i64 table_id;
+
+  // Stores the ACID transaction id of the target table for transactional INSERTs.
+  7: optional i64 transaction_id;
+
+  // Stores the ACID write id of the target table for transactional INSERTs.
+  8: optional i64 write_id;
 }
 
 // Request for a LOAD DATA statement. LOAD DATA is only supported for HDFS backed tables.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1c41c60..2cc2f3e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -521,6 +521,9 @@ struct TQueryCtx {
   // coordinator and executors for a given query always agree this value.
   22: optional i32 status_report_interval_ms
   23: optional i32 status_report_max_retry_s
+
+  // Stores the transaction id if the query is transactional.
+  25: optional i64 transaction_id
 }
 
 // Specification of one output destination of a plan fragment
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 61ec3e5..2ea270a 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -53,6 +53,7 @@ import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
@@ -301,6 +302,38 @@ public class MetastoreShim {
   }
 
   /**
+   * Hive-3 only function
+   */
+  public static long openTransaction(IMetaStoreClient client, String userId)
+      throws TransactionException {
+    throw new UnsupportedOperationException("openTransaction is not supported.");
+  }
+
+  /**
+   * Hive-3 only function
+   */
+  public static void commitTransaction(IMetaStoreClient client, long txnId)
+      throws TransactionException {
+    throw new UnsupportedOperationException("commitTransaction is not supported.");
+  }
+
+  /**
+   * Hive-3 only function
+   */
+  public static void abortTransaction(IMetaStoreClient client, long txnId)
+      throws TransactionException {
+    throw new UnsupportedOperationException("abortTransaction is not supported.");
+  }
+
+  /**
+   * Hive-3 only function
+   */
+  public static long allocateTableWriteId(IMetaStoreClient client, long txnId,
+      String dbName, String tableName) throws TransactionException {
+    throw new UnsupportedOperationException("allocateTableWriteId is not supported.");
+  }
+
+  /**
    * @return the shim version.
    */
   public static long getMajorVersion() {
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index db06ffe..371e0d9 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -28,6 +28,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -61,6 +63,7 @@ import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.HiveMetadataFormatUtils;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend;
@@ -445,6 +448,70 @@ public class MetastoreShim {
   }
 
   /**
+   * Opens a new transaction.
+   * @param client is the HMS client to be used.
+   * @param userId of user who is opening this transaction.
+   * @return the new transaction id.
+   * @throws TransactionException
+   */
+  public static long openTransaction(IMetaStoreClient client, String userId)
+      throws TransactionException {
+    try {
+      return client.openTxn(userId);
+    } catch (Exception e) {
+      throw new TransactionException(e.getMessage());
+    }
+  }
+
+  /**
+   * Commits a transaction.
+   * @param client is the HMS client to be used.
+   * @param txnId is the transaction id.
+   * @throws TransactionException
+   */
+  public static void commitTransaction(IMetaStoreClient client, long txnId)
+      throws TransactionException {
+    try {
+      client.commitTxn(txnId);
+    } catch (Exception e) {
+      throw new TransactionException(e.getMessage());
+    }
+  }
+
+  /**
+   * Aborts a transaction.
+   * @param client is the HMS client to be used.
+   * @param txnId is the transaction id.
+   * @throws TransactionException
+   */
+  public static void abortTransaction(IMetaStoreClient client, long txnId)
+      throws TransactionException {
+    try {
+      client.abortTxns(Arrays.asList(txnId));
+    } catch (Exception e) {
+      throw new TransactionException(e.getMessage());
+    }
+  }
+
+  /**
+   * Allocates a write id for the given table.
+   * @param client is the HMS client to be used.
+   * @param txnId is the transaction id.
+   * @param dbName is the database name.
+   * @param tableName is the target table name.
+   * @return the allocated write id.
+   * @throws TransactionException
+   */
+  public static long allocateTableWriteId(IMetaStoreClient client, long txnId,
+      String dbName, String tableName) throws TransactionException {
+    try {
+      return client.allocateTableWriteId(txnId, dbName, tableName);
+    } catch (Exception e) {
+      throw new TransactionException(e.getMessage());
+    }
+  }
+
+  /**
    * Set impala capabilities to hive client
    * Impala supports:
    * - external table read/write
@@ -464,12 +531,12 @@ public class MetastoreShim {
         BackendConfig.INSTANCE.getImpalaBuildVersion() : String.valueOf(MAJOR_VERSION);
     if (buildVersion == null) buildVersion = String.valueOf(MAJOR_VERSION);
 
-    // TODO: Add HIVEMANAGEDINSERTWRITE once IMPALA-8636 goes in.
     String impalaId = String.format("Impala%s@%s", buildVersion, hostName);
     String[] capabilities = new String[] {
         EXTWRITE, // External table write
         EXTREAD,  // External table read
-        HIVEMANAGEDINSERTREAD,
+        HIVEMANAGEDINSERTREAD, // Insert-only table read
+        HIVEMANAGEDINSERTWRITE, // Insert-only table write
         HIVESQL,
         HIVEMQT,
         HIVEBUCKET2 // Includes the capability to get the correct bucket number.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
index c023df7..47f4ca1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
@@ -91,6 +91,7 @@ public abstract class AlterTableStmt extends StatementBase {
     Preconditions.checkState(tableRef instanceof BaseTableRef);
     table_ = tableRef.getTable();
     analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    analyzer.ensureTableNotTransactional(table_);
     if (table_ instanceof FeDataSourceTable
         && !(this instanceof AlterTableSetColumnStats)) {
       throw new AnalysisException(String.format(
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index 74b3019..047e285 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -121,6 +121,7 @@ public class DropTableOrViewStmt extends StatementBase {
       if (dropTable_) {
         // To drop a view needs not write capabilities, only checks for tables.
         analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
+        analyzer.ensureTableNotTransactional(table);
       }
 
     } catch (TableLoadingException e) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index f3f0f35..a5cc510 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -175,6 +175,9 @@ public class InsertStmt extends StatementBase {
   // non-Kudu tables.
   private List<Expr> primaryKeyExprs_ = new ArrayList<>();
 
+  // Set by the Frontend if the target table is transactional.
+  private long writeId_ = -1;
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -232,6 +235,7 @@ public class InsertStmt extends StatementBase {
     columnPermutation_ = other.columnPermutation_;
     table_ = other.table_;
     isUpsert_ = other.isUpsert_;
+    writeId_ = other.writeId_;
   }
 
   @Override
@@ -252,6 +256,7 @@ public class InsertStmt extends StatementBase {
     resultExprs_.clear();
     mentionedColumns_.clear();
     primaryKeyExprs_.clear();
+    writeId_ = -1;
   }
 
   @Override
@@ -902,6 +907,7 @@ public class InsertStmt extends StatementBase {
   public TableName getTargetTableName() { return targetTableName_; }
   public FeTable getTargetTable() { return table_; }
   public void setTargetTable(FeTable table) { this.table_ = table; }
+  public void setWriteId(long writeId) { this.writeId_ = writeId; }
   public boolean isOverwrite() { return overwrite_; }
 
   /**
@@ -916,6 +922,7 @@ public class InsertStmt extends StatementBase {
   public boolean hasNoClusteredHint() { return hasNoClusteredHint_; }
   public List<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
   public List<Expr> getSortExprs() { return sortExprs_; }
+  public long getWriteId() { return writeId_; }
 
   // Clustering is enabled by default. If the table has a 'sort.columns' property and the
   // query has a 'noclustered' hint, we issue a warning during analysis and ignore the
@@ -936,7 +943,7 @@ public class InsertStmt extends StatementBase {
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
         partitionKeyExprs_, mentionedColumns_, overwrite_, requiresClustering(),
-        sortColumns_);
+        sortColumns_, writeId_);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 4f3f90b..ba3d01a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -110,6 +110,7 @@ public class LoadDataStmt extends StatementBase {
           dbName_ + "." + getTbl());
     }
     analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
+    analyzer.ensureTableNotTransactional(table);
 
     // Analyze the partition spec, if one was specified.
     if (partitionSpec_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index 60d6bf2..15c0a3e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -68,6 +68,7 @@ public class TruncateStmt extends StatementBase {
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
     }
     analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    analyzer.ensureTableNotTransactional(table_);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 99eb943..ba9738c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -937,9 +937,8 @@ public class HdfsTable extends Table implements FeFsTable {
                 MetaStoreUtil.getNullPartitionKeyValue(client).intern();
             loadSchema(msTbl);
             loadAllColumnStats(client);
-            //TODO writeIDs may also be loaded in other code paths.
-            loadValidWriteIdList(client);
         }
+        loadValidWriteIdList(client);
         // Load partition and file metadata
         if (reuseMetadata) {
           // Incrementally update this table's partitions and file metadata
diff --git a/fe/src/main/java/org/apache/impala/common/TransactionException.java b/fe/src/main/java/org/apache/impala/common/TransactionException.java
new file mode 100644
index 0000000..6bd7a84
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/TransactionException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+/**
+ * Thrown for errors that occur when interacting with ACID transactions,
+ * e.g. failures to open, commit, or abort a transaction. Or, failing to
+ * allocate a write id.
+ */
+public class TransactionException extends ImpalaException {
+  public TransactionException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public TransactionException(String msg) {
+    super(msg);
+  }
+
+  public TransactionException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index e313464..ea7fc3d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -67,14 +67,19 @@ public class HdfsTableSink extends TableSink {
   // populate the RowGroup::sorting_columns list in parquet files.
   private List<Integer> sortColumns_ = new ArrayList<>();
 
+  // Stores the allocated write id if the target table is transactional, otherwise -1.
+  private long writeId_;
+
   public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
-      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) {
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns,
+      long writeId) {
     super(targetTable, Op.INSERT);
     Preconditions.checkState(targetTable instanceof FeFsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
     inputIsClustered_ = inputIsClustered;
     sortColumns_ = sortColumns;
+    writeId_ = writeId;
   }
 
   @Override
@@ -191,6 +196,8 @@ public class HdfsTableSink extends TableSink {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
     hdfsTableSink.setSort_columns(sortColumns_);
+    if (writeId_ != -1) hdfsTableSink.setWrite_id(writeId_);
+
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.hdfs_table_sink = hdfsTableSink;
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 12cc4b0..c85cd74 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -93,6 +93,17 @@ public abstract class TableSink extends DataSink {
   public static TableSink create(FeTable table, Op sinkAction,
       List<Expr> partitionKeyExprs,  List<Integer> referencedColumns,
       boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) {
+    return create(table, sinkAction, partitionKeyExprs, referencedColumns, overwrite,
+        inputIsClustered, sortColumns, -1);
+  }
+
+  /**
+   * Same as above, plus it takes an ACID write id in parameter 'writeId'.
+   */
+  public static TableSink create(FeTable table, Op sinkAction,
+      List<Expr> partitionKeyExprs,  List<Integer> referencedColumns,
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns,
+      long writeId) {
     Preconditions.checkNotNull(partitionKeyExprs);
     Preconditions.checkNotNull(referencedColumns);
     Preconditions.checkNotNull(sortColumns);
@@ -102,7 +113,7 @@ public abstract class TableSink extends DataSink {
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
       return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered,
-          sortColumns);
+          sortColumns, writeId);
     } else if (table instanceof FeHBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index cf0d0fc..1d1fc15 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -20,6 +20,7 @@ package org.apache.impala.service;
 import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READWRITE;
 
 import com.google.common.collect.Iterables;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -98,6 +99,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.JniCatalogConstants;
 import org.apache.impala.thrift.TAlterDbParams;
@@ -184,6 +186,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.slf4j.LoggerFactory;
 
 /**
@@ -3840,7 +3843,14 @@ public class CatalogOpExecutor {
         response.getResult().setStatus(
             new TStatus(TErrorCode.OK, new ArrayList<String>()));
       }
-
+      // Commit transactional inserts on success. We don't abort the transaction
+      // here in case of failures, because the client, i.e. query coordinator, is
+      // always responsible for aborting transactions when queries hit errors.
+      if (update.isSetTransaction_id()) {
+        if (response.getResult().getStatus().getStatus_code() == TErrorCode.OK) {
+          commitTransaction(update.getTransaction_id());
+        }
+      }
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
           "INSERT");
       // After loading metadata, fire insert events if external event processing is
@@ -4190,4 +4200,15 @@ public class CatalogOpExecutor {
           "lock contention.", type, tbl.getFullName()));
     }
   }
+
+  /**
+   * Commits ACID transaction with given transaction id.
+   * @param transactionId is the id of the transaction.
+   * @throws TransactionException
+   */
+  private void commitTransaction(long transactionId) throws TransactionException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      MetastoreShim.commitTransaction(msClient.getHiveClient(), transactionId);
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5c4c8dc..492f5b0 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.impala.analysis.AlterDbStmt;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -86,12 +89,15 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.ImpaladTableUsageTracker;
+import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hooks.QueryCompleteContext;
 import org.apache.impala.hooks.QueryEventHook;
@@ -141,6 +147,7 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.PatternMatcher;
@@ -247,6 +254,9 @@ public class Frontend {
 
   private final QueryEventHookManager queryHookManager_;
 
+  // Stores metastore clients for direct accesses to HMS.
+  private final MetaStoreClientPool metaStoreClientPool_;
+
   public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
     this(authzFactory, FeCatalogManager.createFromBackendConfig());
   }
@@ -279,6 +289,7 @@ public class Frontend {
     impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
         BackendConfig.INSTANCE);
     queryHookManager_ = QueryEventHookManager.createFromConfig(BackendConfig.INSTANCE);
+    metaStoreClientPool_ = new MetaStoreClientPool(1, 0);
   }
 
   public FeCatalog getCatalog() { return catalogManager_.getOrCreateCatalog(); }
@@ -1254,108 +1265,131 @@ public class Frontend {
 
     // Analyze and authorize stmt
     AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
-    AnalysisResult analysisResult =
-        analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache, authzChecker_.get());
+    AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache,
+        authzChecker_.get());
     LOG.info("Analysis and authorization finished.");
     Preconditions.checkNotNull(analysisResult.getStmt());
-
     TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);
 
-    TQueryOptions queryOptions = queryCtx.client_request.query_options;
-    if (analysisResult.isCatalogOp()) {
-      result.stmt_type = TStmtType.DDL;
-      createCatalogOpRequest(analysisResult, result);
-      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
-      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
-        result.catalog_op_request.setLineage_graph(thriftLineageGraph);
-      }
-      setMtDopForCatalogOp(analysisResult, queryOptions);
-      // All DDL operations except for CTAS are done with analysis at this point.
-      if (!analysisResult.isCreateTableAsSelectStmt()) {
+    try {
+      TQueryOptions queryOptions = queryCtx.client_request.query_options;
+      if (analysisResult.isCatalogOp()) {
+        result.stmt_type = TStmtType.DDL;
+        createCatalogOpRequest(analysisResult, result);
+        TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
+        if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
+          result.catalog_op_request.setLineage_graph(thriftLineageGraph);
+        }
+        setMtDopForCatalogOp(analysisResult, queryOptions);
+        // All DDL operations except for CTAS are done with analysis at this point.
+        if (!analysisResult.isCreateTableAsSelectStmt()) {
+          return result;
+        }
+      } else if (analysisResult.isInsertStmt() ||
+          analysisResult.isCreateTableAsSelectStmt()) {
+        InsertStmt insertStmt = analysisResult.getInsertStmt();
+        FeTable targetTable = insertStmt.getTargetTable();
+        if (AcidUtils.isTransactionalTable(
+            targetTable.getMetaStoreTable().getParameters())) {
+          // TODO (IMPALA-8788): should load table write ids in transaction context.
+          long txnId = openTransaction();
+          queryCtx.setTransaction_id(txnId);
+          timeline.markEvent("Transaction opened");
+          Long writeId = allocateWriteIdIfNeeded(queryCtx, targetTable);
+          if (writeId != null) insertStmt.setWriteId(writeId);
+        }
+      } else if (analysisResult.isLoadDataStmt()) {
+        result.stmt_type = TStmtType.LOAD;
+        result.setResult_set_metadata(new TResultSetMetadata(
+            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
+        result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
         return result;
-      }
-    } else if (analysisResult.isLoadDataStmt()) {
-      result.stmt_type = TStmtType.LOAD;
-      result.setResult_set_metadata(new TResultSetMetadata(
-          Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
-      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
-      return result;
-    } else if (analysisResult.isSetStmt()) {
-      result.stmt_type = TStmtType.SET;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("option", Type.STRING.toThrift()),
-          new TColumn("value", Type.STRING.toThrift()),
-          new TColumn("level", Type.STRING.toThrift()))));
-      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
-      return result;
-    } else if (analysisResult.isAdminFnStmt()) {
-      result.stmt_type = TStmtType.ADMIN_FN;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("summary", Type.STRING.toThrift()))));
-      result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
-      return result;
-    } else if (analysisResult.isTestCaseStmt()) {
-      CopyTestCaseStmt testCaseStmt = ((CopyTestCaseStmt) stmt);
-      if (testCaseStmt.isTestCaseExport()) {
-        result.setStmt_type(TStmtType.TESTCASE);
+      } else if (analysisResult.isSetStmt()) {
+        result.stmt_type = TStmtType.SET;
         result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("Test case data output path", Type.STRING.toThrift()))));
-        result.setTestcase_data_path(testCaseStmt.writeTestCaseData());
-      } else {
-        // Mimic it as a DDL.
-        result.setStmt_type(TStmtType.DDL);
-        createCatalogOpRequest(analysisResult, result);
+            new TColumn("option", Type.STRING.toThrift()),
+            new TColumn("value", Type.STRING.toThrift()),
+            new TColumn("level", Type.STRING.toThrift()))));
+        result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
+        return result;
+      } else if (analysisResult.isAdminFnStmt()) {
+        result.stmt_type = TStmtType.ADMIN_FN;
+        result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+            new TColumn("summary", Type.STRING.toThrift()))));
+        result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
+        return result;
+      } else if (analysisResult.isTestCaseStmt()) {
+        CopyTestCaseStmt testCaseStmt = ((CopyTestCaseStmt) stmt);
+        if (testCaseStmt.isTestCaseExport()) {
+          result.setStmt_type(TStmtType.TESTCASE);
+          result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+            new TColumn("Test case data output path", Type.STRING.toThrift()))));
+          result.setTestcase_data_path(testCaseStmt.writeTestCaseData());
+        } else {
+          // Mimic it as a DDL.
+          result.setStmt_type(TStmtType.DDL);
+          createCatalogOpRequest(analysisResult, result);
+        }
+        return result;
       }
-      return result;
-    }
 
-    // If unset, set MT_DOP to 0 to simplify the rest of the code.
-    if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
+      // If unset, set MT_DOP to 0 to simplify the rest of the code.
+      if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);
 
-    // create TQueryExecRequest
-    TQueryExecRequest queryExecRequest =
-        getPlannedExecRequest(planCtx, analysisResult, timeline);
+      // create TQueryExecRequest
+      TQueryExecRequest queryExecRequest =
+          getPlannedExecRequest(planCtx, analysisResult, timeline);
 
-    TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
-    if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
-      queryExecRequest.setLineage_graph(thriftLineageGraph);
-    }
+      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
+      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
+        queryExecRequest.setLineage_graph(thriftLineageGraph);
+      }
 
-    // Override the per_host_mem_estimate sent to the backend if needed. The explain
-    // string is already generated at this point so this does not change the estimate
-    // shown in the plan.
-    checkAndOverrideMemEstimate(queryExecRequest, queryOptions);
+      // Override the per_host_mem_estimate sent to the backend if needed. The explain
+      // string is already generated at this point so this does not change the estimate
+      // shown in the plan.
+      checkAndOverrideMemEstimate(queryExecRequest, queryOptions);
 
-    if (analysisResult.isExplainStmt()) {
-      // Return the EXPLAIN request
-      createExplainRequest(planCtx.getExplainString(), result);
-      return result;
-    }
+      if (analysisResult.isExplainStmt()) {
+        // Return the EXPLAIN request
+        createExplainRequest(planCtx.getExplainString(), result);
+        return result;
+      }
 
-    result.setQuery_exec_request(queryExecRequest);
-    if (analysisResult.isQueryStmt()) {
-      result.stmt_type = TStmtType.QUERY;
-      result.query_exec_request.stmt_type = result.stmt_type;
-      // fill in the metadata
-      result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
-    } else if (analysisResult.isInsertStmt() ||
-        analysisResult.isCreateTableAsSelectStmt()) {
-      // For CTAS the overall TExecRequest statement type is DDL, but the
-      // query_exec_request should be DML
-      result.stmt_type =
-          analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
-      result.query_exec_request.stmt_type = TStmtType.DML;
-      // create finalization params of insert stmt
-      addFinalizationParamsForInsert(
-          queryCtx, queryExecRequest, analysisResult.getInsertStmt());
-    } else {
-      Preconditions.checkState(
-          analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
-      result.stmt_type = TStmtType.DML;
-      result.query_exec_request.stmt_type = TStmtType.DML;
+      result.setQuery_exec_request(queryExecRequest);
+      if (analysisResult.isQueryStmt()) {
+        result.stmt_type = TStmtType.QUERY;
+        result.query_exec_request.stmt_type = result.stmt_type;
+        // fill in the metadata
+        result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
+      } else if (analysisResult.isInsertStmt() ||
+          analysisResult.isCreateTableAsSelectStmt()) {
+        // For CTAS the overall TExecRequest statement type is DDL, but the
+        // query_exec_request should be DML
+        result.stmt_type =
+            analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
+        result.query_exec_request.stmt_type = TStmtType.DML;
+        // create finalization params of insert stmt
+        addFinalizationParamsForInsert(
+            queryCtx, queryExecRequest, analysisResult.getInsertStmt());
+      } else {
+        Preconditions.checkState(
+            analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt());
+        result.stmt_type = TStmtType.DML;
+        result.query_exec_request.stmt_type = TStmtType.DML;
+      }
+      return result;
+    } catch (Exception e) {
+      if (queryCtx.isSetTransaction_id()) {
+        try {
+          abortTransaction(queryCtx.getTransaction_id());
+          timeline.markEvent("Transaction aborted");
+        } catch (TransactionException te) {
+          LOG.error("Could not abort transaction because: " + te.getMessage());
+        }
+      }
+      throw e;
     }
-
-    return result;
   }
 
   /**
@@ -1402,6 +1436,11 @@ public class Frontend {
       finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
       finalizeParams.setStaging_dir(
           hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
+      if (insertStmt.getWriteId() != -1) {
+        Preconditions.checkState(queryCtx.isSetTransaction_id());
+        finalizeParams.setTransaction_id(queryCtx.getTransaction_id());
+        finalizeParams.setWrite_id(insertStmt.getWriteId());
+      }
       queryExecRequest.setFinalize_params(finalizeParams);
     }
   }
@@ -1598,4 +1637,49 @@ public class Frontend {
     final List<Future<QueryEventHook>> futures
         = this.queryHookManager_.executeQueryCompleteHooks(context);
   }
+
+  /**
+   * Opens a new transaction.
+   * @return the transaction id.
+   * @throws TransactionException
+   */
+  private long openTransaction() throws TransactionException {
+    IMetaStoreClient hmsClient = metaStoreClientPool_.getClient().getHiveClient();
+    return MetastoreShim.openTransaction(hmsClient, "Impala");
+  }
+
+  /**
+   * Aborts a transaction.
+   * @param transactionId the id of the transaction to abort.
+   * @throws TransactionException
+   * TODO: maybe we should make it async.
+   */
+  public void abortTransaction(long transactionId) throws TransactionException {
+    LOG.error("Aborting transaction: " + transactionId);
+    IMetaStoreClient hmsClient = metaStoreClientPool_.getClient().getHiveClient();
+    MetastoreShim.abortTransaction(hmsClient, transactionId);
+  }
+
+  /**
+   * Checks whether we should allocate a write id for 'table'. If so, it does the
+   * allocation and returns the write id, otherwise returns null.
+   * @param queryCtx the query context that contains the transaction id.
+   * @param table the target table of the write operation.
+   * @return the allocated write id, or null
+   * @throws TransactionException
+   */
+  private Long allocateWriteIdIfNeeded(TQueryCtx queryCtx, FeTable table)
+      throws TransactionException {
+    if (!queryCtx.isSetTransaction_id()) return null;
+    if (!(table instanceof FeFsTable)) return null;
+    if (!AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
+      return null;
+    }
+
+    IMetaStoreClient hmsClient = this.metaStoreClientPool_.getClient().getHiveClient();
+    long txnId = queryCtx.getTransaction_id();
+    return MetastoreShim.allocateTableWriteId(hmsClient, txnId, table.getDb().getName(),
+        table.getName());
+  }
 }
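
The helpers above reach the Hive Metastore through MetastoreShim. As a point of reference, here is a
minimal standalone sketch of the same open / allocate-write-id / commit-or-abort sequence written
directly against the Hive 3 IMetaStoreClient API. It is illustrative only and not code from this
change; it assumes that MetastoreShim's openTransaction, allocateTableWriteId and abortTransaction
map onto IMetaStoreClient.openTxn, allocateTableWriteId and rollbackTxn, that an HMS configured via
hive-site.xml is reachable, and that the class name AcidTxnSketch is a placeholder.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;

    public class AcidTxnSketch {
      public static void main(String[] args) throws Exception {
        // Connect to the HMS described by hive-site.xml on the classpath.
        IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        // Open a transaction, analogous to Frontend.openTransaction() above.
        long txnId = client.openTxn("Impala");
        try {
          // Allocate a write id for the target table within this transaction,
          // analogous to Frontend.allocateWriteIdIfNeeded().
          long writeId = client.allocateTableWriteId(
              txnId, "functional", "insert_only_transactional_table");
          System.out.println("txnId=" + txnId + ", writeId=" + writeId);
          client.commitTxn(txnId);
        } catch (Exception e) {
          // Abort on any failure, analogous to Frontend.abortTransaction().
          client.rollbackTxn(txnId);
          throw e;
        } finally {
          client.close();
        }
      }
    }

The write id allocated this way is the same value the planner stores in TFinalizeParams (see
addFinalizationParamsForInsert above) so the backend can place the inserted files correctly.
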
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 139bb74..c73ca06 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -51,6 +51,7 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.hooks.QueryCompleteContext;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.thrift.TBackendGflags;
@@ -637,6 +638,15 @@ public class JniFrontend {
   }
 
   /**
+   * Aborts a transaction.
+   * @param transactionId the id of the transaction to abort.
+   * @throws TransactionException
+   */
+  public void abortTransaction(long transactionId) throws TransactionException {
+    this.frontend_.abortTransaction(transactionId);
+  }
+
+  /**
    * Returns an error string describing configuration issue with the groups mapping
    * provider implementation.
    */
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 8e3993c..ebca3cf 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -565,10 +565,8 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalyzesOk("insert into functional.testtbl select *,'test',1 " +
             "from functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "insert into functional.insert_only_transactional_table select * " +
-          "from functional.insert_only_transactional_table",
-        insertOnlyErrorMsg);
+    AnalyzesOk("insert into functional.insert_only_transactional_table select * " +
+        "from functional.insert_only_transactional_table");
 
     AnalysisError(
         "compute stats functional_orc_def.full_transactional_table",
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index d460eaa..ab59a5c 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2649,3 +2649,26 @@ date_part DATE
 id_col INT
 date_col DATE
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+insertonly_nopart_insert
+---- HIVE_MAJOR_VERSION
+3
+---- CREATE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i int)
+STORED AS {file_format}
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+insertonly_part_insert
+---- HIVE_MAJOR_VERSION
+3
+---- CREATE
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i int)
+PARTITIONED BY (p int)
+STORED AS {file_format}
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only');
+====
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 0c156c9..2099035 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -248,6 +248,10 @@ table_name:full_transactional_table, constraint:restrict_to, table_format:orc/de
 # Insert-only transactional tables only work for file-format based tables
 table_name:insert_only_transactional_table, constraint:exclude, table_format:hbase/none/none
 table_name:insert_only_transactional_table, constraint:exclude, table_format:kudu/none/none
+table_name:insertonly_nopart_insert, constraint:restrict_to, table_format:text/none/none
+table_name:insertonly_nopart_insert, constraint:restrict_to, table_format:parquet/none/none
+table_name:insertonly_part_insert, constraint:restrict_to, table_format:text/none/none
+table_name:insertonly_part_insert, constraint:restrict_to, table_format:parquet/none/none
 
 # 'materialized_view' is based on 'insert_only_transactional_table' from the same
 # database, so it needs to be excluded where 'insert_only_transactional_table' is
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-insert-fail.test b/testdata/workloads/functional-query/queries/QueryTest/acid-insert-fail.test
new file mode 100644
index 0000000..0093771
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-insert-fail.test
@@ -0,0 +1,85 @@
+====
+---- QUERY
+create table insertonly_acid (i int)
+  tblproperties('transactional'='true', 'transactional_properties'='insert_only');
+insert into insertonly_acid values (1), (2);
+select * from insertonly_acid;
+---- RESULTS
+1
+2
+---- TYPES
+INT
+====
+---- QUERY
+set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
+insert into insertonly_acid values (42);
+---- CATCH
+Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
+====
+---- QUERY
+select * from insertonly_acid;
+---- RESULTS
+1
+2
+---- TYPES
+INT
+====
+---- QUERY
+set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
+insert into insertonly_acid values (42);
+---- CATCH
+Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
+====
+---- QUERY
+select * from insertonly_acid;
+---- RESULTS
+1
+2
+---- TYPES
+INT
+====
+---- QUERY
+create table part (n int)
+partitioned by (p int) tblproperties (
+  'transactional'='true',
+  'transactional_properties'='insert_only');
+insert into table part (p, n) select 1, 10;
+insert into table part (p, n) select 2, 20;
+select p, n from part;
+---- RESULTS
+1,10
+2,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Dynamic partition insert into existing and non-existing partitions.
+set DEBUG_ACTION="FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0";
+insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
+---- CATCH
+Query aborted:Debug Action: FIS_FAIL_HDFS_TABLE_SINK_FLUSH_FINAL:FAIL@1.0
+====
+---- QUERY
+select p, n from part;
+---- RESULTS
+1,10
+2,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Dynamic partition insert into existing and non-existing partitions.
+set DEBUG_ACTION="CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0";
+insert into part (p, n) select cast(i + 1 as INT), 11 from insertonly_acid;
+---- CATCH
+Query aborted:Debug Action: CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0
+====
+---- QUERY
+select p, n from part;
+---- RESULTS
+1,10
+2,20
+---- TYPES
+INT,INT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-insert.test b/testdata/workloads/functional-query/queries/QueryTest/acid-insert.test
new file mode 100644
index 0000000..3e0baae
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-insert.test
@@ -0,0 +1,129 @@
+====
+---- SETUP
+RESET insertonly_nopart_insert
+---- QUERY
+insert into insertonly_nopart_insert values (1), (2);
+====
+---- QUERY
+select i from insertonly_nopart_insert order by i;
+---- RESULTS
+1
+2
+---- TYPES
+INT
+====
+---- QUERY
+insert into insertonly_nopart_insert values (3);
+====
+---- QUERY
+select i from insertonly_nopart_insert order by i;
+---- RESULTS
+1
+2
+3
+---- TYPES
+INT
+====
+---- QUERY
+insert overwrite insertonly_nopart_insert values (10);
+====
+---- QUERY
+select i from insertonly_nopart_insert order by i;
+---- RESULTS
+10
+---- TYPES
+INT
+====
+---- QUERY
+insert overwrite insertonly_nopart_insert select 100;
+====
+---- QUERY
+select i from insertonly_nopart_insert order by i;
+---- RESULTS
+100
+---- TYPES
+INT
+====
+---- QUERY
+insert overwrite insertonly_nopart_insert
+select * from insertonly_nopart_insert limit 0;
+====
+---- QUERY
+select i from insertonly_nopart_insert order by i;
+---- RESULTS
+---- TYPES
+INT
+====
+---- SETUP
+RESET insertonly_part_insert
+---- QUERY
+insert into insertonly_part_insert partition (p=1) values (10), (11);
+insert into insertonly_part_insert partition (p=2) values (20);
+====
+---- QUERY
+select p, i from insertonly_part_insert order by i;
+---- RESULTS
+1,10
+1,11
+2,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+insert into insertonly_part_insert partition (p=2) values (21);
+insert into insertonly_part_insert partition (p=3) values (30);
+====
+---- QUERY
+select p, i from insertonly_part_insert order by i;
+---- RESULTS
+1,10
+1,11
+2,20
+2,21
+3,30
+---- TYPES
+INT,INT
+====
+---- QUERY
+insert overwrite insertonly_part_insert partition (p=2) values (22);
+insert overwrite insertonly_part_insert partition (p=3) values (31);
+====
+---- QUERY
+select p, i from insertonly_part_insert order by i;
+---- RESULTS
+1,10
+1,11
+2,22
+3,31
+---- TYPES
+INT,INT
+====
+---- QUERY
+insert overwrite insertonly_part_insert partition (p=1)
+select * from insertonly_nopart_insert limit 0;
+insert overwrite insertonly_part_insert partition (p=2)
+select * from insertonly_nopart_insert limit 0;
+====
+---- QUERY
+select p, i from insertonly_part_insert order by i;
+---- RESULTS
+3,31
+---- TYPES
+INT,INT
+====
+---- QUERY
+insert overwrite insertonly_part_insert partition (p)
+values (1000, 1), (2000, 2), (4000, 4), (5000, 5), (5001, 5);
+====
+---- QUERY
+select p, i from insertonly_part_insert order by p, i;
+---- RESULTS
+1,1000
+2,2000
+3,31
+4,4000
+5,5000
+5,5001
+---- TYPES
+INT,INT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
index 8ac3812..655d2fc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -1,10 +1,5 @@
 ====
 ---- QUERY
-insert into functional.insert_only_transactional_table values (1);
----- CATCH
-AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
-====
----- QUERY
 alter table functional.insert_only_transactional_table change column x y bigint;
 ---- CATCH
 AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-nonacid-insert.test b/testdata/workloads/functional-query/queries/QueryTest/acid-nonacid-insert.test
new file mode 100644
index 0000000..7fd9fa7
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-nonacid-insert.test
@@ -0,0 +1,69 @@
+====
+---- QUERY
+create table insertonly_acid (i int)
+  tblproperties('transactional'='true', 'transactional_properties'='insert_only');
+insert into insertonly_acid values (1), (2);
+create table plain (i int);
+====
+---- QUERY
+# INSERT from ACID to PLAIN
+insert into plain select * from insertonly_acid;
+select * from plain;
+---- RESULTS
+1
+2
+---- TYPES
+INT
+====
+---- QUERY
+# INSERT from PLAIN to ACID
+insert into insertonly_acid select cast(i + 2 as int) from plain;
+select * from insertonly_acid;
+---- RESULTS
+1
+2
+3
+4
+---- TYPES
+INT
+====
+---- QUERY
+# CTAS PLAIN from ACID
+create table plain_ctas as select * from insertonly_acid;
+select * from plain_ctas;
+---- RESULTS
+1
+2
+3
+4
+---- TYPES
+INT
+====
+---- QUERY
+# CTAS ACID from PLAIN
+create table insertonly_acid_ctas
+  tblproperties('transactional'='true', 'transactional_properties'='insert_only')
+  as select * from plain_ctas;
+select * from insertonly_acid_ctas;
+---- RESULTS
+1
+2
+3
+4
+---- TYPES
+INT
+====
+---- QUERY
+# CTAS ACID from ACID
+create table insertonly_acid_ctas_2
+  tblproperties('transactional'='true', 'transactional_properties'='insert_only')
+  as select * from insertonly_acid;
+select * from insertonly_acid_ctas_2;
+---- RESULTS
+1
+2
+3
+4
+---- TYPES
+INT
+====
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index 6ec085f..f7d2595 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -31,7 +31,8 @@ from subprocess import call
 
 from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
+from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfHive2,
+    SkipIfIsilon, SkipIfLocal)
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -659,6 +660,69 @@ class TestHmsIntegration(ImpalaTestSuite):
             self.client.execute, 'select * from %s' % table_name,
             "Column type: INT, Parquet schema:")
 
+  @SkipIfHive2.acid
+  def test_acid_inserts(self, vector, unique_database):
+    """
+    Insert data to insert-only ACID table from Impala and checks that Hive is able to
+    see the data.
+    """
+    table_name = "%s.acid_insert" % unique_database
+    self.client.execute(
+      "create table %s (i int) "
+      "TBLPROPERTIES('transactional'='true', "
+      "'transactional_properties'='insert_only')" % table_name)
+    self.client.execute("insert into table %s values (1)" % table_name)
+    assert '1' == self.run_stmt_in_hive("select * from %s" % table_name).split('\n')[1]
+    self.client.execute("insert into table %s values (2)" % table_name)
+    assert '2' == self.run_stmt_in_hive(
+      "select * from %s order by i" % table_name).split('\n')[2]
+    self.client.execute("insert overwrite table %s values (10)" % table_name)
+    assert '10' == self.run_stmt_in_hive("select * from %s" % table_name).split('\n')[1]
+    self.client.execute("insert into table %s values (11)" % table_name)
+    assert '11' == self.run_stmt_in_hive(
+      "select * from %s order by i" % table_name).split('\n')[2]
+    assert '2' == self.run_stmt_in_hive("select count(*) from %s"
+                                        % table_name).split('\n')[1]
+
+    # CTAS ACID table with Impala and select from Hive
+    ctas_table_name = "%s.acid_insert_ctas" % unique_database
+    self.client.execute(
+      "create table %s "
+      "TBLPROPERTIES('transactional'='true', "
+      "'transactional_properties'='insert_only') "
+      "as select * from %s" % (ctas_table_name, table_name))
+    assert '11' == self.run_stmt_in_hive(
+      "select * from %s order by i" % ctas_table_name).split('\n')[2]
+    assert '2' == self.run_stmt_in_hive("select count(*) from %s"
+                                        % ctas_table_name).split('\n')[1]
+
+    # Insert into partitioned ACID table
+    part_table_name = "%s.part_acid_insert" % unique_database
+    self.client.execute(
+      "create table %s (i int) partitioned by (p int)"
+      "TBLPROPERTIES('transactional'='true', "
+      "'transactional_properties'='insert_only')" % part_table_name)
+    self.client.execute("insert into %s partition (p=1) values (10)" % part_table_name)
+    self.client.execute("insert into %s partition (p=2) values (20)" % part_table_name)
+    hive_result = self.run_stmt_in_hive(
+      "select p, i from %s order by p, i" % part_table_name).split('\n')
+    assert '1,10' == hive_result[1]
+    assert '2,20' == hive_result[2]
+    self.client.execute(
+      "insert into %s partition (p) values (30,3),(40,4)" % part_table_name)
+    hive_result = self.run_stmt_in_hive(
+      "select p, i from %s order by p, i" % part_table_name).split('\n')
+    assert '3,30' == hive_result[3]
+    assert '4,40' == hive_result[4]
+    self.client.execute(
+      "insert overwrite %s partition (p) values (11,1),(41,4)" % part_table_name)
+    hive_result = self.run_stmt_in_hive(
+      "select p, i from %s order by p, i" % part_table_name).split('\n')
+    assert '1,11' == hive_result[1]
+    assert '2,20' == hive_result[2]
+    assert '3,30' == hive_result[3]
+    assert '4,41' == hive_result[4]
+
   @pytest.mark.execute_serially
   def test_change_table_name(self, vector):
     """
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index dd2c4f9..95df39b 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -25,7 +25,7 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfLocal, \
-    SkipIfNotHdfsMinicluster, SkipIfS3, SkipIfDockerizedCluster
+    SkipIfHive2, SkipIfNotHdfsMinicluster, SkipIfS3, SkipIfDockerizedCluster
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -137,6 +137,34 @@ class TestInsertQueries(ImpalaTestSuite):
         multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
 
   @pytest.mark.execute_serially
+  @SkipIfHive2.acid
+  def test_acid_insert(self, vector):
+    if (vector.get_value('table_format').file_format == 'parquet'):
+      vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
+          vector.get_value('compression_codec')
+    # We need to turn off capability checks; otherwise HMS rejects requests from this
+    # Python client because it does not declare the capability to handle ACID tables.
+    # The test only needs to drop and re-create such tables, and table properties are
+    # preserved by those operations, which is enough here (a table is ACID if it has
+    # the relevant table properties).
+    capability_check = self.hive_client.getMetaConf("metastore.client.capability.check")
+    self.hive_client.setMetaConf("metastore.client.capability.check", "false")
+    self.run_test_case('QueryTest/acid-insert', vector,
+        multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
+    # Reset original state.
+    self.hive_client.setMetaConf("metastore.client.capability.check", capability_check)
+
+  @SkipIfHive2.acid
+  def test_acid_nonacid_insert(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-nonacid-insert', vector, unique_database,
+        multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
+
+  @SkipIfHive2.acid
+  def test_acid_insert_fail(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-insert-fail', vector, unique_database,
+        multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)
+
+  @pytest.mark.execute_serially
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   def test_insert_mem_limit(self, vector):
     if (vector.get_value('table_format').file_format == 'parquet'):