You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/05/26 13:21:13 UTC

[impala] 03/03: IMPALA-10465: Use IGNORE variant of Kudu write operations

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4236c307b971881a3b1d85068db5b053a9c34cfa
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Sun May 8 22:53:20 2022 -0700

    IMPALA-10465: Use IGNORE variant of Kudu write operations
    
    KUDU-1563 added support for INSERT_IGNORE, UPDATE_IGNORE, and
    DELETE_IGNORE to handle cases where users want to ignore primary key
    errors efficiently. Impala already does this today for its INSERT
    behavior. However, it does so by ignoring the per-row errors from Kudu
    client side. This requires a large error buffer (which may need to be
    expanded in rare cases) to log all of the warning messages which users
    often do not care about and causes significant RPC overhead.
    
    This patch changes the Kudu write operations issued by Impala to use
    INSERT_IGNORE, UPDATE_IGNORE, and DELETE_IGNORE if the Kudu cluster
    supports them and the backend flag "kudu_ignore_conflicts" is true.
    
    We benchmarked the change by running insert and update queries on a
    modified tpch.lineitem table in which around half of the rows being
    modified introduce conflicts. The table below shows the performance
    difference after the patch:
    
    +----------------------+--------+-------------+------------+------------+----------------+
    | Query                | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) |
    +----------------------+--------+-------------+------------+------------+----------------+
    | KUDU-IGNORE-3-UPDATE | 30.06  | 30.52       |   -1.53%   |   0.18%    |   0.58%        |
    | KUDU-IGNORE-2-INSERT | 48.91  | 71.09       | I -31.20%  |   0.60%    |   0.72%        |
    +----------------------+--------+-------------+------------+------------+----------------+
    
    Testing:
    - Pass core tests.
    
    Change-Id: I8da7c41d61b0888378b390b8b643238433eb3b52
    Reviewed-on: http://gerrit.cloudera.org:8080/18536
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/kudu-table-sink.cc                     |  65 ++++-
 be/src/exec/kudu-table-sink.h                      |  10 +
 .../org/apache/impala/planner/KuduTableSink.java   |  21 ++
 tests/custom_cluster/test_kudu.py                  | 320 ++++++++++++++++++---
 4 files changed, 380 insertions(+), 36 deletions(-)

diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index d2a167d64..1ba1ba986 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -21,6 +21,7 @@
 
 #include <boost/bind.hpp>
 #include <kudu/client/client.h>
+#include <kudu/client/resource_metrics.h>
 #include <kudu/client/write_op.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
@@ -53,6 +54,23 @@ DEFINE_int32(kudu_error_buffer_size, DEFAULT_KUDU_ERROR_BUFFER_SIZE,
     "The size (bytes) of the Kudu client buffer for returning errors, with a min of 1KB."
     "If the actual errors exceed this size the query will fail.");
 
+// IMPALA-10465: This changes the behavior of KuduTableSink to use the IGNORE variants of
+// the Kudu API rather than using the non-IGNORE variants and discarding the Kudu error
+// messages manually. Set to 'false' to return to the old behavior.
+DEFINE_bool(kudu_ignore_conflicts, true,
+    "Control whether Impala should ignore Kudu conflict error on duplicate and absent "
+    "primary keys during write operations. If this flag is set to true and Kudu cluster "
+    "supports ignore operations, Impala will use {INSERT,UPDATE,DELETE}_IGNORE "
+    "operations of Kudu API. Otherwise, Impala will use regular {INSERT,UPDATE,DELETE} "
+    "operations and logs any occurrences of such conflict error. "
+    "See also kudu_ignore_conflicts_in_transaction flag.");
+
+DEFINE_bool(kudu_ignore_conflicts_in_transaction, false,
+    "Control whether Kudu transaction should ignore conflict error on duplicate and "
+    "absent primary keys during write operations. If set to true, the Kudu transaction "
+    "will not be aborted for hitting such a conflict error. This flag is only "
+    "considered if kudu_ignore_conflicts flag is true.");
+
 DECLARE_int32(kudu_operation_timeout_ms);
 
 using kudu::client::KuduColumnSchema;
@@ -64,6 +82,7 @@ using kudu::client::KuduTransaction;
 using kudu::client::KuduInsert;
 using kudu::client::KuduUpdate;
 using kudu::client::KuduError;
