You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:16 UTC

[29/33] incubator-impala git commit: IMPALA-4134, IMPALA-3704: Kudu INSERT improvements

IMPALA-4134,IMPALA-3704: Kudu INSERT improvements

1.) IMPALA-4134: Use Kudu AUTO FLUSH
Improves performance of writes to Kudu up to 4.2x in
bulk data loading tests (load 200 million rows from
lineitem).

2.) IMPALA-3704: Improve errors on PK conflicts
The Kudu client reports an error for every PK conflict,
and all errors were being returned in the error status.
As a result, inserts/updates/deletes could return errors
with thousands errors reported. This changes the error
handling to log all reported errors as warnings and
return only the first error in the query error status.

3.) Improve the DataSink reporting of the insert stats.
The per-partition stats returned by the data sink weren't
useful for Kudu sinks. Firstly, the number of appended rows
was not being displayed in the profile. Secondly, the
'stats' field isn't populated for Kudu tables and thus was
confusing in the profile, so it is no longer printed if it
is not set in the thrift struct.

Testing: Ran local tests, including new tests to verify
the query profile insert stats. Manual cluster testing was
conducted of the AUTO FLUSH functionality, and that testing
informed the default mutation buffer value of 100MB which
was found to provide good results.

Change-Id: I5542b9a061b01c543a139e8722560b1365f06595
Reviewed-on: http://gerrit.cloudera.org:8080/4728
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/99ed6dc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/99ed6dc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/99ed6dc6

Branch: refs/heads/hadoop-next
Commit: 99ed6dc67ae889eb2a45b10c97cb23f52bc83e5d
Parents: 0eaff80
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Wed Oct 19 15:30:58 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Oct 25 02:06:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                        |   5 +
 be/src/exec/hbase-table-sink.cc                 |   4 +-
 be/src/exec/hdfs-table-sink.cc                  |   4 +-
 be/src/exec/kudu-table-sink.cc                  | 141 ++++++++++++-------
 be/src/exec/kudu-table-sink.h                   |  53 ++++---
 be/src/runtime/coordinator.cc                   |  14 +-
 be/src/service/impala-beeswax-server.cc         |   4 +-
 be/src/service/query-exec-state.cc              |   2 +-
 common/thrift/ImpalaInternalService.thrift      |   4 +-
 common/thrift/ImpalaService.thrift              |   8 +-
 common/thrift/generate_error_codes.py           |   6 +
 shell/impala_client.py                          |   2 +-
 .../queries/QueryTest/kudu_crud.test            |  94 ++++++++++++-
 tests/beeswax/impala_beeswax.py                 |   4 +-
 14 files changed, 248 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c95b854..6a34543 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -142,6 +142,11 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
     } else {
       ss << partition_key << endl;
     }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index e6052cc..3d84fed 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -72,7 +72,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect insert statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_stats(TInsertStats());
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
@@ -90,7 +90,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
       batch->num_rows();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 77316d4..63bd648 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -499,7 +499,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     DCHECK(state->per_partition_status()->find(partition->partition_name) ==
         state->per_partition_status()->end());
     TInsertPartitionStatus partition_status;
-    partition_status.__set_num_appended_rows(0L);
+    partition_status.__set_num_modified_rows(0L);
     partition_status.__set_id(partition_descriptor->id());
     partition_status.__set_stats(TInsertStats());
     partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
@@ -594,7 +594,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
     // Should have been created in GetOutputPartition() when the partition was
     // initialised.
     DCHECK(it != state->per_partition_status()->end());
-    it->second.num_appended_rows += partition->num_rows;
+    it->second.num_modified_rows += partition->num_rows;
     DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 661489f..70a74a9 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -33,6 +33,8 @@
 
 DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu session. "
     "How long to wait before considering a write failed.");
+DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
+    "Kudu client buffer for mutations.");
 
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
@@ -48,6 +50,9 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
+// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
+const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
+
 KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     const vector<TExpr>& select_list_texprs,
     const TDataSink& tsink)
@@ -56,8 +61,6 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
       select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
       kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-      kudu_flush_counter_(NULL),
