You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2017/11/28 18:41:09 UTC

[2/8] incubator-impala git commit: IMPALA-4591: Bound Kudu client error mem usage

IMPALA-4591: Bound Kudu client error mem usage

Previously, the memory used by Kudu client error messages could grow
without bound, potentially causing the process to be killed. This patch
sets a bound on the memory that can be used for these error messages,
with the size determined by the flag 'kudu_error_buffer_size'.

If the errors for a Kudu client exceed this size, the query will fail,
as some errors will be dropped and we won't be able to tell if all of
the errors can be safely ignored.

Testing:
- Added a custom cluster test that verifies that a query that exceeds
  the limit fails.

Change-Id: I186ddb3f3b5865e08f17dba57cf6640591d06b14
Reviewed-on: http://gerrit.cloudera.org:8080/8464
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/abd9b0e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/abd9b0e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/abd9b0e7

Branch: refs/heads/master
Commit: abd9b0e70a3793405af8ad72259e4c7196804a47
Parents: 628f19e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Fri Nov 3 14:51:53 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Nov 27 22:28:37 2017 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc    | 50 ++++++++++++++++------------------
 be/src/exec/kudu-table-sink.h     |  3 ++
 tests/custom_cluster/test_kudu.py | 18 ++++++++++++
 3 files changed, 45 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/abd9b0e7/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index c5abf7b..6faeb7a 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -34,24 +34,19 @@
 #include "common/names.h"
 
 #define DEFAULT_KUDU_MUTATION_BUFFER_SIZE 10 * 1024 * 1024
+#define DEFAULT_KUDU_ERROR_BUFFER_SIZE 10 * 1024 * 1024
 
 DEFINE_int32(kudu_mutation_buffer_size, DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
     "The size (bytes) of the Kudu client buffer for mutations.");
 
-// The memory (bytes) that this node needs to consume in order to operate. This is
-// necessary because the KuduClient allocates non-trivial amounts of untracked memory,
-// and is potentially unbounded due to how Kudu's async error reporting works.
-// Until Kudu's client memory usage can be bounded (KUDU-1752), we estimate that 2x the
-// mutation buffer size is enough memory, and that seems to provide acceptable results in
+// We estimate that 10MB is enough memory, and that seems to provide acceptable results in
 // testing. This is still exposed as a flag for now though, because it may be possible
 // that in some cases this is always too high (in which case tracked mem >> RSS and the
 // memory is underutilized), or this may not be high enough (e.g. we underestimate the
-// size of error strings, and RSS grows until the process is killed).
-// TODO: Handle DML w/ small or known resource requirements (e.g. VALUES specified or
-// query has LIMIT) specially to avoid over-consumption.
-DEFINE_int32(kudu_sink_mem_required, 2 * DEFAULT_KUDU_MUTATION_BUFFER_SIZE,
-    "(Advanced) The memory required (bytes) for a KuduTableSink. The default value is "
-    " 2x the kudu_mutation_buffer_size. This flag is subject to change or removal.");
+// size of error strings, and queries are failed).
+DEFINE_int32(kudu_error_buffer_size, DEFAULT_KUDU_ERROR_BUFFER_SIZE,
+    "The size (bytes) of the Kudu client buffer for returning errors, with a min of 1KB."
+    " If the actual errors exceed this size the query will fail.");
 
 DECLARE_int32(kudu_operation_timeout_ms);
 