+using kudu::client::ResourceMetrics;
 
 namespace impala {
 
@@ -179,6 +198,10 @@ Status KuduTableSink::Open(RuntimeState* state) {
     session_ = client_->NewSession();
   }
 
+  ignore_conflicts_ =
+      FLAGS_kudu_ignore_conflicts && kudu_table_sink_.ignore_not_found_or_duplicate;
+  if (!ignore_conflicts_) profile()->AppendExecOption("Log Kudu Conflicts");
+
   session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms);
 
   // KuduSession Set* methods here and below return a status for API compatibility.
@@ -236,6 +259,20 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
   }
 }
 
+kudu::client::KuduWriteOperation* KuduTableSink::NewWriteIgnoreOp() {
+  if (sink_action_ == TSinkAction::INSERT) {
+    return table_->NewInsertIgnore();
+  } else if (sink_action_ == TSinkAction::UPDATE) {
+    return table_->NewUpdateIgnore();
+  } else if (sink_action_ == TSinkAction::UPSERT) {
+    return table_->NewUpsert();
+  } else {
+    DCHECK(sink_action_ == TSinkAction::DELETE)
+        << "Sink type not supported: " << sink_action_;
+    return table_->NewDeleteIgnore();
+  }
+}
+
 Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   expr_results_pool_->Clear();
@@ -249,10 +286,14 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   // violations.
   int num_null_violations = 0;
 