-      kudu_flush_timer_(NULL),
       kudu_error_counter_(NULL),
       rows_written_(NULL),
       rows_written_rate_(NULL) {
@@ -91,16 +94,14 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect write statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
-  root_status.__set_stats(TInsertStats());
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_flush_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushOperations", TUnit::UNIT);
   kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  kudu_flush_timer_ = ADD_TIMER(profile(), "KuduFlushTimer");
   rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
   rows_written_rate_ = profile()->AddDerivedCounter(
       "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
@@ -124,8 +125,40 @@ Status KuduTableSink::Open(RuntimeState* state) {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000);
+
+  // KuduSession Set* methods here and below return a status for API compatibility.
+  // As long as the Kudu client is statically linked, these shouldn't fail and thus these
+  // calls could also DCHECK status is OK for debug builds (while still returning errors
+  // for release).
   KUDU_RETURN_IF_ERROR(session_->SetFlushMode(
-      kudu::client::KuduSession::MANUAL_FLUSH), "Unable to set flush mode");
+      kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");
+
+  const int32_t buf_size = FLAGS_kudu_mutation_buffer_size;
+  if (buf_size < 1024 * 1024) {
+    return Status(strings::Substitute(
+        "Invalid kudu_mutation_buffer_size: '$0'. Must be greater than 1MB.", buf_size));
+  }
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferSpace(buf_size),
+      "Couldn't set mutation buffer size");
+
+  // Configure client memory used for buffering.
+  // Internally, the Kudu client keeps one or more buffers for writing operations. When a
+  // single buffer is flushed, it is locked (that space cannot be reused) until all
+  // operations within it complete, so it is important to have a number of buffers. In
+  // our testing, we found that allowing a total of 100MB of buffer space to provide good
+  // results; this is the default.  Then, because of some existing 8MB limits in Kudu, we
+  // want to have that total space broken up into 7MB buffers (INDIVIDUAL_BUFFER_SIZE).
+  // The mutation flush watermark is set to flush every INDIVIDUAL_BUFFER_SIZE.
+  int num_buffers = FLAGS_kudu_mutation_buffer_size / INDIVIDUAL_BUFFER_SIZE;
+  if (num_buffers == 0) num_buffers = 1;
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferFlushWatermark(1.0 / num_buffers),
+      "Couldn't set mutation buffer watermark");
+
+  // No limit on the buffer count since the settings above imply a max number of buffers.
+  // Note that the Kudu client API has a few too many knobs for configuring the size and
+  // number of these buffers; there are a few ways to accomplish similar behaviors.
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0),
+      "Couldn't set mutation buffer count");
   return Status::OK();
 }
 
@@ -135,7 +168,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
   } else if (sink_action_ == TSinkAction::UPDATE) {
     return table_->NewUpdate();
   } else {
-    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported. " << sink_action_;
+    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
+        << sink_action_;
     return table_->NewDelete();
   }
 }
@@ -145,11 +179,15 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
 
+  // Collect all write operations and apply them together so the time in Apply() can be
+  // easily timed.
+  vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
+
   int rows_added = 0;
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
-    gscoped_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       int col = kudu_table_sink_.referenced_columns.empty() ?
@@ -173,7 +211,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
         case TYPE_STRING: {
           StringValue* sv = reinterpret_cast<StringValue*>(value);
           kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetStringNoCopy(col, slice),
+          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
               "Could not add Kudu WriteOp.");
           break;
         }
@@ -216,46 +254,38 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
           return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
+    write_ops.push_back(move(write));
+  }
 
-    KUDU_RETURN_IF_ERROR(session_->Apply(write.release()),
-        "Error while applying Kudu session.");
-    ++rows_added;
+  {
+    SCOPED_TIMER(kudu_apply_timer_);
+    for (auto&& write: write_ops) {
+      KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
+      ++rows_added;
+    }
   }
+
   COUNTER_ADD(rows_written_, rows_added);
-  int64_t error_count = 0;
-  RETURN_IF_ERROR(Flush(&error_count));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
-      rows_added - error_count;
+  RETURN_IF_ERROR(CheckForErrors(state));
   return Status::OK();
 }
 