@@ -76,7 +71,8 @@ KuduTableSink::KuduTableSink(const RowDescriptor* row_desc, const TDataSink& tsi
   : DataSink(row_desc),
     table_id_(tsink.table_sink.target_table_id),
     sink_action_(tsink.table_sink.action),
-    kudu_table_sink_(tsink.table_sink.kudu_table_sink) {
+    kudu_table_sink_(tsink.table_sink.kudu_table_sink),
+    client_tracked_bytes_(0) {
   DCHECK(tsink.__isset.table_sink);
   DCHECK(KuduIsAvailable());
 }
@@ -120,21 +116,20 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
 Status KuduTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(DataSink::Open(state));
 
-  int64_t required_mem = FLAGS_kudu_sink_mem_required;
+  // Account for the memory used by the Kudu client. This is necessary because the
+  // KuduClient allocates non-trivial amounts of untracked memory.
+  // TODO: Handle DML w/ small or known resource requirements (e.g. VALUES specified or
+  // query has LIMIT) specially to avoid over-consumption.
+  int64_t error_buffer_size = max<int64_t>(1024, FLAGS_kudu_error_buffer_size);
+  int64_t required_mem = FLAGS_kudu_mutation_buffer_size + error_buffer_size;
   if (!mem_tracker_->TryConsume(required_mem)) {
     return mem_tracker_->MemLimitExceeded(state,
         "Could not allocate memory for KuduTableSink", required_mem);
   }
+  client_tracked_bytes_ = required_mem;
 
-  Status s =
-      state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), &client_);
-  if (!s.ok()) {
-    // Close() releases memory if client_ is not NULL, but since the memory was consumed
-    // and the client failed to be created, it must be released.
-    DCHECK(client_ == nullptr);
-    mem_tracker_->Release(required_mem);
-    return s;
-  }
+  RETURN_IF_ERROR(
+      state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), &client_));
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 
@@ -194,6 +189,10 @@ Status KuduTableSink::Open(RuntimeState* state) {
   // number of these buffers; there are a few ways to accomplish similar behaviors.
   KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0),
       "Couldn't set mutation buffer count");
+
+  KUDU_RETURN_IF_ERROR(session_->SetErrorBufferSpace(error_buffer_size),
+      "Failed to set error buffer space");
+
   return Status::OK();
 }
 
@@ -347,10 +346,9 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
 
 void KuduTableSink::Close(RuntimeState* state) {
   if (closed_) return;
-  if (client_ != nullptr) {
-    mem_tracker_->Release(FLAGS_kudu_sink_mem_required);
-    client_ = nullptr;
-  }
+  session_.reset();
+  mem_tracker_->Release(client_tracked_bytes_);
+  client_ = nullptr;
   SCOPED_TIMER(profile()->total_time_counter());
   DataSink::Close(state);
   closed_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/abd9b0e7/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 06f5f96..50ba99c 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -102,6 +102,9 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
+  /// The amount consumed from 'mem_tracker_' to account for the mem used by 'client_'.
+  int64_t client_tracked_bytes_;
+
   /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
   /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
   /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/abd9b0e7/tests/custom_cluster/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index 5b447e2..7563236 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -62,6 +62,24 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
           """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name))
       cursor.execute("DROP TABLE %s" % table_name)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-kudu_error_buffer_size=1024")
+  def test_error_buffer_size(self, cursor, unique_database):
+    """Check that queries fail if the size of the Kudu client errors they generate is
+    greater than kudu_error_buffer_size."""
+    table_name = "%s.test_error_buffer_size" % unique_database
+    cursor.execute("create table %s (a bigint primary key) stored as kudu" % table_name)
+    # Insert a large number of a constant value into the table to generate many "Key
+    # already present" errors. 50 errors should fit inside the 1024 byte limit.
+    cursor.execute(
+        "insert into %s select 1 from functional.alltypes limit 50" % table_name)
+    # 200 errors should overflow the 1024 byte limit. The failure for a query that
+    # unexpectedly succeeds is raised outside any 'except Exception' handler, so it
+    # cannot be caught and mistaken for the expected error.
+    with pytest.raises(Exception) as exc_info:
+      cursor.execute(
+          "insert into %s select 1 from functional.alltypes limit 200" % table_name)
+    assert "Error overflow in Kudu session." in str(exc_info.value)
 
 class TestKuduClientTimeout(CustomClusterTestSuite, KuduTestSuite):
   """Kudu tests that set the Kudu client operation timeout to 1ms and expect