+  RETURN_IF_ERROR(
+      DebugAction(state->query_options(), "FIS_KUDU_TABLE_SINK_WRITE_BEGIN"));
+
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
-    unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    unique_ptr<kudu::client::KuduWriteOperation> write(
+        ignore_conflicts_ ? NewWriteIgnoreOp() : NewWriteOp());
     bool add_row = true;
 
     for (int j = 0; j < output_expr_evals_.size(); ++j) {
@@ -317,6 +358,27 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
 
 Status KuduTableSink::CheckForErrors(RuntimeState* state) {
   RETURN_IF_ERROR(state->CheckQueryState());
+
+  if (ignore_conflicts_) {
+    const ResourceMetrics& metrics = session_->GetWriteOpMetrics();
+    int64_t ignored_errors = metrics.GetMetric("insert_ignore_errors")
+        + metrics.GetMetric("update_ignore_errors")
+        + metrics.GetMetric("delete_ignore_errors");
+    DCHECK(ignored_errors >= total_ignored_errors_);
+    if (ignored_errors > total_ignored_errors_) {
+      total_ignored_errors_ = ignored_errors;
+      COUNTER_SET(num_row_errors_, total_ignored_errors_);
+      if (is_transactional_ && !FLAGS_kudu_ignore_conflicts_in_transaction) {
+        // Return general status error to abort transaction.
+        return Status("Kudu reported write operation errors during transaction.");
+      }
+    }
+  }
+
+  // Regardless of ignore_conflicts_ value, we still need to check for pending errors
+  // existence for the following possible scenarios:
+  // - sink_action_ is UPSERT.
+  // - Received other Kudu error such as timeouts or column constraint violations.
   if (session_->CountPendingErrors() == 0) return Status::OK();
 
   vector<KuduError*> errors;
@@ -380,6 +442,7 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
 void KuduTableSink::Close(RuntimeState* state) {
   if (closed_) return;
   session_.reset();
+  total_ignored_errors_ = 0;
   mem_tracker_->Release(client_tracked_bytes_);
   txn_.reset();
   client_.reset();
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 780d4eb8f..7b77ead4c 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -84,6 +84,9 @@ class KuduTableSink : public DataSink {
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
+  /// Create a new write ignore operation according to the sink type.
+  kudu::client::KuduWriteOperation* NewWriteIgnoreOp();
+
   /// Checks for any errors buffered in the Kudu session, and increments
   /// appropriate counters for ignored errors.
   //
@@ -138,6 +141,13 @@ class KuduTableSink : public DataSink {
 
   /// True if it's in Kudu transaction. It's valid only after Open() succeeds.
   bool is_transactional_ = false;
+
+  /// True if this sink should ignore duplicate and absent key conflicts during Kudu
+  /// write operations. It's valid only after Open() succeeds.
+  bool ignore_conflicts_ = false;
+
+  /// Number of ignored write error operations during the lifetime of 'session_'.
+  int64_t total_ignored_errors_ = 0;
 };
 
 }  // namespace impala
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index 21733ec17..eba08b5ce 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
@@ -32,7 +33,12 @@ import org.apache.impala.thrift.TKuduTableSink;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+import org.apache.impala.util.KuduUtil;
+import org.apache.kudu.client.KuduClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -40,6 +46,7 @@ import com.google.common.collect.Lists;
  * data from a plan fragment into an Kudu table using a Kudu client.
  */
 public class KuduTableSink extends TableSink {
+  private final static Logger LOG = LoggerFactory.getLogger(KuduTableSink.class);
 
   // Optional list of referenced Kudu table column indices. The position of a result
   // expression i matches a column index into the Kudu schema at targetColdIdxs[i].
@@ -49,6 +56,9 @@ public class KuduTableSink extends TableSink {
   // target table is Kudu table and transaction for Kudu is enabled.
   private java.nio.ByteBuffer txnToken_;
 
+  // Indicate whether Kudu cluster supports IGNORE write operations or not.
+  private boolean supportsIgnoreOperations_ = false;
+
   public KuduTableSink(FeTable targetTable, Op sinkOp, List<Integer> referencedColumns,
       List<Expr> outputExprs, java.nio.ByteBuffer txnToken) {
     super(targetTable, sinkOp, outputExprs);
@@ -56,6 +66,16 @@ public class KuduTableSink extends TableSink {
         ? Lists.newArrayList(referencedColumns) : null;
     txnToken_ =
         txnToken != null ? org.apache.thrift.TBaseHelper.copyBinary(txnToken) : null;
+
+    // Check if Kudu cluster supports IGNORE write operations.
+    Preconditions.checkState(targetTable instanceof FeKuduTable);
+    KuduClient client =
+        KuduUtil.getKuduClient(((FeKuduTable) targetTable).getKuduMasterHosts());
+    try {
+      supportsIgnoreOperations_ = client.supportsIgnoreOperations();
+    } catch (Exception e) {
+      LOG.error("Unable to check Kudu ignore operation support", e);
+    }
   }
 
   @Override
@@ -95,6 +115,7 @@ public class KuduTableSink extends TableSink {
     TKuduTableSink tKuduSink = new TKuduTableSink();
     tKuduSink.setReferenced_columns(targetColIdxs_);
     if (txnToken_ != null) tKuduSink.setKudu_txn_token(txnToken_);
+    tKuduSink.setIgnore_not_found_or_duplicate(supportsIgnoreOperations_);
     tTableSink.setKudu_table_sink(tKuduSink);
     tsink.table_sink = tTableSink;
     tsink.output_exprs = Expr.treesToThrift(outputExprs_);
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index b5959a442..eda2c0731 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -334,9 +334,9 @@ class TestKuduHMSIntegration(CustomKuduTest):
     self.run_test_case('QueryTest/kudu_hms_alter', vector, use_db=unique_database)
 
 
-class TestKuduTransaction(CustomClusterTestSuite):
+class TestKuduTransactionBase(CustomClusterTestSuite):
   """
-  This suite tests the Kudu transaction when inserting rows to kudu table.
+  This is a base class of other TestKuduTransaction classes.
   """
 
   # query to create Kudu table.
@@ -366,9 +366,7 @@ class TestKuduTransaction(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  def test_kudu_txn_succeed(self, cursor, unique_database):
+  def _test_kudu_txn_succeed(self, cursor, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_succeed" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -393,9 +391,7 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(103,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  def test_kudu_txn_not_implemented(self, cursor, unique_database):
+  def _test_kudu_txn_not_implemented(self, cursor, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_succeed" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -430,9 +426,8 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(3,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  def test_kudu_txn_abort_dup_key(self, cursor, unique_database):
+  def _test_kudu_txn_abort_dup_key(self, cursor, unique_database,
+      expect_fail_on_conflict, expected_error_msg):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_dup_key" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -442,11 +437,11 @@ class TestKuduTransaction(CustomClusterTestSuite):
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
     try:
       self.execute_query(self._insert_dup_key_query.format(table_name))
-      assert False, "query was expected to fail"
+      assert (not expect_fail_on_conflict), "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Key already present in Kudu table" in str(e)
+      assert expected_error_msg in str(e)
     cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0,)]
+    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
 
     # Disable Kudu transactions and run the same query. Part of rows are inserted into
     # Kudu table.
@@ -470,11 +465,11 @@ class TestKuduTransaction(CustomClusterTestSuite):
     # Transaction should be aborted and no rows are inserted into Kudu table.
     try:
       self.execute_query(self._insert_select_query2.format(table_name, table_name2))
-      assert False, "query was expected to fail"
+      assert (not expect_fail_on_conflict), "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Key already present in Kudu table" in str(e)
+      assert expected_error_msg in str(e)
     cursor.execute(self._row_num_query.format(table_name))
-    assert cursor.fetchall() == [(0,)]
+    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
 
     # Disable Kudu transactions and run the same query. Part of rows are inserted into
     # Kudu table.
@@ -483,9 +478,8 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(2,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  def test_kudu_txn_ctas(self, cursor, unique_database):
+  def _test_kudu_txn_ctas(self, cursor, unique_database, expect_fail_on_conflict,
+      expected_error_msg):
     # Enable Kudu transactions
     self.execute_query("set ENABLE_KUDU_TRANSACTION=true")
 
@@ -514,11 +508,11 @@ class TestKuduTransaction(CustomClusterTestSuite):
     table_name4 = "%s.test_kudu_txn_ctas4" % unique_database
     try:
       self.execute_query(self._ctas_query.format(table_name4, table_name3))
-      assert False, "query was expected to fail"
+      assert (not expect_fail_on_conflict), "query was expected to fail"
     except ImpalaBeeswaxException as e:
-      assert "Key already present in Kudu table" in str(e)
+      assert expected_error_msg in str(e)
     cursor.execute(self._row_num_query.format(table_name4))
-    assert cursor.fetchall() == [(0,)]
+    assert cursor.fetchall() == [(0 if expect_fail_on_conflict else 2,)]
 
     # Disable Kudu transactions and run the same CTAS query. Part of rows are inserted
     # into Kudu table.
@@ -528,10 +522,7 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name5))
     assert cursor.fetchall() == [(2,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_row_batch(self, cursor, unique_database):
+  def _test_kudu_txn_abort_row_batch(self, cursor, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_row_batch" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -549,10 +540,7 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(0,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
+  def _test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
     # Create Kudu table.
     table_name = "%s.test_kudu_txn_abort_partial_rows" % unique_database
     self.execute_query(self._create_kudu_table_query.format(table_name))
@@ -570,10 +558,7 @@ class TestKuduTransaction(CustomClusterTestSuite):
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(0,)]
 
-  @pytest.mark.execute_serially
-  @SkipIfKudu.no_hybrid_clock
-  @SkipIfBuildType.not_dev_build
-  def test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
+  def _test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
     # Running two separate queries that are inserting to the same Kudu partitions.
     # Verify that one of the queries should fail, given Kudu's current implementation
     # of partition locking.
@@ -603,6 +588,135 @@ class TestKuduTransaction(CustomClusterTestSuite):
     self.client.close_query(handle)
 
 
+class TestKuduTransaction(TestKuduTransactionBase):
+  """
+  This suite tests the Kudu transaction when inserting rows to kudu table.
+  """
+
+  # expected error message from kudu on duplicate key.
+  _duplicate_key_error = "Kudu reported write operation errors during transaction."
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_txn_succeed(self, cursor, unique_database):
+    self._test_kudu_txn_succeed(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_txn_not_implemented(self, cursor, unique_database):
+    self._test_kudu_txn_not_implemented(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_txn_abort_dup_key(self, cursor, unique_database):
+    self._test_kudu_txn_abort_dup_key(cursor, unique_database, True,
+        self._duplicate_key_error)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_txn_ctas(self, cursor, unique_database):
+    self._test_kudu_txn_ctas(cursor, unique_database, True, self._duplicate_key_error)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  def test_kudu_txn_abort_row_batch(self, cursor, unique_database):
+    self._test_kudu_txn_abort_row_batch(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  def test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
+    self._test_kudu_txn_abort_partial_rows(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  def test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
+    self._test_kudu_txn_abort_partition_lock(cursor, unique_database)
+
+
+class TestKuduTransactionNoIgnore(TestKuduTransactionBase):
+  """
+  This suite tests the Kudu transaction when inserting rows to kudu table with
+  kudu_ignore_conflicts flag set to false.
+  """
+
+  # impalad args to start the cluster.
+  _impalad_args = "--kudu_ignore_conflicts=false"
+  # expected error message from kudu on duplicated key.
+  _duplicate_key_error = "Key already present in Kudu table"
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_succeed(self, cursor, unique_database):
+    self._test_kudu_txn_succeed(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_not_implemented(self, cursor, unique_database):
+    self._test_kudu_txn_not_implemented(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_abort_dup_key(self, cursor, unique_database):
+    self._test_kudu_txn_abort_dup_key(cursor, unique_database, True,
+        self._duplicate_key_error)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_ctas(self, cursor, unique_database):
+    self._test_kudu_txn_ctas(cursor, unique_database, True, self._duplicate_key_error)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_abort_row_batch(self, cursor, unique_database):
+    self._test_kudu_txn_abort_row_batch(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_abort_partial_rows(self, cursor, unique_database):
+    self._test_kudu_txn_abort_partial_rows(cursor, unique_database)
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @SkipIfBuildType.not_dev_build
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_abort_partition_lock(self, cursor, unique_database):
+    self._test_kudu_txn_abort_partition_lock(cursor, unique_database)
+
+
+class TestKuduTransactionIgnoreConflict(TestKuduTransactionBase):
+  """
+  This suite tests the Kudu transaction when inserting rows to kudu table with
+  kudu_ignore_conflicts=true and kudu_ignore_conflicts_in_transaction=true.
+  """
+
+  # impalad args to start the cluster.
+  _impalad_args = "--kudu_ignore_conflicts=true " \
+      "--kudu_ignore_conflicts_in_transaction=true"
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_dup_key(self, cursor, unique_database):
+    self._test_kudu_txn_abort_dup_key(cursor, unique_database, False, "no error")
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_kudu_txn_ctas(self, cursor, unique_database):
+    self._test_kudu_txn_ctas(cursor, unique_database, False, "no error")
+
+
 class TestKuduTxnKeepalive(CustomKuduTest):
   """
   Tests the Kudu transaction to ensure the transaction handle kept by the front-end in
@@ -649,3 +763,139 @@ class TestKuduTxnKeepalive(CustomKuduTest):
     self.execute_query(self._insert_3_rows_query.format(table_name), query_options)
     cursor.execute(self._row_num_query.format(table_name))
     assert cursor.fetchall() == [(3,)]
+
+
+class TestKuduDmlConflictBase(CustomClusterTestSuite):
+  """
+  This is a base class of other TestKuduDml classes.
+  """
+
+  # query to create Kudu table.
+  _create_kudu_table_query = ("create table {0} "
+      "(a int primary key, b timestamp not null) "
+      "partition by hash(a) partitions 8 stored as kudu")
+  # queries to insert rows into Kudu table.
+  _insert_dup_key_query = ("insert into {0} values "
+      "(0, '1400-01-01'), (0, '1400-01-02'), "
+      "(1, '1400-01-01'), (1, '1400-01-02'), "
+      "(2, '1400-01-01'), (2, '1400-01-02')")
+  # query to update rows in Kudu table with some constraint violation.
+  _update_violate_constraint_query = ("update {0} set b = "
+      "case when b <= '2022-01-01' then NULL else '1400-01-01' end")
+  # query to update row by primary key.
+  _update_by_key_query = "update {0} set b = '1400-02-02' where a = {1}"
+  # query to delete row by primary key.
+  _delete_by_key_query = "delete from {0} where a = {1}"
+  # query to drop all rows from Kudu table.
+  _delete_all_query = "delete from {0}"
+  # query to get number of rows.
+  _row_num_query = "select count(*) from {0}"
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def _check_errors(self, query_profile, expect_error, error_message, num_row_erros):
+    """
+    Check occurrence of error_message and the NumRowErrors count in query_profile.
+    """
+
+    error_line = "  Errors: {0}".format(error_message)
+    num_row_error_line = "NumRowErrors: {0}".format(num_row_erros)
+    assert expect_error == (error_line in query_profile)
+    assert num_row_error_line in query_profile
+
+  def _race_queries(self, fast_query, slow_query, expect_error_on_slow_query,
+      error_message, num_row_erros):
+    """
+    Race two queries and check for error message in the slow query's profile.
+    """
+
+    fast_sleep = {'debug_action': 'FIS_KUDU_TABLE_SINK_WRITE_BEGIN:SLEEP@1000'}
+    slow_sleep = {'debug_action': 'FIS_KUDU_TABLE_SINK_WRITE_BEGIN:SLEEP@3000'}
+    timeout = 10
+
+    fast_handle = self.execute_query_async(fast_query, fast_sleep)
+    slow_handle = self.execute_query_async(slow_query, slow_sleep)
+    try:
+      # Wait for both queries to finish.
+      self.wait_for_state(fast_handle, self.client.QUERY_STATES['FINISHED'], timeout)
+      self.wait_for_state(slow_handle, self.client.QUERY_STATES['FINISHED'], timeout)
+      self._check_errors(self.client.get_runtime_profile(slow_handle),
+          expect_error_on_slow_query, error_message, num_row_erros)
+    finally:
+      self.client.close_query(fast_handle)
+      self.client.close_query(slow_handle)
+
+  def _test_insert_update_delete(self, cursor, unique_database,
+      expect_log_on_conflict):
+    """
+    Run a sequence of insert, update, and delete queries with conflicting primary keys.
+    """
+
+    # Create Kudu table.
+    table_name = "%s.insert_update_delete" % unique_database
+    self.execute_query(self._create_kudu_table_query.format(table_name))
+
+    # Insert rows with duplicate primary key.
+    # Error message should exist in profile if kudu_ignore_conflicts=false.
+    result = self.execute_query(self._insert_dup_key_query.format(table_name))
+    self._check_errors(result.runtime_profile, expect_log_on_conflict,
+        "Key already present in Kudu table", 3)
+    cursor.execute(self._row_num_query.format(table_name))
+    assert cursor.fetchall() == [(3,)]
+
+    # Update rows with some constraint violation.
+    # Error message should exist in profile regardless of kudu_ignore_conflicts value.
+    result = self.execute_query(self._update_violate_constraint_query.format(table_name))
+    self._check_errors(result.runtime_profile, True,
+        "Row with null value violates nullability constraint on table", 3)
+
+    # Update row with non-existent primary key by racing it against concurrent delete
+    # query.
+    delete_query = self._delete_by_key_query.format(table_name, 1)
+    update_query = self._update_by_key_query.format(table_name, 1)
+    self._race_queries(delete_query, update_query, expect_log_on_conflict,
+        "Not found in Kudu table", 1)
+    cursor.execute(self._row_num_query.format(table_name))
+    assert cursor.fetchall() == [(2,)]
+
+    # Delete row with non-existent primary key by racing it against another concurrent
+    # delete.
+    delete_query = self._delete_by_key_query.format(table_name, 2)
+    self._race_queries(delete_query, delete_query, expect_log_on_conflict,
+        "Not found in Kudu table", 1)
+    cursor.execute(self._row_num_query.format(table_name))
+    assert cursor.fetchall() == [(1,)]
+
+    # Delete all rows. Expect no errors.
+    result = self.execute_query(self._delete_all_query.format(table_name))
+    self._check_errors(result.runtime_profile, True, "\n", 0)
+    cursor.execute(self._row_num_query.format(table_name))
+    assert cursor.fetchall() == [(0,)]
+
+
+class TestKuduDmlConflictNoError(TestKuduDmlConflictBase):
+  """
+  Test that Kudu DML ignores conflicts and does not log the conflict error message.
+  """
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_insert_update_delete(self, cursor, unique_database):
+    self._test_insert_update_delete(cursor, unique_database, False)
+
+
+class TestKuduDmlConflictLogError(TestKuduDmlConflictBase):
+  """
+  Test that Kudu DML does not ignore conflicts and logs the conflict error message.
+  """
+
+  # impalad args to start the cluster.
+  _impalad_args = "--kudu_ignore_conflicts=false"
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  @CustomClusterTestSuite.with_args(impalad_args=_impalad_args)
+  def test_insert_update_delete(self, cursor, unique_database):
+    self._test_insert_update_delete(cursor, unique_database, True)