-Status KuduTableSink::Flush(int64_t* error_count) {
-  // TODO right now we always flush an entire row batch, if these are small we'll
-  // be inefficient. Consider decoupling impala's batch size from kudu's
-  kudu::Status s;
-  {
-    SCOPED_TIMER(kudu_flush_timer_);
-    COUNTER_ADD(kudu_flush_counter_, 1);
-    s = session_->Flush();
-  }
-  if (LIKELY(s.ok())) return Status::OK();
+Status KuduTableSink::CheckForErrors(RuntimeState* state) {
+  if (session_->CountPendingErrors() == 0) return Status::OK();
 
-  stringstream error_msg_buffer;
   vector<KuduError*> errors;
-
-  // Check if there are pending errors in the Kudu session. If errors overflowed the error
-  // buffer we can't be sure all errors can be ignored and fail immediately, setting 'failed'
-  // to true.
-  bool failed = false;
-  session_->GetPendingErrors(&errors, &failed);
-  if (UNLIKELY(failed)) {
-    error_msg_buffer << "Error overflow in Kudu session, "
-                     << "previous write operation might be inconsistent.\n";
+  Status status = Status::OK();
+
+  // Get the pending errors from the Kudu session. If errors overflowed the error buffer
+  // we can't be sure all errors can be ignored, so an error status will be reported.
+  bool error_overflow = false;
+  session_->GetPendingErrors(&errors, &error_overflow);
+  if (UNLIKELY(error_overflow)) {
+    status = Status("Error overflow in Kudu session.");
   }
 
   // The memory for the errors is manually managed. Iterate over all errors and delete
   // them accordingly.
-  bool first_error = true;
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
     // If the sink has the option "ignore_not_found_or_duplicate" set, duplicate key or
@@ -265,24 +295,39 @@ Status KuduTableSink::Flush(int64_t* error_count) {
         ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent()))) {
-      if (first_error) {
-        error_msg_buffer << "Error while flushing Kudu session: \n";
-        first_error = false;
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
       }
-      error_msg_buffer << e.ToString() << "\n";
-      failed = true;
+    }
+    if (e.IsNotFound()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
+          table_desc_->table_name()));
+    } else if (e.IsAlreadyPresent()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
+          table_desc_->table_name()));
+    } else {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
+          table_desc_->table_name(), e.ToString()));
     }
     delete errors[i];
   }
   COUNTER_ADD(kudu_error_counter_, errors.size());
-  if (error_count != NULL) *error_count = errors.size();
-  if (failed) return Status(error_msg_buffer.str());
-  return Status::OK();
+  return status;
 }
 
 Status KuduTableSink::FlushFinal(RuntimeState* state) {
-  // No buffered state to flush.
-  return Status::OK();
+  kudu::Status flush_status = session_->Flush();
+
+  // Flush() may return an error status but any errors will also be reported by
+  // CheckForErrors(), so it's safe to ignore and always call CheckForErrors.
+  if (!flush_status.ok()) {
+    VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
+  }
+  Status status = CheckForErrors(state);
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
+      rows_written_->value() - kudu_error_counter_->value());
+  return status;
 }
 
 void KuduTableSink::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2eeb721..fe278c5 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,17 +30,27 @@
 
 namespace impala {
 
-/// Sink that takes RowBatches and writes them into Kudu.
-/// Currently the data is sent to Kudu on Send(), i.e. the data is batched on the
-/// KuduSession until all the rows in a RowBatch are applied and then the session
-/// is flushed.
+/// Sink that takes RowBatches and writes them into a Kudu table.
 ///
-/// Kudu doesn't have transactions (yet!) so some rows may fail to write while
-/// others are successful. This sink will return an error if any of the rows fails
-/// to be written.
+/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
+/// flush records when enough data has been written (AUTO_FLUSH_BACKGROUND). This
+/// requires specifying a mutation buffer size and a buffer flush watermark percentage in
+/// the Kudu client. The mutation buffer needs to be large enough to buffer rows sent to
+/// all destination nodes because the buffer accounting is not specified per-tablet
+/// server (KUDU-1693). Tests showed that 100MB was a good default, and this is
+/// configurable via the gflag --kudu_mutation_buffer_size. The buffer flush watermark
+/// percentage is set to a value that results in Kudu flushing after 7MB is in a
+/// buffer for a particular destination (of the 100MB of the total mutation buffer space)
+/// because Kudu currently has some 8MB buffer limits.
 ///
-/// TODO Once Kudu actually implements AUTOFLUSH_BACKGROUND flush mode we should
-/// change the flushing behavior as it will likely make writes more efficient.
+/// Kudu doesn't have transactions yet, so some rows may fail to write while others are
+/// successful. The Kudu client reports errors, some of which may be considered to be
+/// expected: rows that fail to be written/updated/deleted due to a key conflict while
+/// the IGNORE option is specified, and these will not result in the sink returning an
+/// error. These errors when IGNORE is not specified, or any other kind of error
+/// reported by Kudu result in the sink returning an error status. The first non-ignored
+/// error is returned in the sink's Status. All reported errors (ignored or not) will be
+/// logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -59,7 +69,7 @@ class KuduTableSink : public DataSink {
   /// The KuduSession is flushed on each row batch.
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Does nothing. We currently flush on each Send() call.
+  /// Forces any remaining buffered operations to be flushed to Kudu.
   virtual Status FlushFinal(RuntimeState* state);
 
   /// Closes the KuduSession and the expressions.
@@ -72,12 +82,11 @@ class KuduTableSink : public DataSink {
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
-  /// Flushes the Kudu session, making sure all previous operations were committed, and handles
-  /// errors returned from Kudu. Passes the number of errors during the flush operations as an
-  /// out parameter.
-  /// Returns a non-OK status if there was an unrecoverable error. This might return an OK
-  /// status even if 'error_count' is > 0, as some errors might be ignored.
-  Status Flush(int64_t* error_count);
+  /// Checks for any errors buffered in the Kudu session, and increments
+  /// appropriate counters for ignored errors.
+  //
+  /// Returns a bad Status if there are non-ignorable errors.
+  Status CheckForErrors(RuntimeState* state);
 
   /// Used to get the KuduTableDescriptor from the RuntimeState
   TableId table_id_;
@@ -102,15 +111,15 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Counts the number of calls to KuduSession::flush().
-  RuntimeProfile::Counter* kudu_flush_counter_;
-
-  /// Aggregates the times spent in KuduSession:flush().
-  RuntimeProfile::Counter* kudu_flush_timer_;
-
   /// Total number of errors returned from Kudu.
   RuntimeProfile::Counter* kudu_error_counter_;
 
+  /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
+  /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
+  /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
+  /// rows as fast as the sink can write them.
+  RuntimeProfile::Counter* kudu_apply_timer_;
+
   /// Total number of rows written including errors.
   RuntimeProfile::Counter* rows_written_;
   RuntimeProfile::Counter* rows_written_rate_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4214d4d..0f41deb 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1668,11 +1668,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     for (const PartitionStatusMap::value_type& partition:
          params.insert_exec_status.per_partition_status) {
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-      status->num_appended_rows += partition.second.num_appended_rows;
-      status->id = partition.second.id;
-      status->partition_base_dir = partition.second.partition_base_dir;
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      status->__set_num_modified_rows(
+          status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_id(partition.second.id);
+      status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+      if (partition.second.__isset.stats) {
+        if (!status->__isset.stats) status->__set_stats(TInsertStats());
+        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      }
     }
     files_to_move_.insert(
         params.insert_exec_status.files_to_move.begin(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b50499e..63fa9cd 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -523,8 +523,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
         for (const PartitionStatusMap::value_type& v:
              exec_state->coord()->per_partition_status()) {
           const pair<string, TInsertPartitionStatus> partition_status = v;
-          insert_result->rows_appended[partition_status.first] =
-              partition_status.second.num_appended_rows;
+          insert_result->rows_modified[partition_status.first] =
+              partition_status.second.num_modified_rows;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 69473c5..3dc9d6c 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -956,7 +956,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
     for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_appended_rows;
+      total_num_rows_inserted += p.second.num_modified_rows;
     }
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 736de34..e9c3119 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -422,8 +422,8 @@ struct TInsertPartitionStatus {
   // query). See THdfsTable.partitions.
   1: optional i64 id
 
-  // The number of rows appended to this partition
-  2: optional i64 num_appended_rows
+  // The number of rows modified in this partition
+  2: optional i64 num_modified_rows
 
   // Detailed statistics gathered by table writers for this partition
   3: optional TInsertStats stats

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 794c140..573709b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -247,10 +247,10 @@ enum TImpalaQueryOptions {
 
 // The summary of an insert.
 struct TInsertResult {
-  // Number of appended rows per modified partition. Only applies to HDFS tables.
-  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
-  // root in an unpartitioned table being the empty string.
-  1: required map<string, i64> rows_appended
+  // Number of modified rows per partition. Only applies to HDFS and Kudu tables.
+  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with
+  // the root in an unpartitioned table being the empty string.
+  1: required map<string, i64> rows_modified
 }
 
 // Response from a call to PingImpalaService

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index ae338d5..f03b073 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -292,6 +292,12 @@ error_codes = (
 
   ("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
    "available."),
+
+  ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
+
+  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+
+  ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 0d1c835..f57a015 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -354,7 +354,7 @@ class ImpalaClient(object):
     if status != RpcStatus.OK:
       raise RPCException()
 
-    num_rows = sum([int(k) for k in insert_result.rows_appended.values()])
+    num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
     return num_rows
 
   def close_query(self, last_query_handle, query_handle_closed=False):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index a06d203..e4f3205 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -29,10 +29,15 @@ insert into tdata values
 (3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 3.*
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
+# TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -48,6 +53,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 # Try updating a varchar col. with a value that is bigger than it's size (truncated).
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -58,10 +65,11 @@ select * from tdata
 ---- TYPES
 INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
-====
 ---- QUERY
 update tdata set valb=false where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -75,6 +83,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set vali=43 where id > 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -88,6 +98,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -104,6 +116,8 @@ insert into tdata values
 (120, "she", cast(0.0 as float), 99, cast('f' as VARCHAR(20)), true)
 ---- RESULTS
 : 2
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -119,6 +133,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name=null where id = 40
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -133,6 +149,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 update tdata set name='he' where id = 40
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ---- RESULTS
 ====
 ---- QUERY
@@ -152,6 +170,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 insert into tdata values (320, '', 2.0, 932, cast('' as VARCHAR(20)), false)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -169,6 +189,10 @@ create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint,
 ====
 ---- QUERY
 insert into ignore_column_case values (1, 'Martin', 1.0, 10);
+---- RESULTS
+: 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -182,36 +206,44 @@ insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Already present
 ====
 ---- QUERY
 insert ignore into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
 ====
 ---- QUERY
--- Updating the same record twice
+-- Updating the same record many times: cross join produces 7 identical updates
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
--- Does not exercise any error path in the sink because updating the same record twice
--- is valid. Makes sure IGNORE works.
+-- Does not exercise any error path in the sink because updating the same record multiple
+-- times is valid. Makes sure IGNORE works.
 update ignore a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
 -- Using a cross join to generate the same delete twice. After the first delete succeeded,
 -- trying to execute the second delete will fail because the record does not exist.
 delete a from tdata a, tdata b where a.id = 666
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Not found: key not found
 ====
 ---- QUERY
 -- Re-insert the data
@@ -223,6 +255,8 @@ insert into tdata values
 ---- QUERY
 delete ignore a from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -242,6 +276,8 @@ insert into impala_3454 values
 ---- QUERY
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from impala_3454
@@ -250,3 +286,49 @@ select * from impala_3454
 ---- TYPES
 TINYINT,BIGINT
 ====
+---- QUERY
+CREATE TABLE kudu_test_tbl PRIMARY KEY(id)
+DISTRIBUTE BY RANGE(id) SPLIT ROWS ((100000000))
+STORED AS KUDU AS
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+'Inserted 100 row(s)'
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 100.*
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+: 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
+====
+---- QUERY
+INSERT INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- CATCH
+Kudu error(s) reported, first error: Already present: key already present
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes;
+---- RESULTS
+: 7200
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7200.*
+====
+---- QUERY
+# Test a larger UPDATE
+UPDATE kudu_test_tbl SET int_col = -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====
+---- QUERY
+# Test a larger DELETE
+DELETE FROM kudu_test_tbl WHERE id > -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/99ed6dc6/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e0f5d55..7ed411e 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -416,8 +416,8 @@ class ImpalaBeeswaxClient(object):
     """Executes an insert query"""
     result = self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
     # The insert was successful
-    num_rows = sum(map(int, result.rows_appended.values()))
-    data = ["%s: %s" % row for row in result.rows_appended.iteritems()]
+    num_rows = sum(map(int, result.rows_modified.values()))
+    data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
     exec_result = ImpalaBeeswaxResult(success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result