Posted to commits@impala.apache.org by mi...@apache.org on 2018/02/01 16:46:07 UTC

[1/4] impala git commit: IMPALA-6454: CTAS into Kudu fails with mixed-case partition or primary key column names.

Repository: impala
Updated Branches:
  refs/heads/master 999a3f60c -> 3bfda3348


IMPALA-6454: CTAS into Kudu fails with mixed-case partition or primary key column names.

CTAS into Kudu fails if the primary key and/or partition column names are not
specified in lower case. The problem is that we pass the primary key column
names directly from the parser, when we should instead pass the post-analysis
ColumnDefs as primary keys. The fix changes createCtasTarget() in the KuduTable
class to take the list of ColumnDefs associated with the primary keys, obtained
from getPrimaryKeyColumnDefs() in the CreateTableStmt class.

The ColumnDef class stores column names in lower case; createCtasTarget() uses
them to populate primaryKeyColumnNames_ in the KuduTable class, which resolves
the issue.

Testing
-------
Verified against the newly added test case that reproduces the issue without the fix.

Change-Id: Ica1c8ec1544339e9e80733a7a0c78594e0a727d2
Reviewed-on: http://gerrit.cloudera.org:8080/9147
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/99234d21
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/99234d21
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/99234d21

Branch: refs/heads/master
Commit: 99234d2121f7dd189c0ff2d9f7ad6c7d0c62c5dd
Parents: 999a3f6
Author: Pranay <ps...@cloudera.com>
Authored: Fri Jan 26 18:36:59 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 1 04:34:33 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/CreateTableAsSelectStmt.java  | 2 +-
 .../java/org/apache/impala/analysis/CreateTableStmt.java     | 2 +-
 fe/src/main/java/org/apache/impala/catalog/KuduTable.java    | 8 +++++---
 .../test/java/org/apache/impala/analysis/AnalyzeDDLTest.java | 7 +++++++
 4 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/99234d21/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index ba9a02a..e050d06 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -191,7 +191,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
       Table tmpTable = null;
       if (KuduTable.isKuduTable(msTbl)) {
         tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
-            createStmt_.getTblPrimaryKeyColumnNames(),
+            createStmt_.getPrimaryKeyColumnDefs(),
             createStmt_.getKuduPartitionParams());
       } else {
         // TODO: Creating a tmp table using load() is confusing.

http://git-wip-us.apache.org/repos/asf/impala/blob/99234d21/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index b4d59e8..2e5425a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -94,7 +94,7 @@ public class CreateTableStmt extends StatementBase {
     getColumnDefs().clear();
     getColumnDefs().addAll(colDefs);
   }
-  private List<ColumnDef> getPrimaryKeyColumnDefs() {
+  public List<ColumnDef> getPrimaryKeyColumnDefs() {
     return tableDef_.getPrimaryKeyColumnDefs();
   }
   public boolean isExternal() { return tableDef_.isExternal(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/99234d21/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index e9e1617..8296ed0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -107,7 +107,7 @@ public class KuduTable extends Table {
   // Comma separated list of Kudu master hosts with optional ports.
   private String kuduMasters_;
 
-  // Primary key column names.
+  // Primary key column names, the column names are all in lower case.
   private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
 
   // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are
@@ -318,13 +318,15 @@ public class KuduTable extends Table {
    */
   public static KuduTable createCtasTarget(Db db,
       org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
-      List<String> primaryKeyColumnNames, List<KuduPartitionParam> partitionParams) {
+      List<ColumnDef> primaryKeyColumnDefs, List<KuduPartitionParam> partitionParams) {
     KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     int pos = 0;
     for (ColumnDef colDef: columnDefs) {
       tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
     }
-    tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
+    for (ColumnDef pkColDef: primaryKeyColumnDefs) {
+      tmpTable.primaryKeyColumnNames_.add(pkColDef.getColName());
+    }
     tmpTable.partitionBy_.addAll(partitionParams);
     return tmpTable;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/99234d21/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 3083f1f..122b49d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1733,6 +1733,13 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " stored as kudu as select id, a from functional.complextypes_fileformat",
         "Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
         "Only scalar types are allowed in the select list.");
+
+    // IMPALA-6454: CTAS into Kudu tables with primary key specified in upper case.
+    AnalyzesOk("create table part_kudu_tbl primary key(INT_COL, SMALLINT_COL, ID)" +
+        " partition by hash(INT_COL, SMALLINT_COL, ID) PARTITIONS 2" +
+        " stored as kudu as SELECT INT_COL, SMALLINT_COL, ID, BIGINT_COL," +
+        " DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH FROM " +
+        " functional.alltypes");
   }
 
   @Test


[4/4] impala git commit: IMPALA-6193: Track memory of incoming data streams

Posted by mi...@apache.org.
IMPALA-6193: Track memory of incoming data streams

This change adds memory tracking to incoming transmit data RPCs when
using KRPC. We track memory against a global tracker called "Data Stream
Service" until it is handed over to the stream manager. There we track
it in a global tracker called "Data Stream Queued RPC Calls" until a
receiver registers and takes over the early sender RPCs. Inside the
receiver, memory for deferred RPCs is tracked against the fragment
instance's memtracker until we unpack the batches and add them to the
row batch queue.

The DCHECK in MemTracker::Close() ensures that all memory consumed by a
tracker is eventually released. In addition, this change adds a custom cluster
test that verifies queued memory is tracked by inspecting the peak consumption
of the new memtrackers.
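
The hand-off described above is a consume-then-release pattern between three
trackers. Below is a minimal standalone sketch, using a simplified stand-in
class rather than Impala's real MemTracker (which is hierarchical and
thread-safe), of the invariants the new test checks: every tracker ends at
zero consumption but records a non-zero peak.

  #include <algorithm>
  #include <cassert>
  #include <cstdint>
  #include <string>
  #include <utility>

  // Simplified stand-in for illustration only; not Impala's MemTracker.
  class FakeMemTracker {
   public:
    explicit FakeMemTracker(std::string name) : name_(std::move(name)) {}
    void Consume(int64_t bytes) {
      consumption_ += bytes;
      peak_ = std::max(peak_, consumption_);
    }
    void Release(int64_t bytes) { consumption_ -= bytes; }
    int64_t consumption() const { return consumption_; }
    int64_t peak() const { return peak_; }
   private:
    std::string name_;
    int64_t consumption_ = 0;
    int64_t peak_ = 0;
  };

  int main() {
    FakeMemTracker svc("Data Stream Service");
    FakeMemTracker queued("Data Stream Queued RPC Calls");
    FakeMemTracker instance("Fragment Instance");
    const int64_t transfer_size = 4096;  // payload of one TransmitData RPC

    // 1. RPC arrives: the service pool charges the payload to
    //    "Data Stream Service".
    svc.Consume(transfer_size);

    // 2. No receiver yet: the stream manager takes over the early sender and
    //    moves the charge to "Data Stream Queued RPC Calls".
    svc.Release(transfer_size);
    queued.Consume(transfer_size);

    // 3. The receiver registers and takes over the deferred RPC; the payload
    //    is now tracked against the fragment instance's tracker.
    queued.Release(transfer_size);
    instance.Consume(transfer_size);

    // 4. The batch is unpacked and added to the row batch queue; the payload
    //    memory is released.
    instance.Release(transfer_size);

    // Everything consumed was released (mirrors the DCHECK in
    // MemTracker::Close()), yet each tracker saw a non-zero peak (what the
    // custom cluster test inspects).
    assert(svc.consumption() == 0 && queued.consumption() == 0 &&
           instance.consumption() == 0);
    assert(svc.peak() > 0 && queued.peak() > 0 && instance.peak() > 0);
    return 0;
  }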

Change-Id: I2df1204d2483313a8a18e5e3be6cec9e402614c4
Reviewed-on: http://gerrit.cloudera.org:8080/8914
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3bfda334
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3bfda334
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3bfda334

Branch: refs/heads/master
Commit: 3bfda3348740e0951cbf8f60cde70cc4d1391c5e
Parents: acfd169
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jan 16 16:03:42 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 1 08:53:36 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc           | 48 +++++++-----
 be/src/rpc/impala-service-pool.h            | 16 +++-
 be/src/rpc/rpc-mgr-test.cc                  | 49 ++++++++----
 be/src/rpc/rpc-mgr.cc                       |  4 +-
 be/src/rpc/rpc-mgr.h                        |  3 +-
 be/src/runtime/exec-env.cc                  | 46 +++++++----
 be/src/runtime/krpc-data-stream-mgr.cc      | 44 ++++++++---
 be/src/runtime/krpc-data-stream-mgr.h       | 23 +++++-
 be/src/runtime/krpc-data-stream-recvr.cc    | 25 ++++--
 be/src/runtime/mem-tracker.h                |  4 +-
 be/src/util/memory-metrics.h                |  2 +-
 common/protobuf/data_stream_service.proto   |  2 +-
 tests/custom_cluster/test_krpc_mem_usage.py | 98 ++++++++++++++++++++++++
 tests/verifiers/mem_usage_verifier.py       | 70 +++++++++++++++++
 14 files changed, 350 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 3b1c02d..34a3960 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -34,6 +34,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
+#include "runtime/mem-tracker.h"
 
 #include "common/names.h"
 #include "common/status.h"
@@ -46,13 +47,15 @@ METRIC_DEFINE_histogram(server, impala_unused,
 
 namespace impala {
 
-ImpalaServicePool::ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
+ImpalaServicePool::ImpalaServicePool(MemTracker* mem_tracker,
+                         std::unique_ptr<kudu::rpc::ServiceIf> service,
                          const scoped_refptr<kudu::MetricEntity>& entity,
                          size_t service_queue_length)
-  : service_(std::move(service)),
+  : mem_tracker_(mem_tracker),
+    service_(std::move(service)),
     service_queue_(service_queue_length),
     unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
-
+  DCHECK(mem_tracker_ != nullptr);
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -84,8 +87,8 @@ void ImpalaServicePool::Shutdown() {
   kudu::Status status = kudu::Status::ServiceUnavailable("Service is shutting down");
   std::unique_ptr<kudu::rpc::InboundCall> incoming;
   while (service_queue_.BlockingGet(&incoming)) {
-    incoming.release()->RespondFailure(
-        kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status,
+        incoming.release());
   }
 
   service_->Shutdown();
@@ -100,12 +103,20 @@ void ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
                  c->remote_address().ToString(),
                  service_queue_.max_size());
   rpcs_queue_overflow_.Add(1);
-  c->RespondFailure(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-                    kudu::Status::ServiceUnavailable(err_msg));
+  FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+                    kudu::Status::ServiceUnavailable(err_msg), c);
   VLOG(1) << err_msg << " Contents of service queue:\n"
           << service_queue_.ToString();
 }
 
+void ImpalaServicePool::FailAndReleaseRpc(
+    const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
+    const kudu::Status& status, kudu::rpc::InboundCall* call) {
+  int64_t transfer_size = call->GetTransferSize();
+  call->RespondFailure(error_code, status);
+  mem_tracker_->Release(transfer_size);
+}
+
 kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(
     const kudu::rpc::RemoteMethod& method) {
   return service_->LookupMethod(method);
@@ -124,15 +135,16 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
 
   if (!unsupported_features.empty()) {
     c->RespondUnsupportedFeature(unsupported_features);
-    return kudu::Status::NotSupported("call requires unsupported application feature flags",
-                                JoinMapped(unsupported_features,
-                                           [] (uint32_t flag) { return std::to_string(flag); },
-                                           ", "));
+    return kudu::Status::NotSupported(
+        "call requires unsupported application feature flags",
+        JoinMapped(unsupported_features,
+        [] (uint32_t flag) { return std::to_string(flag); }, ", "));
   }
 
   TRACE_TO(c->trace(), "Inserting onto call queue"); // NOLINT(*)
 
   // Queue message on service queue
+  mem_tracker_->Consume(c->GetTransferSize());
   boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
   if (queue_status == kudu::rpc::QueueStatus::QUEUE_FULL) {
@@ -154,11 +166,11 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
   kudu::Status status = kudu::Status::OK();
   if (queue_status == kudu::rpc::QueueStatus::QUEUE_SHUTDOWN) {
     status = kudu::Status::ServiceUnavailable("Service is shutting down");
-    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status, c);
   } else {
     status = kudu::Status::RuntimeError(
         Substitute("Unknown error from BlockingQueue: $0", queue_status));
-    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN, status, c);
   }
   return status;
 }
@@ -181,13 +193,9 @@ void ImpalaServicePool::RunThread() {
 
       // Respond as a failure, even though the client will probably ignore
       // the response anyway.
-      incoming->RespondFailure(
-        kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-        kudu::Status::TimedOut("Call waited in the queue past client deadline"));
-
-      // Must release since RespondFailure above ends up taking ownership
-      // of the object.
-      ignore_result(incoming.release());
+      FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+          kudu::Status::TimedOut("Call waited in the queue past client deadline"),
+          incoming.release());
       continue;
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 93f2972..fe70686 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -31,14 +31,16 @@
 #include "util/thread.h"
 
 namespace impala {
+class MemTracker;
 
 // A pool of threads that handle new incoming RPC calls.
 // Also includes a queue that calls get pushed onto for handling by the pool.
 class ImpalaServicePool : public kudu::rpc::RpcService {
  public:
-  ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
-              const scoped_refptr<kudu::MetricEntity>& metric_entity,
-              size_t service_queue_length);
+  ImpalaServicePool(MemTracker* mem_tracker,
+      std::unique_ptr<kudu::rpc::ServiceIf> service,
+      const scoped_refptr<kudu::MetricEntity>& metric_entity,
+      size_t service_queue_length);
   virtual ~ImpalaServicePool();
 
   // Start up the thread pool.
@@ -58,6 +60,14 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
 
+  // Respond with failure to the incoming call in 'call' with 'error_code' and 'status'
+  // and release the payload memory from 'mem_tracker_'. Takes ownership of 'call'.
+  void FailAndReleaseRpc(const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
+      const kudu::Status& status, kudu::rpc::InboundCall* call);
+
+  // Tracks memory of inbound calls in 'service_queue_'.
+  MemTracker* const mem_tracker_;
+
   std::unique_ptr<kudu::rpc::ServiceIf> service_;
   std::vector<std::unique_ptr<Thread> > threads_;
   kudu::rpc::LifoServiceQueue service_queue_;

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 7effda9..c525148 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -26,6 +26,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/auth-provider.h"
+#include "runtime/mem-tracker.h"
 #include "testutil/gtest-util.h"
 #include "testutil/mini-kdc-wrapper.h"
 #include "testutil/scoped-flag-setter.h"
@@ -110,6 +111,8 @@ template <class T> class RpcMgrTestBase : public T {
     request->set_sidecar_idx(idx);
   }
 
+  MemTracker* service_tracker() { return &service_tracker_; }
+
  protected:
   TNetworkAddress krpc_address_;
   RpcMgr rpc_mgr_;
@@ -127,6 +130,7 @@ template <class T> class RpcMgrTestBase : public T {
 
  private:
   int32_t payload_[PAYLOAD_SIZE];
+  MemTracker service_tracker_;
 };
 
 // For tests that do not require kerberized testing, we use RpcTest.
@@ -172,25 +176,28 @@ class PingServiceImpl : public PingServiceIf {
  public:
   // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
   PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), cb_(cb) {}
+    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
 
   virtual void Ping(
       const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
     response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
     cb_(context);
   }
 
  private:
+  MemTracker* mem_tracker_;
   ServiceCB cb_;
 };
 
 class ScanMemServiceImpl : public ScanMemServiceIf {
  public:
   ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
-    : ScanMemServiceIf(entity, tracker) {
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
   }
 
   // The request comes with an int 'pattern' and a payload of int array sent with
@@ -207,13 +214,20 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
     for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
       int32_t val = v[i];
       if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_->Release(context->GetTransferSize());
         context->RespondFailure(kudu::Status::Corruption(
             Substitute("Expecting $1; Found $2", pattern, val)));
         return;
       }
     }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
     context->RespondSuccess();
   }
+
+ private:
+  MemTracker* mem_tracker_;
 };
 
 // TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
@@ -225,16 +239,17 @@ INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
 template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  MemTracker* mem_tracker = test_base->service_tracker();
   // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(
-      new PingServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl)));
+  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(
-      new ScanMemServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl)));
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -452,7 +467,6 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
 }
 
 TEST_F(RpcMgrTest, SlowCallback) {
-
   // Use a callback which is slow to respond.
   auto slow_cb = [](RpcContext* ctx) {
     SleepForMs(300);
@@ -462,11 +476,12 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  unique_ptr<ServiceIf> impl(
-      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
+  unique_ptr<ServiceIf> impl(new PingServiceImpl(rpc_mgr_.metric_entity(),
+      rpc_mgr_.result_tracker(), service_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
-  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl)));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl),
+      service_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -487,9 +502,9 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  unique_ptr<ServiceIf> scan_mem_impl(
-      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr_.metric_entity(),
+      rpc_mgr_.result_tracker(), service_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl), service_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
   ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index c70c117..7adde36 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -114,11 +114,11 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-    unique_ptr<ServiceIf> service_ptr) {
+    unique_ptr<ServiceIf> service_ptr, MemTracker* mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
   scoped_refptr<ImpalaServicePool> service_pool =
-      new ImpalaServicePool(std::move(service_ptr),
+      new ImpalaServicePool(mem_tracker, std::move(service_ptr),
           messenger_->metric_entity(), service_queue_depth);
   // Start the thread pool first before registering the service in case the startup fails.
   RETURN_IF_ERROR(service_pool->Init(num_service_threads));

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index b2099f2..fc74c2e 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -124,7 +124,8 @@ class RpcMgr {
   ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr) WARN_UNUSED_RESULT;
+      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr, MemTracker* mem_tracker)
+      WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 'address', and places
   /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0551848..1c3ab7a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -301,23 +301,6 @@ Status ExecEnv::Init() {
   // Resolve hostname to IP address.
   RETURN_IF_ERROR(HostnameToIpAddr(backend_address_.hostname, &ip_address_));
 
-  // Initialize the RPCMgr before allowing services registration.
-  if (FLAGS_use_krpc) {
-    krpc_address_.__set_hostname(ip_address_);
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init());
-    RETURN_IF_ERROR(rpc_mgr_->Init());
-    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
-    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
-        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
-    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
-        FLAGS_datastream_service_queue_depth, move(data_svc)));
-    // Bump thread cache to 1GB to reduce contention for TCMalloc central
-    // list's spinlock.
-    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
-      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
-    }
-  }
-
   mem_tracker_.reset(
       new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
   // Add BufferPool MemTrackers for cached memory that is not tracked against queries
@@ -334,6 +317,35 @@ Status ExecEnv::Init() {
       BufferPoolMetric::UNUSED_RESERVATION_BYTES));
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
+
+  // Initialize the RPCMgr before allowing services registration.
+  if (FLAGS_use_krpc) {
+    krpc_address_.__set_hostname(ip_address_);
+    RETURN_IF_ERROR(rpc_mgr_->Init());
+
+    // Add a MemTracker for memory used to store incoming calls before they handed over to
+    // the data stream manager.
+    MemTracker* data_svc_tracker = obj_pool_->Add(
+        new MemTracker(-1, "Data Stream Service", mem_tracker_.get()));
+
+    // Add a MemTracker for the data stream manager, which uses it to track memory used by
+    // deferred RPC calls while they are buffered in the data stream manager.
+    MemTracker* stream_mgr_tracker = obj_pool_->Add(
+        new MemTracker(-1, "Data Stream Queued RPC Calls", mem_tracker_.get()));
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init(stream_mgr_tracker, data_svc_tracker));
+
+    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
+        FLAGS_datastream_service_queue_depth, move(data_svc), data_svc_tracker));
+    // Bump thread cache to 1GB to reduce contention for TCMalloc central
+    // list's spinlock.
+    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
+      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
+    }
+  }
+
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // Change the total TCMalloc thread cache size if necessary.
   if (FLAGS_tcmalloc_max_total_thread_cache_bytes > 0 &&

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 86955c8..3f777ea 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -24,6 +24,8 @@
 
 #include "kudu/rpc/rpc_context.h"
 
+#include "exec/kudu-util.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
@@ -70,7 +72,10 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 
-Status KrpcDataStreamMgr::Init() {
+Status KrpcDataStreamMgr::Init(MemTracker* mem_tracker,
+    MemTracker* incoming_request_tracker) {
+  mem_tracker_ = mem_tracker;
+  incoming_request_tracker_ = incoming_request_tracker;
   RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
       [this](){ this->Maintenance(); }, &maintenance_thread_));
   RETURN_IF_ERROR(deserialize_pool_.Init());
@@ -109,13 +114,15 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
       // Let the receiver take over the RPC payloads of early senders and process them
       // asynchronously.
       for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
+        // Release memory. The receiver will track it in its instance tracker.
+        int64_t transfer_size = ctx->rpc_context->GetTransferSize();
         recvr->TakeOverEarlySender(move(ctx));
+        mem_tracker_->Release(transfer_size);
         num_senders_waiting_->Increment(-1);
       }
       for (const unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
         recvr->RemoveSender(ctx->request->sender_id());
-        Status::OK().ToProto(ctx->response->mutable_status());
-        ctx->rpc_context->RespondSuccess();
+        RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, mem_tracker_);
         num_senders_waiting_->Increment(-1);
       }
       early_senders_map_.erase(it);
@@ -150,6 +157,10 @@ shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::FindRecvr(
 void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
     const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
+  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
+  incoming_request_tracker_->ReleaseLocal(
+      rpc_context->GetTransferSize(), mem_tracker_->parent());
+  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
@@ -160,6 +171,10 @@ void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
 void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
     const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
+  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
+  incoming_request_tracker_->ReleaseLocal(
+      rpc_context->GetTransferSize(), mem_tracker_->parent());
+  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
@@ -199,12 +214,15 @@ void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
     // detect this case by checking already_unregistered - if true then the receiver was
     // already closed deliberately, and there's no unexpected error here.
     ErrorMsg msg(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), dest_node_id);
-    Status::Expected(msg).ToProto(response->mutable_status());
-    rpc_context->RespondSuccess();
+    RespondAndReleaseRpc(Status::Expected(msg), response, rpc_context,
+        incoming_request_tracker_);
     return;
   }
   DCHECK(recvr != nullptr);
+  int64_t transfer_size = rpc_context->GetTransferSize();
   recvr->AddBatch(request, response, rpc_context);
+  // Release memory. The receiver already tracks it in its instance tracker.
+  incoming_request_tracker_->Release(transfer_size);
 }
 
 void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
@@ -252,8 +270,7 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   // If we reach this point, either the receiver is found or it has been unregistered
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
-  Status::OK().ToProto(response->mutable_status());
-  rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::OK(), response, rpc_context, incoming_request_tracker_);
 
   {
     // TODO: Move this to maintenance thread.
@@ -338,12 +355,21 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextTyp
   ErrorMsg msg(TErrorCode::DATASTREAM_SENDER_TIMEOUT, remote_addr, PrintId(finst_id),
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
-  Status::Expected(msg).ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::Expected(msg), ctx->response, ctx->rpc_context,
+      mem_tracker_);
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }
 
+template<typename ResponsePBType>
+void KrpcDataStreamMgr::RespondAndReleaseRpc(const Status& status,
+    ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker) {
+  status.ToProto(response->mutable_status());
+  int64_t transfer_size = ctx->GetTransferSize();
+  ctx->RespondSuccess();
+  mem_tracker->Release(transfer_size);
+}
+
 void KrpcDataStreamMgr::Maintenance() {
   while (true) {
     // Notify any senders that have been waiting too long for their receiver to

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 5171e80..458ebe7 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -225,16 +225,13 @@ struct EndDataStreamCtx {
 ///  time.
 ///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
 ///  timed-out while waiting for a receiver.
-///
-/// TODO: The recv buffers used in KrpcDataStreamRecvr should count against
-/// per-query memory limits.
 class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
   /// Initialize the deserialization thread pool and create the maintenance thread.
   /// Return error status on failure. Return OK otherwise.
-  Status Init();
+  Status Init(MemTracker* mem_tracker, MemTracker* incoming_request_tracker);
 
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
@@ -293,6 +290,18 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
  private:
   friend class KrpcDataStreamRecvr;
 
+  /// MemTracker for memory used for transmit data requests before we hand them over to a
+  /// specific receiver. Used only to track payloads of deferred RPCs (e.g. early
+  /// senders). Not owned.
+  MemTracker* mem_tracker_ = nullptr;
+
+  /// MemTracker which is used by the DataStreamService to track memory for incoming
+  /// requests. Memory for new incoming requests is initially tracked against this tracker
+  /// before the requests are handed over to the data stream manager. It is this class's
+  /// responsibility to release memory from this tracker and track it against its own
+  /// tracker (here: mem_tracker_). Not owned.
+  MemTracker* incoming_request_tracker_ = nullptr;
+
   /// A task for the deserialization threads to work on. The fields identify
   /// the target receiver's sender queue.
   struct DeserializeTask {
@@ -466,6 +475,12 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   template<typename ContextType, typename RequestPBType>
   void RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx);
 
+  /// Respond to the RPC passed in 'response'/'ctx' with 'status' and release the payload
+  /// memory from 'mem_tracker'. Takes ownership of 'ctx'.
+  template<typename ResponsePBType>
+  void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
+      kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
+
   /// Notifies any sender that has been waiting for its receiver for more than
   /// FLAGS_datastream_sender_timeout_ms.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 68f00e3..138cc8b 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -126,6 +126,10 @@ class KrpcDataStreamRecvr::SenderQueue {
       const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
       unique_lock<SpinLock>* lock);
 
+  // Respond to the TransmitData RPC passed in 'ctx' with 'status' and release the payload
+  // memory from the MemTracker associated with 'recvr_'.
+  void RespondAndReleaseRpc(const Status& status, const unique_ptr<TransmitDataCtx>& ctx);
+
   // Receiver of which this queue is a member.
   KrpcDataStreamRecvr* recvr_;
 
@@ -321,6 +325,14 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   data_arrival_cv_.notify_one();
 }
 
+void KrpcDataStreamRecvr::SenderQueue::RespondAndReleaseRpc(const Status& status,
+    const unique_ptr<TransmitDataCtx>& ctx) {
+  int64_t transfer_size = ctx->rpc_context->GetTransferSize();
+  status.ToProto(ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
+  recvr_->mem_tracker()->Release(transfer_size);
+}
+
 void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
   // TODO: Add timers for time spent in this function and queue time in 'batch_queue_'.
@@ -349,13 +361,14 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
       return;
     }
 
-    // If there's something in the queue and this batch will push us over the buffer
+    // If there's something in the queue or this batch will push us over the buffer
     // limit we need to wait until the queue gets drained. We store the rpc context
     // so that we can signal it at a later time to resend the batch that we couldn't
     // process here. If there are already deferred RPCs waiting in queue, the new
     // batch needs to line up after the deferred RPCs to avoid starvation of senders
     // in the non-merging case.
     if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
+      recvr_->mem_tracker()->Consume(rpc_context->GetTransferSize());
       auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
       deferred_rpcs_.push(move(payload));
       COUNTER_ADD(recvr_->num_deferred_batches_, 1);
@@ -391,8 +404,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
-      status.ToProto(ctx->response->mutable_status());
-      ctx->rpc_context->RespondSuccess();
+      RespondAndReleaseRpc(status, ctx);
       deferred_rpcs_.pop();
       return;
     }
@@ -412,13 +424,13 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
   }
 
   // Responds to the sender to ack the insertion of the row batches.
-  Status::OK().ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::OK(), ctx);
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
+  recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
   COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
@@ -449,8 +461,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
     // Respond to deferred RPCs.
     while (!deferred_rpcs_.empty()) {
       const unique_ptr<TransmitDataCtx>& payload = deferred_rpcs_.front();
-      Status::OK().ToProto(payload->response->mutable_status());
-      payload->rpc_context->RespondSuccess();
+      RespondAndReleaseRpc(Status::OK(), payload);
       deferred_rpcs_.pop();
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index c582d72..4228288 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -55,8 +55,8 @@ class TQueryOptions;
 ///
 /// By default, memory consumption is tracked via calls to Consume()/Release(), either to
 /// the tracker itself or to one of its descendents. Alternatively, a consumption metric
-/// can specified, and then the metric's value is used as the consumption rather than the
-/// tally maintained by Consume() and Release(). A tcmalloc metric is used to track
+/// can be specified, and then the metric's value is used as the consumption rather than
+/// the tally maintained by Consume() and Release(). A tcmalloc metric is used to track
 /// process memory consumption, since the process memory usage may be higher than the
 /// computed total memory (tcmalloc does not release deallocated memory immediately).
 /// Other consumption metrics are used in trackers below the process level to account

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 6c10e09..0ac04bf 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -198,7 +198,7 @@ class BufferPoolMetric : public IntGauge {
   static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations,
       BufferPool* buffer_pool) WARN_UNUSED_RESULT;
 
-  /// Global metrics, initialized by CreateAndRegisterMetrics().
+  /// Global metrics, initialized by InitMetrics().
   static BufferPoolMetric* LIMIT;
   static BufferPoolMetric* SYSTEM_ALLOCATED;
   static BufferPoolMetric* RESERVED;

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 3aa3f28..c2045d2 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -41,7 +41,7 @@ message TransmitDataRequestPB {
   optional int32 tuple_offsets_sidecar_idx = 5;
 
   // The sidecar index of the tuple's data which is a (compressed) row batch.
-  // The details  of the row batch (e.g. # of rows) is in 'row_batch_header' above.
+  // The details of the row batch (e.g. # of rows) is in 'row_batch_header' above.
   optional int32 tuple_data_sidecar_idx = 6;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
new file mode 100644
index 0000000..ed7b056
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import time
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.common.skip import SkipIfBuildType
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
+
+DATA_STREAM_MGR_METRIC = "Data Stream Queued RPC Calls"
+DATA_STREAM_SVC_METRIC = "Data Stream Service"
+ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
+
+class TestKrpcMemUsage(CustomClusterTestSuite):
+  """Test for memory usage tracking when using KRPC."""
+  TEST_QUERY = "select count(c2.string_col) from \
+     functional.alltypestiny join functional.alltypessmall c2"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestKrpcMemUsage, cls).setup_class()
+
+  def execute_query_verify_mem_usage(self, query, non_zero_peak_metrics):
+    """Executes 'query' and makes sure that the memory used by KRPC is returned to the
+    memtrackers. It also verifies that metrics in 'non_zero_peak_metrics' have a peak
+    value > 0.
+    """
+    self.client.execute(query)
+    self.verify_mem_usage(non_zero_peak_metrics)
+
+  def verify_mem_usage(self, non_zero_peak_metrics):
+    """Verifies that the memory used by KRPC is returned to the memtrackers and that
+    metrics in 'non_zero_peak_metrics' have a peak value > 0.
+    """
+    verifiers = [ MemUsageVerifier(i.service) for i in ImpalaCluster().impalads ]
+    for verifier in verifiers:
+      for metric_name in ALL_METRICS:
+        usage = verifier.get_mem_usage_values(metric_name)
+        assert usage["total"] == 0
+        if metric_name in non_zero_peak_metrics:
+          assert usage["peak"] > 0, metric_name
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc")
+  def test_krpc_unqueued_memory_usage(self, vector):
+    """Executes a simple query and checks that the data stream service consumed some
+    memory.
+    """
+    # The data stream manager may not need to track memory in any queue if the receivers
+    # show up in time.
+    self.execute_query_verify_mem_usage(self.TEST_QUERY, [DATA_STREAM_SVC_METRIC])
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc --stress_datastream_recvr_delay_ms=1000")
+  def test_krpc_deferred_memory_usage(self, vector):
+    """Executes a simple query. The cluster is started with delayed receiver creation to
+    trigger RPC queueing.
+    """
+    self.execute_query_verify_mem_usage(self.TEST_QUERY, ALL_METRICS)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc --stress_datastream_recvr_delay_ms=1000")
+  def test_krpc_deferred_memory_cancellation(self, vector):
+    """Executes a query and cancels it while RPCs are still queued up. This exercises the
+    code to flush the deferred RPC queue in the receiver.
+    """
+    query = "select count(*) from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2 \
+            where l1.l_orderkey = l2.l_orderkey"
+    # Warm up metadata
+    self.client.execute(query)
+    # Execute and cancel query
+    handle = self.client.execute_async(query)
+    # Sleep to allow RPCs to arrive.
+    time.sleep(0.5)
+    self.client.cancel(handle)
+    self.client.close()
+    self.verify_mem_usage(ALL_METRICS)

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/tests/verifiers/mem_usage_verifier.py
----------------------------------------------------------------------
diff --git a/tests/verifiers/mem_usage_verifier.py b/tests/verifiers/mem_usage_verifier.py
new file mode 100644
index 0000000..644e5fa
--- /dev/null
+++ b/tests/verifiers/mem_usage_verifier.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Verifier for memtracker usage values (Total, Peak, etc).
+
+import re
+
+SIZE_FACTORS = {"b": 1, "kb": 1 << 10, "mb": 1 << 20, "gb": 1 << 30}
+
+def parse_mem_value(value):
+  """Parses a memory value with an optional unit like "123", "10 B", or "1.5 KB" into an
+  number of bytes.
+  """
+  elements = value.split()
+  result = float(elements[0])
+  if len(elements) > 1:
+    result *= SIZE_FACTORS[elements[1].lower()]
+  return result
+
+class MemUsageVerifier(object):
+  """MemUsageVerifier objects can be used to verify values in the debug output of memory
+  trackers.
+  """
+
+  def __init__(self, impalad_service):
+    """Initialize module given an ImpalaService object"""
+    self.impalad_service = impalad_service
+
+  def get_mem_usage_values(self, name):
+    """Returns a dictionary of all key=value pairs of the memtracker specified by 'name'
+    by reading the '/memz' debug webpage. It also parses and converts memory values
+    including optional units like "10 B" or "1.5 KB". All strings are converted to
+    lowercase. Only the first line starting with 'name' is considered.
+
+    For example, for the line "Data Stream Service: Total=0 Peak=108.00 B" this will
+    return "dict(total=0, peak=108.0)".
+    """
+    memz = self.impalad_service.get_debug_webpage_json("memz")
+    details = memz.get("detailed", "")
+    for line in details.splitlines():
+      line = line.strip()
+      prefix = name + ":"
+      if line.startswith(prefix):
+        line = line[len(prefix):]
+        result = {}
+        # The value regex matches either '0' or any number including a decimal dot,
+        # followed by a required unit.
+        for k, v in re.findall(r"(\S+)=(0|[\d\.]+ [KMG]?B)", line):
+          result[k.lower()] = parse_mem_value(v)
+        return result
+    return {}
+
+
+
+
+


[3/4] impala git commit: IMPALA-4319: remove some deprecated query options

Posted by mi...@apache.org.
IMPALA-4319: remove some deprecated query options

Adds the concept of a "removed" query option that has no effect but does not
return an error when a user attempts to set it. These options are not returned
by "set" or "set all" commands, whether executed in impala-shell or
server-side.

These query options have been deprecated for several releases:
DEFAULT_ORDER_BY_LIMIT, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,
V_CPU_CORES, RESERVATION_REQUEST_TIMEOUT, RM_INITIAL_MEM,
SCAN_NODE_CODEGEN_THRESHOLD, MAX_IO_BUFFERS

RM_INITIAL_MEM did still have an effect, but it was undocumented and
MEM_LIMIT should be used in preference.

DISABLE_CACHED_READS also had an effect but it was documented as
deprecated.

Otherwise the options had no effect at all.
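
As a hedged illustration of the "removed" concept only (hypothetical names and
structure, not the actual query-options.cc code): setting a removed option
succeeds silently and is ignored, and removed options never appear when the
options are enumerated for "set" or "set all".

  #include <iostream>
  #include <map>
  #include <set>
  #include <string>

  // Hypothetical sketch only; option handling in Impala lives in
  // be/src/service/query-options.{h,cc} and differs from this.
  static const std::set<std::string> kRemovedOptions = {
      "DEFAULT_ORDER_BY_LIMIT", "ABORT_ON_DEFAULT_LIMIT_EXCEEDED",
      "V_CPU_CORES", "RESERVATION_REQUEST_TIMEOUT", "RM_INITIAL_MEM",
      "SCAN_NODE_CODEGEN_THRESHOLD", "MAX_IO_BUFFERS"};

  // Returns true if the SET is accepted. Removed options are accepted but
  // silently ignored instead of producing an "invalid option" error.
  bool SetQueryOption(const std::string& key, const std::string& value,
                      std::map<std::string, std::string>* options) {
    if (kRemovedOptions.count(key) > 0) return true;
    // A real implementation would validate and parse the value here.
    (*options)[key] = value;
    return true;
  }

  // Removed options are never stored, so they do not show up in "set all".
  void PrintSetAll(const std::map<std::string, std::string>& options) {
    for (const auto& kv : options) {
      std::cout << kv.first << "=" << kv.second << "\n";
    }
  }

  int main() {
    std::map<std::string, std::string> options;
    SetQueryOption("MAX_IO_BUFFERS", "16", &options);  // accepted, ignored
    SetQueryOption("MEM_LIMIT", "2g", &options);       // accepted, kept
    PrintSetAll(options);  // prints only MEM_LIMIT=2g
    return 0;
  }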

Testing:
Ran exhaustive build.

Updated query option tests to reflect the new behaviour.

Cherry-picks: not for 2.x.

Change-Id: I9e742e9b0eca0e5c81fd71db3122fef31522fcad
Reviewed-on: http://gerrit.cloudera.org:8080/9118
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/acfd169c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/acfd169c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/acfd169c

Branch: refs/heads/master
Commit: acfd169c8e814adb61a1f1d35327b38ee980f217
Parents: 1b1087e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jan 23 14:36:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 1 08:26:26 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc              |  3 -
 be/src/runtime/runtime-state.h                  |  3 -
 be/src/scheduling/query-schedule.cc             |  6 +-
 be/src/scheduling/scheduler-test-util.h         |  1 -
 be/src/scheduling/scheduler-test.cc             | 27 --------
 be/src/scheduling/scheduler.cc                  |  7 ---
 be/src/scheduling/scheduler.h                   |  7 ---
 be/src/service/child-query.cc                   |  2 +
 be/src/service/client-request-state.cc          |  1 +
 be/src/service/query-options-test.cc            |  8 +--
 be/src/service/query-options.cc                 | 65 +++++++++-----------
 be/src/service/query-options.h                  | 20 +++---
 common/thrift/ImpalaInternalService.thrift      | 29 ---------
 common/thrift/ImpalaService.thrift              | 28 +++------
 common/thrift/beeswax.thrift                    |  6 +-
 shell/impala_shell.py                           | 17 +++--
 .../functional-query/queries/QueryTest/set.test | 32 ++++------
 tests/comparison/discrepancy_searcher.py        |  2 -
 .../custom_cluster/test_admission_controller.py |  5 +-
 tests/hs2/test_hs2.py                           | 12 +++-
 tests/query_test/test_observability.py          |  2 +-
 tests/shell/test_shell_commandline.py           | 14 +++--
 tests/shell/test_shell_interactive.py           |  8 ++-
 23 files changed, 105 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 2f3d0c5..8b065fa 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -232,9 +232,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id;
 
     bool try_cache = params.is_cached;
-    if (runtime_state_->query_options().disable_cached_reads) {
-      DCHECK(!try_cache) << "Params should not have had this set.";
-    }
     file_desc->splits.push_back(
         AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
             split.offset, split.partition_id, params.volume_id, expected_local,

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4eb3e10..66b2099 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -102,9 +102,6 @@ class RuntimeState {
   int batch_size() const { return query_options().batch_size; }
   bool abort_on_error() const { return query_options().abort_on_error; }
   bool strict_mode() const { return query_options().strict_mode; }
-  bool abort_on_default_limit_exceeded() const {
-    return query_options().abort_on_default_limit_exceeded;
-  }
   bool decimal_v2() const { return query_options().decimal_v2; }
   const TQueryCtx& query_ctx() const;
   const TPlanFragmentInstanceCtx& instance_ctx() const { return *instance_ctx_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index e14af78..f833273 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -193,11 +193,7 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
   }
 
   int64_t per_host_mem = 0L;
-  // TODO: Remove rm_initial_mem and associated logic when we're sure that clients won't
-  // be affected.
-  if (query_options_.__isset.rm_initial_mem && query_options_.rm_initial_mem > 0) {
-    per_host_mem = query_options_.rm_initial_mem;
-  } else if (has_query_option) {
+  if (has_query_option) {
     per_host_mem = query_option_memory_limit;
   } else {
     DCHECK(request_.__isset.per_host_mem_estimate);

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/scheduling/scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index c3d6b1c..254acb0 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -236,7 +236,6 @@ class Plan {
   void SetReplicaPreference(TReplicaPreference::type p);
 
   void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
-  void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
   const Cluster& cluster() const { return schema_.cluster(); }
 
   const std::vector<TNetworkAddress>& referenced_datanodes() const;

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index c70e54f..ca0dbbd 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -281,33 +281,6 @@ TEST_F(SchedulerTest, TestCachedReadPreferred) {
   EXPECT_EQ(0, result.NumRemoteAssignedBytes());
 }
 
-/// Verify that disable_cached_reads is effective.
-TEST_F(SchedulerTest, TestDisableCachedReads) {
-  Cluster cluster;
-  cluster.AddHosts(3, true, true);
-
-  Schema schema(cluster);
-  schema.AddSingleBlockTable("T1", {0, 2}, {1});
-
-  Plan plan(schema);
-  // 1 of the 3 replicas is cached.
-  plan.AddTableScan("T1");
-  plan.SetDisableCachedReads(true);
-
-  Result result(plan);
-  SchedulerWrapper scheduler(plan);
-  ASSERT_OK(scheduler.Compute(&result));
-  EXPECT_EQ(0, result.NumCachedAssignedBytes());
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
-  EXPECT_EQ(0, result.NumRemoteAssignedBytes());
-
-  // Compute additional assignments.
-  for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result));
-  EXPECT_EQ(0, result.NumCachedAssignedBytes());
-  EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
-  EXPECT_EQ(0, result.NumRemoteAssignedBytes());
-}
-
 /// IMPALA-3019: Test for round robin reset problem. We schedule the same plan twice but
 /// send an empty statestored message in between.
 /// TODO: This problem cannot occur anymore and the test is merely green for random

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e924f50..527b5cf 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -510,13 +510,6 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
   // of memory distance classes see TReplicaPreference in PlanNodes.thrift.
   TReplicaPreference::type base_distance = query_options.replica_preference;
 
-  // The query option to disable cached reads adjusts the memory base distance to view
-  // all replicas as having a distance disk_local or worse.
-  if (query_options.disable_cached_reads
-      && base_distance == TReplicaPreference::CACHE_LOCAL) {
-    base_distance = TReplicaPreference::DISK_LOCAL;
-  }
-
   // A preference attached to the plan node takes precedence.
   if (node_replica_preference) base_distance = *node_replica_preference;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 2fe90b8..4be2996 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -58,8 +58,6 @@ class SchedulerWrapper;
 /// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
 /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
 ///       instead of per plan node?
-/// TODO: Remove disable_cached_reads query option in the next compatibility-breaking
-///       release (IMPALA-2963)
 /// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
 ///       it. Alternatively consider using Kudu's rw locks.
 /// TODO: Inject global dependencies into the class (for example ExecEnv::GetInstance(),
@@ -376,11 +374,6 @@ class Scheduler {
   ///   the assignments over more replicas. Allowed values are CACHE_LOCAL (default),
   ///   DISK_LOCAL and REMOTE.
   ///
-  /// disable_cached_reads:
-  ///   Setting this value to true is equivalent to setting replica_preference to
-  ///   DISK_LOCAL and takes precedence over replica_preference. The default setting is
-  ///   false.
-  ///
   /// schedule_random_replica:
   ///   When equivalent executors with a memory distance of DISK_LOCAL are found for a
   ///   scan range (same memory distance, same amount of assigned work), then the first

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 520a834..ac4c6be 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -112,8 +112,10 @@ void ChildQuery::SetQueryOptions(const TQueryOptions& parent_options,
     val << parent_options.NAME;\
     conf[#ENUM] = val.str();\
   }
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
   // Ignore debug actions on child queries because they may cause deadlock.
   map<string, string>::iterator it = conf.find("DEBUG_ACTION");
   if (it != conf.end()) conf.erase(it);
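
The empty REMOVED_QUERY_OPT_FN definition above is the heart of the pattern this change relies on: QUERY_OPTS_TABLE is an X-macro table that invokes one macro per query option, and each expansion site supplies its own definitions of QUERY_OPT_FN and REMOVED_QUERY_OPT_FN to decide what code (if any) an entry generates there. A minimal standalone sketch of the technique, using made-up option names rather than Impala's real table:

#include <iostream>

// Table of options; each entry uses the hook that matches its status.
#define OPTS_TABLE\
  QUERY_OPT_FN(batch_size, BATCH_SIZE)\
  REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)\
  QUERY_OPT_FN(mem_limit, MEM_LIMIT)

int main() {
  // Expansion site 1: generate code only for live options. Removed entries
  // expand to nothing because REMOVED_QUERY_OPT_FN is defined empty.
#define QUERY_OPT_FN(NAME, ENUM) std::cout << "live: " << #ENUM << "\n";
#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
  OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN

  // Expansion site 2: the same table, now generating code only for the
  // removed entries.
#define QUERY_OPT_FN(NAME, ENUM)
#define REMOVED_QUERY_OPT_FN(NAME, ENUM) std::cout << "removed: " << #ENUM << "\n";
  OPTS_TABLE
#undef QUERY_OPT_FN
#undef REMOVED_QUERY_OPT_FN
  return 0;
}

Compiled on its own, the first expansion prints only the live options and the second prints only the removed one, which mirrors how the hunks in this commit get different handling for removed options at each call site.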

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ee35afa..64ab950 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -241,6 +241,7 @@ void ClientRequestState::PopulateResultForSet(bool is_set_all) {
   for (; itr != config.end(); ++itr) {
     const auto opt_level_id =
         parent_server_->query_option_levels_[itr->first];
+    if (opt_level_id == TQueryOptionLevel::REMOVED) continue;
     if (!is_set_all && (opt_level_id == TQueryOptionLevel::DEVELOPMENT ||
                         opt_level_id == TQueryOptionLevel::DEPRECATED)) {
       continue;

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 552c218..42681d9 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -138,7 +138,6 @@ TEST(QueryOptions, SetByteOptions) {
   vector<pair<OptionDef<int64_t>, Range<int64_t>>> case_set_i64 {
       {MAKE_OPTIONDEF(mem_limit),             {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_scan_range_length), {-1, I64_MAX}},
-      {MAKE_OPTIONDEF(rm_initial_mem),        {-1, I64_MAX}},
       {MAKE_OPTIONDEF(buffer_pool_limit),     {-1, I64_MAX}},
       {MAKE_OPTIONDEF(max_row_size),          {1, ROW_SIZE_LIMIT}},
       {MAKE_OPTIONDEF(parquet_file_size),     {-1, I32_MAX}},
@@ -244,19 +243,14 @@ TEST(QueryOptions, SetIntOptions) {
 
 // Test options with non regular validation rule
 TEST(QueryOptions, SetSpecialOptions) {
-  // REPLICA_PREFERENCE cannot be set to 0 if DISABLE_CACHED_READS is true
-  // It also has unsettable enum values: cache_rack(1) & disk_rack(3)
+  // REPLICA_PREFERENCE has unsettable enum values: cache_rack(1) & disk_rack(3)
   TQueryOptions options;
   {
     OptionDef<TReplicaPreference::type> key_def = MAKE_OPTIONDEF(replica_preference);
     auto TestOk = MakeTestOkFn(options, key_def);
     auto TestError = MakeTestErrFn(options, key_def);
-    EXPECT_OK(SetQueryOption("DISABLE_CACHED_READS", "false", &options, nullptr));
     TestOk("cache_local", TReplicaPreference::CACHE_LOCAL);
     TestOk("0", TReplicaPreference::CACHE_LOCAL);
-    EXPECT_OK(SetQueryOption("DISABLE_CACHED_READS", "true", &options, nullptr));
-    TestError("cache_local");
-    TestError("0");
     TestError("cache_rack");
     TestError("1");
     TestOk("disk_local", TReplicaPreference::DISK_LOCAL);

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ff2fd4e..3f7bb7c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -62,8 +62,10 @@ void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMas
       "Size of QueryOptionsMask must be increased.";
 #define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
   if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->__set_##NAME(src.NAME);
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
 }
 
 void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
@@ -78,8 +80,10 @@ void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
       (*configuration)[#ENUM] = ""; \
     }\
   }
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM) (*configuration)[#ENUM] = "";
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
 }
 
 // Resets query_options->option to its default value.
@@ -91,8 +95,10 @@ static void ResetQueryOption(const int option, TQueryOptions* query_options) {
       query_options->__isset.NAME = defaults.__isset.NAME;\
       query_options->NAME = defaults.NAME;\
       break;
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
   }
 }
 
@@ -113,8 +119,10 @@ string impala::DebugQueryOptions(const TQueryOptions& query_options) {
     if (i++ > 0) ss << ",";\
     ss << #ENUM << "=" << query_options.NAME;\
   }
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM)
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
   return ss.str();
 }
 
@@ -131,6 +139,19 @@ static int GetQueryOptionForKey(const string& key) {
   return -1;
 }
 
+// Return true if we can ignore a reference to this removed query option.
+static bool IsRemovedQueryOption(const string& key) {
+#define QUERY_OPT_FN(NAME, ENUM, LEVEL)
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM) \
+  if (iequals(key, #NAME)) { \
+    return true; \
+  }
+  QUERY_OPTS_TABLE
+#undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
+  return false;
+}
+
 // Note that we allow numerical values for boolean and enum options. This is because
 // TQueryOptionsToMap() will output the numerical values, and we need to parse its output
 // configuration.
@@ -185,9 +206,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_max_scan_range_length(scan_length);
         break;
       }
-      case TImpalaQueryOptions::MAX_IO_BUFFERS:
-        query_options->__set_max_io_buffers(atoi(value.c_str()));
-        break;
       case TImpalaQueryOptions::NUM_SCANNER_THREADS:
         query_options->__set_num_scanner_threads(atoi(value.c_str()));
         break;
@@ -195,9 +213,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_allow_unsupported_formats(
             iequals(value, "true") || iequals(value, "1"));
         break;
-      case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
-        query_options->__set_default_order_by_limit(atoi(value.c_str()));
-        break;
       case TImpalaQueryOptions::DEBUG_ACTION:
         query_options->__set_debug_action(value.c_str());
         break;
@@ -233,10 +248,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         }
         break;
       }
-      case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
-        query_options->__set_abort_on_default_limit_exceeded(
-            iequals(value, "true") || iequals(value, "1"));
-        break;
       case TImpalaQueryOptions::HBASE_CACHING:
         query_options->__set_hbase_caching(atoi(value.c_str()));
         break;
@@ -278,27 +289,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
       case TImpalaQueryOptions::REQUEST_POOL:
         query_options->__set_request_pool(value);
         break;
-      case TImpalaQueryOptions::V_CPU_CORES:
-        query_options->__set_v_cpu_cores(atoi(value.c_str()));
-        break;
-      case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
-        query_options->__set_reservation_request_timeout(atoi(value.c_str()));
-        break;
-      case TImpalaQueryOptions::DISABLE_CACHED_READS:
-        if (iequals(value, "true") || iequals(value, "1")) {
-          query_options->__set_disable_cached_reads(true);
-        }
-        break;
       case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
         query_options->__set_disable_outermost_topn(
             iequals(value, "true") || iequals(value, "1"));
         break;
-      case TImpalaQueryOptions::RM_INITIAL_MEM: {
-        int64_t reservation_size;
-        RETURN_IF_ERROR(ParseMemValue(value, "RM memory limit", &reservation_size));
-        query_options->__set_rm_initial_mem(reservation_size);
-        break;
-      }
       case TImpalaQueryOptions::QUERY_TIMEOUT_S:
         query_options->__set_query_timeout_s(atoi(value.c_str()));
         break;
@@ -327,10 +321,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       case TImpalaQueryOptions::REPLICA_PREFERENCE:
         if (iequals(value, "cache_local") || iequals(value, "0")) {
-          if (query_options->disable_cached_reads) {
-            return Status("Conflicting settings: DISABLE_CACHED_READS = true and"
-                " REPLICA_PREFERENCE = CACHE_LOCAL");
-          }
           query_options->__set_replica_preference(TReplicaPreference::CACHE_LOCAL);
         } else if (iequals(value, "disk_local") || iequals(value, "2")) {
           query_options->__set_replica_preference(TReplicaPreference::DISK_LOCAL);
@@ -345,10 +335,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_schedule_random_replica(
             iequals(value, "true") || iequals(value, "1"));
         break;
-      // TODO: remove this query option (IMPALA-4319).
-      case TImpalaQueryOptions::SCAN_NODE_CODEGEN_THRESHOLD:
-        query_options->__set_scan_node_codegen_threshold(atol(value.c_str()));
-        break;
       case TImpalaQueryOptions::DISABLE_STREAMING_PREAGGREGATIONS:
         query_options->__set_disable_streaming_preaggregations(
             iequals(value, "true") || iequals(value, "1"));
@@ -614,6 +600,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       default:
+        if (IsRemovedQueryOption(key)) {
+          LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
+          return Status::OK();
+        }
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.
         LOG(ERROR) << "Missing exec option implementation: " << key;
@@ -659,7 +649,12 @@ void impala::PopulateQueryOptionLevels(QueryOptionLevels* query_option_levels)
   {\
     (*query_option_levels)[#ENUM] = LEVEL;\
   }
+#define REMOVED_QUERY_OPT_FN(NAME, ENUM)\
+  {\
+    (*query_option_levels)[#ENUM] = TQueryOptionLevel::REMOVED;\
+  }
   QUERY_OPTS_TABLE
   QUERY_OPT_FN(support_start_over, SUPPORT_START_OVER, TQueryOptionLevel::ADVANCED)
 #undef QUERY_OPT_FN
+#undef REMOVED_QUERY_OPT_FN
 }
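
Taken together, these query-options.cc hunks change the SET semantics for retired options: an unrecognized key is still an error, but a key in the removed-option table is accepted, logged, and otherwise ignored, so existing scripts and saved configurations keep working. A rough standalone illustration of that control flow, using a plain set in place of the macro-generated table (simplified names, not Impala code):

#include <iostream>
#include <set>
#include <string>

bool IsRemoved(const std::string& key) {
  static const std::set<std::string> kRemoved = {
      "MAX_IO_BUFFERS", "DEFAULT_ORDER_BY_LIMIT", "DISABLE_CACHED_READS"};
  return kRemoved.count(key) > 0;
}

// Returns true on success; removed options succeed without changing anything.
bool SetOption(const std::string& key, const std::string& value) {
  static const std::set<std::string> kLive = {"BATCH_SIZE", "MEM_LIMIT"};
  if (kLive.count(key) > 0) {
    std::cout << key << " set to " << value << "\n";
    return true;
  }
  if (IsRemoved(key)) {
    std::cerr << "Ignoring attempt to set removed query option '" << key << "'\n";
    return true;  // warning only, not an error
  }
  std::cerr << "Invalid query option: " << key << "\n";
  return false;
}

int main() {
  SetOption("BATCH_SIZE", "1024");   // applied
  SetOption("MAX_IO_BUFFERS", "8");  // warned and ignored
  SetOption("NOT_AN_OPTION", "1");   // still an error
  return 0;
}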

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 9cdc935..bb07552 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -42,16 +42,14 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
       TImpalaQueryOptions::COMPUTE_STATS_MIN_SAMPLE_SIZE + 1);\
-  QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED,\
-      TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS,\
       TQueryOptionLevel::DEPRECATED)\
   QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT,\
-      TQueryOptionLevel::DEPRECATED)\
-  QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS, TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
+  REMOVED_QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS)\
   QUERY_OPT_FN(disable_outermost_topn, DISABLE_OUTERMOST_TOPN,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN, TQueryOptionLevel::REGULAR)\
@@ -59,7 +57,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(hbase_caching, HBASE_CACHING, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(max_errors, MAX_ERRORS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS, TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)\
   QUERY_OPT_FN(max_scan_range_length, MAX_SCAN_RANGE_LENGTH,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(mem_limit, MEM_LIMIT, TQueryOptionLevel::REGULAR)\
@@ -68,11 +66,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(request_pool, REQUEST_POOL, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT,\
-      TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT)\
   QUERY_OPT_FN(sync_ddl, SYNC_DDL, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES, TQueryOptionLevel::DEPRECATED)\
-  QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM, TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\
+  REMOVED_QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\
   QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\
@@ -85,8 +82,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA,\
       TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD,\
-      TQueryOptionLevel::DEPRECATED)\
+  REMOVED_QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
   QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS,\
       TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE, TQueryOptionLevel::REGULAR)\

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index c44189a..37bf638 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -97,16 +97,9 @@ struct TQueryOptions {
   5: optional i32 num_nodes = NUM_NODES_ALL
   6: optional i64 max_scan_range_length = 0
   7: optional i32 num_scanner_threads = 0
-
-  // TODO: IMPALA-4306: retire at compatibility-breaking version
-  8: optional i32 max_io_buffers = 0              // Deprecated in 1.1
   9: optional bool allow_unsupported_formats = 0
-  // TODO: IMPALA-4306: retire at compatibility-breaking version
-  10: optional i64 default_order_by_limit = -1    // Deprecated in 1.4
   11: optional string debug_action = ""
   12: optional i64 mem_limit = 0
-  // TODO: IMPALA-4306: retire at compatibility-breaking version
-  13: optional bool abort_on_default_limit_exceeded = 0 // Deprecated in 1.4
   14: optional CatalogObjects.THdfsCompression compression_codec
   15: optional i32 hbase_caching = 0
   16: optional bool hbase_cache_blocks = 0
@@ -118,27 +111,9 @@ struct TQueryOptions {
   // the pool is determined based on the user.
   20: optional string request_pool
 
-  // Per-host virtual CPU cores required for query (only relevant with RM).
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  21: optional i16 v_cpu_cores
-
-  // Max time in milliseconds the resource broker should wait for
-  // a resource request to be granted by Llama/Yarn (only relevant with RM).
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  22: optional i64 reservation_request_timeout
-
-  // Disables taking advantage of HDFS caching. This has two parts:
-  // 1. disable preferring to schedule to cached replicas
-  // 2. disable the cached read path.
-  23: optional bool disable_cached_reads = 0
-
   // test hook to disable topn on the outermost select block.
   24: optional bool disable_outermost_topn = 0
 
-  // Override for initial memory reservation size if RM is enabled.
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  25: optional i64 rm_initial_mem = 0
-
   // Time, in s, before a query will be timed out if it is inactive. May not exceed
   // --idle_query_timeout if that flag > 0.
   26: optional i32 query_timeout_s = 0
@@ -178,10 +153,6 @@ struct TQueryOptions {
   // subsequent queries. The default is to start with the same replica for every query.
   34: optional bool schedule_random_replica = 0
 
-  // For scan nodes with any conjuncts, use codegen to evaluate the conjuncts if
-  // the number of rows * number of operators in the conjuncts exceeds this threshold.
-  35: optional i64 scan_node_codegen_threshold = 1800000
-
   // If true, the planner will not generate plans with streaming preaggregations.
   36: optional bool disable_streaming_preaggregations = 0
 

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0f2d3d0..0360f6c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -67,8 +67,7 @@ enum TImpalaQueryOptions {
   // a length of 0 indicates backend default;
   MAX_SCAN_RANGE_LENGTH,
 
-  // Maximum number of io buffers (per disk)
-  MAX_IO_BUFFERS,
+  MAX_IO_BUFFERS, // Removed
 
   // Number of scanner threads.
   NUM_SCANNER_THREADS,
@@ -76,10 +75,7 @@ enum TImpalaQueryOptions {
   // If true, Impala will try to execute on file formats that are not fully supported yet
   ALLOW_UNSUPPORTED_FORMATS,
 
-  // if set and > -1, specifies the default limit applied to a top-level SELECT statement
-  // with an ORDER BY but without a LIMIT clause (ie, if the SELECT statement also has
-  // a LIMIT clause, this default is ignored)
-  DEFAULT_ORDER_BY_LIMIT,
+  DEFAULT_ORDER_BY_LIMIT, // Removed
 
   // DEBUG ONLY:
   // If set to
@@ -92,8 +88,7 @@ enum TImpalaQueryOptions {
   // invalid, the option is ignored.
   DEBUG_ACTION,
 
-  // If true, raise an error when the DEFAULT_ORDER_BY_LIMIT has been reached.
-  ABORT_ON_DEFAULT_LIMIT_EXCEEDED,
+  ABORT_ON_DEFAULT_LIMIT_EXCEEDED, // Removed
 
   // Compression codec when inserting into tables.
   // Valid values are "snappy", "gzip", "bzip2" and "none"
@@ -133,14 +128,9 @@ enum TImpalaQueryOptions {
   // the pool is determined based on the user.
   REQUEST_POOL,
 
-  // Per-host virtual CPU cores required for query (only relevant with RM).
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  V_CPU_CORES,
+  V_CPU_CORES, // Removed
 
-  // Max time in milliseconds the resource broker should wait for
-  // a resource request to be granted by Llama/Yarn (only relevant with RM).
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  RESERVATION_REQUEST_TIMEOUT,
+  RESERVATION_REQUEST_TIMEOUT, // Removed
 
   // if true, disables cached reads. This option has no effect if REPLICA_PREFERENCE is
   // configured.
@@ -150,9 +140,7 @@ enum TImpalaQueryOptions {
   // Temporary testing flag
   DISABLE_OUTERMOST_TOPN,
 
-  // Size of initial memory reservation when RM is enabled
-  // TODO: IMPALA-3271: retire at compatibility-breaking version
-  RM_INITIAL_MEM,
+  RM_INITIAL_MEM, // Removed
 
   // Time, in s, before a query will be timed out if it is inactive. May not exceed
   // --idle_query_timeout if that flag > 0.
@@ -186,9 +174,7 @@ enum TImpalaQueryOptions {
   // Enables random backend selection during scheduling.
   SCHEDULE_RANDOM_REPLICA,
 
-  // For scan nodes with any conjuncts, use codegen to evaluate the conjuncts if
-  // the number of rows * number of operators in the conjuncts exceeds this threshold.
-  SCAN_NODE_CODEGEN_THRESHOLD,
+  SCAN_NODE_CODEGEN_THRESHOLD, // Removed
 
   // If true, the planner will not generate plans with streaming preaggregations.
   DISABLE_STREAMING_PREAGGREGATIONS,
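
Note that the retired options above stay in TImpalaQueryOptions as "// Removed" placeholders instead of being deleted. The enum assigns values positionally, so dropping an entry would renumber every later option and break compatibility with clients built against the old numbering. A tiny C++ analogy of the same hazard (illustrative only, not Impala code):

// BATCH_SIZE is 2 here. If the placeholder were deleted outright, BATCH_SIZE
// would silently become 1, and a peer still using the old numbering would
// misread it; keeping the placeholder preserves every later value.
enum TOption {
  MEM_LIMIT,        // 0
  MAX_IO_BUFFERS,   // 1, removed but kept as a placeholder
  BATCH_SIZE        // 2
};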

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/common/thrift/beeswax.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/beeswax.thrift b/common/thrift/beeswax.thrift
index 5d133dd..481aa2f 100644
--- a/common/thrift/beeswax.thrift
+++ b/common/thrift/beeswax.thrift
@@ -96,12 +96,14 @@ exception QueryNotFoundException {
 }
 
 // Impala extension:
-// Levels to use when displaying query options from Impala shell
+// Levels to use when displaying query options from Impala shell. REMOVED options should
+// not be displayed in the shell, but setting them is a warning rather than an error.
 enum TQueryOptionLevel {
   REGULAR,
   ADVANCED,
   DEVELOPMENT,
-  DEPRECATED
+  DEPRECATED,
+  REMOVED
 }
 
 /** Represents a Hadoop-style configuration variable. */

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a671f2a..4a77d53 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -86,6 +86,7 @@ class QueryOptionLevels:
   ADVANCED = 1
   DEVELOPMENT = 2
   DEPRECATED = 3
+  REMOVED = 4
 
 class QueryOptionDisplayModes:
   REGULAR_OPTIONS_ONLY = 1
@@ -237,8 +238,8 @@ class ImpalaShell(object, cmd.Cmd):
     if not self.imp_client.default_query_options and not self.set_query_options:
       print '\tNo options available.'
     else:
-      (regular_options, advanced_options, development_options, deprecated_options) = \
-          self._get_query_option_grouping()
+      (regular_options, advanced_options, development_options, deprecated_options,
+          removed_options) = self._get_query_option_grouping()
       self._print_option_group(regular_options)
       # If the shell is connected to an Impala that predates IMPALA-2181 then
       # the advanced_options would be empty and only the regular options would
@@ -260,8 +261,8 @@ class ImpalaShell(object, cmd.Cmd):
     query option level for display purposes using the received query_option_levels
     parameters.
     If the option level can't be determined then it defaults to 'REGULAR'"""
-    regular_options, advanced_options, development_options, deprecated_options = \
-        {}, {}, {}, {}
+    (regular_options, advanced_options, development_options, deprecated_options,
+        removed_options) = {}, {}, {}, {}, {}
     for option_name, option_value in self.imp_client.default_query_options.iteritems():
       level = self.imp_client.query_option_levels.get(option_name,
                                                       QueryOptionLevels.REGULAR)
@@ -271,9 +272,12 @@ class ImpalaShell(object, cmd.Cmd):
         development_options[option_name] = option_value
       elif level == QueryOptionLevels.DEPRECATED:
         deprecated_options[option_name] = option_value
+      elif level == QueryOptionLevels.REMOVED:
+        removed_options[option_name] = option_value
       else:
         advanced_options[option_name] = option_value
-    return (regular_options, advanced_options, development_options, deprecated_options)
+    return (regular_options, advanced_options, development_options, deprecated_options,
+        removed_options)
 
   def _print_option_group(self, query_options):
     """Gets query options and prints them. Value is inside [] for the ones having
@@ -683,6 +687,9 @@ class ImpalaShell(object, cmd.Cmd):
         print "Available query options, with their values (defaults shown in []):"
         self._print_options(QueryOptionDisplayModes.REGULAR_OPTIONS_ONLY)
         return CmdStatus.ERROR
+      if self.imp_client.query_option_levels[option_upper] == QueryOptionLevels.REMOVED:
+        self._print_if_verbose("Ignoring removed query option: '{0}'".format(tokens[0]))
+        return CmdStatus.SUCCESS
       self.set_query_options[option_upper] = tokens[1]
       self._print_if_verbose('%s set to %s' % (option_upper, tokens[1]))
 

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 32ad938..57c5131 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -7,21 +7,17 @@ set buffer_pool_limit=7;
 ---- QUERY
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
-'ABORT_ON_DEFAULT_LIMIT_EXCEEDED','0','DEPRECATED'
 'ABORT_ON_ERROR','0','REGULAR'
 'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'
-'DEFAULT_ORDER_BY_LIMIT','-1','DEPRECATED'
-'DISABLE_CACHED_READS','0','DEPRECATED'
 'DISABLE_CODEGEN','0','REGULAR'
 'DISABLE_OUTERMOST_TOPN','0','DEVELOPMENT'
 'EXPLAIN_LEVEL','1','REGULAR'
 'HBASE_CACHE_BLOCKS','0','ADVANCED'
 'HBASE_CACHING','0','ADVANCED'
 'MAX_ERRORS','100','ADVANCED'
-'MAX_IO_BUFFERS','0','DEPRECATED'
 'MAX_SCAN_RANGE_LENGTH','0','DEVELOPMENT'
 'MEM_LIMIT','0','REGULAR'
 'NUM_NODES','0','DEVELOPMENT'
@@ -29,10 +25,7 @@ set all;
 'COMPRESSION_CODEC','','REGULAR'
 'PARQUET_FILE_SIZE','0','ADVANCED'
 'REQUEST_POOL','','REGULAR'
-'RESERVATION_REQUEST_TIMEOUT','','DEPRECATED'
-'RM_INITIAL_MEM','0','DEPRECATED'
 'SYNC_DDL','0','REGULAR'
-'V_CPU_CORES','','DEPRECATED'
 ---- TYPES
 STRING, STRING, STRING
 ====
@@ -40,21 +33,17 @@ STRING, STRING, STRING
 set explain_level=3;
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
-'ABORT_ON_DEFAULT_LIMIT_EXCEEDED','0','DEPRECATED'
 'ABORT_ON_ERROR','0','REGULAR'
 'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'
-'DEFAULT_ORDER_BY_LIMIT','-1','DEPRECATED'
-'DISABLE_CACHED_READS','0','DEPRECATED'
 'DISABLE_CODEGEN','0','REGULAR'
 'DISABLE_OUTERMOST_TOPN','0','DEVELOPMENT'
 'EXPLAIN_LEVEL','3','REGULAR'
 'HBASE_CACHE_BLOCKS','0','ADVANCED'
 'HBASE_CACHING','0','ADVANCED'
 'MAX_ERRORS','100','ADVANCED'
-'MAX_IO_BUFFERS','0','DEPRECATED'
 'MAX_SCAN_RANGE_LENGTH','0','DEVELOPMENT'
 'MEM_LIMIT','0','REGULAR'
 'NUM_NODES','0','DEVELOPMENT'
@@ -62,10 +51,7 @@ set all;
 'COMPRESSION_CODEC','','REGULAR'
 'PARQUET_FILE_SIZE','0','ADVANCED'
 'REQUEST_POOL','','REGULAR'
-'RESERVATION_REQUEST_TIMEOUT','','DEPRECATED'
-'RM_INITIAL_MEM','0','DEPRECATED'
 'SYNC_DDL','0','REGULAR'
-'V_CPU_CORES','','DEPRECATED'
 ---- TYPES
 STRING, STRING, STRING
 ====
@@ -73,21 +59,17 @@ STRING, STRING, STRING
 set explain_level='0';
 set all;
 ---- RESULTS: VERIFY_IS_SUBSET
-'ABORT_ON_DEFAULT_LIMIT_EXCEEDED','0','DEPRECATED'
 'ABORT_ON_ERROR','0','REGULAR'
 'ALLOW_UNSUPPORTED_FORMATS','0','DEPRECATED'
 'BATCH_SIZE','0','DEVELOPMENT'
 'BUFFER_POOL_LIMIT','','ADVANCED'
 'DEBUG_ACTION','','DEVELOPMENT'
-'DEFAULT_ORDER_BY_LIMIT','-1','DEPRECATED'
-'DISABLE_CACHED_READS','0','DEPRECATED'
 'DISABLE_CODEGEN','0','REGULAR'
 'DISABLE_OUTERMOST_TOPN','0','DEVELOPMENT'
 'EXPLAIN_LEVEL','0','REGULAR'
 'HBASE_CACHE_BLOCKS','0','ADVANCED'
 'HBASE_CACHING','0','ADVANCED'
 'MAX_ERRORS','100','ADVANCED'
-'MAX_IO_BUFFERS','0','DEPRECATED'
 'MAX_SCAN_RANGE_LENGTH','0','DEVELOPMENT'
 'MEM_LIMIT','0','REGULAR'
 'NUM_NODES','0','DEVELOPMENT'
@@ -95,10 +77,7 @@ set all;
 'COMPRESSION_CODEC','','REGULAR'
 'PARQUET_FILE_SIZE','0','ADVANCED'
 'REQUEST_POOL','','REGULAR'
-'RESERVATION_REQUEST_TIMEOUT','','DEPRECATED'
-'RM_INITIAL_MEM','0','DEPRECATED'
 'SYNC_DDL','0','REGULAR'
-'V_CPU_CORES','','DEPRECATED'
 ---- TYPES
 STRING, STRING, STRING
 ====
@@ -241,3 +220,14 @@ set max_row_size=0;
 ---- CATCH
 Invalid max row size of 0. Valid sizes are in [1, 1099511627776]
 ====
+---- QUERY
+# Setting some removed query options should be a no-op.
+set DEFAULT_ORDER_BY_LIMIT="foo";
+set ABORT_ON_DEFAULT_LIMIT_EXCEEDED = "foo";
+set V_CPU_CORES = "foo";
+set RESERVATION_REQUEST_TIMEOUT = "foo";
+set RM_INITIAL_MEM = "foo";
+set SCAN_NODE_CODEGEN_THRESHOLD = "foo";
+set max_io_buffers="foo";
+---- RESULTS
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/comparison/discrepancy_searcher.py
----------------------------------------------------------------------
diff --git a/tests/comparison/discrepancy_searcher.py b/tests/comparison/discrepancy_searcher.py
index e0e1725..df9c562 100755
--- a/tests/comparison/discrepancy_searcher.py
+++ b/tests/comparison/discrepancy_searcher.py
@@ -316,7 +316,6 @@ class QueryExecutor(object):
         SET DISABLE_UNSAFE_SPILLS={disable_unsafe_spills};
         SET EXEC_SINGLE_NODE_ROWS_THRESHOLD={exec_single_node_rows_threshold};
         SET BUFFER_POOL_LIMIT={buffer_pool_limit};
-        SET MAX_IO_BUFFERS={max_io_buffers};
         SET MAX_SCAN_RANGE_LENGTH={max_scan_range_length};
         SET NUM_NODES={num_nodes};
         SET NUM_SCANNER_THREADS={num_scanner_threads};
@@ -334,7 +333,6 @@ class QueryExecutor(object):
             disable_unsafe_spills=choice((0, 1)),
             exec_single_node_rows_threshold=randint(1, 100000000),
             buffer_pool_limit=randint(1, 100000000),
-            max_io_buffers=randint(1, 100000000),
             max_scan_range_length=randint(1, 100000000),
             num_nodes=randint(3, 3),
             num_scanner_threads=randint(1, 100),

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index ccbbd32..69cabd8 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -298,12 +298,11 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       # Should be able to set query options (overriding defaults if applicable) with the
       # config overlay sent with the query RPC. mem_limit is a pool-level override and
       # max_io_buffers has no proc/pool default.
-      client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345',
-                                'max_io_buffers': '100'})
+      client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'})
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,\
           ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
-           'ABORT_ON_ERROR=1', 'MAX_IO_BUFFERS=100'])
+           'ABORT_ON_ERROR=1'])
 
       # Once options are reset to their defaults, the queue
       # configuration should kick back in. We'll see the

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/hs2/test_hs2.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index c90113c..ccdcdea 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -93,7 +93,10 @@ class TestHS2(HS2TestSuite):
     assert "MAX_ERRORS" in vals2
     assert levels["MAX_ERRORS"] == "ADVANCED"
     assert "DEBUG_ACTION" not in vals2
-    assert "SCAN_NODE_CODEGEN_THRESHOLD" not in vals2
+    assert "ALLOW_UNSUPPORTED_FORMATS" not in vals2
+
+    # Removed options should not be returned.
+    assert "MAX_IO_BUFFERS" not in vals2
 
   @needs_session()
   def test_session_option_levels_via_set_all(self):
@@ -106,12 +109,15 @@ class TestHS2(HS2TestSuite):
     assert "SYNC_DDL" in vals
     assert "MAX_ERRORS" in vals
     assert "DEBUG_ACTION" in vals
-    assert "SCAN_NODE_CODEGEN_THRESHOLD" in vals
+    assert "ALLOW_UNSUPPORTED_FORMATS" in vals
     assert levels["COMPRESSION_CODEC"] == "REGULAR"
     assert levels["SYNC_DDL"] == "REGULAR"
     assert levels["MAX_ERRORS"] == "ADVANCED"
     assert levels["DEBUG_ACTION"] == "DEVELOPMENT"
-    assert levels["SCAN_NODE_CODEGEN_THRESHOLD"] == "DEPRECATED"
+    assert levels["ALLOW_UNSUPPORTED_FORMATS"] == "DEPRECATED"
+
+    # Removed options should not be returned.
+    assert "MAX_IO_BUFFERS" not in vals
 
   def test_open_session_http_addr(self):
     """Check that OpenSession returns the coordinator's http address."""

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 85fc4f1..e838081 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -104,7 +104,7 @@ class TestObservability(ImpalaTestSuite):
     # Set a query option explicitly through client
     self.execute_query("set MEM_LIMIT = 8589934592")
     # Make sure explicitly set default values are not shown in the profile
-    self.execute_query("set MAX_IO_BUFFERS = 0")
+    self.execute_query("set runtime_filter_wait_time_ms = 0")
     runtime_profile = self.execute_query("select 1").runtime_profile
     assert "Query Options (set by configuration): MEM_LIMIT=8589934592" in runtime_profile
     # For this query, the planner sets NUM_NODES=1, NUM_SCANNER_THREADS=1,

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index c2e7509..ee9b4db 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -200,6 +200,12 @@ class TestImpalaShell(ImpalaTestSuite):
     result = run_impala_shell_cmd(args)
     assert 'WARNINGS:' not in result.stderr
 
+  def test_removed_query_option(self):
+    """Test that removed query options produce warning."""
+    result = run_impala_shell_cmd("-q 'set disable_cached_reads=true'",
+        expect_success=True)
+    assert "Ignoring removed query option: 'disable_cached_reads'" in result.stderr
+
   def test_output_format(self):
     expected_output = ['1'] * 3
     args = '-q "select 1,1,1" -B --quiet'
@@ -230,10 +236,10 @@ class TestImpalaShell(ImpalaTestSuite):
     run_impala_shell_cmd(args)
     # set
     # spaces around the = sign
-    args = '-q "set default_order_by_limit  =   10"'
+    args = '-q "set batch_size  =   10"'
     run_impala_shell_cmd(args)
     # no spaces around the = sign
-    args = '-q "set default_order_by_limit=10"'
+    args = '-q "set batch_size=10"'
     run_impala_shell_cmd(args)
     # test query options displayed
     args = '-q "set"'
@@ -251,10 +257,10 @@ class TestImpalaShell(ImpalaTestSuite):
     assert 'MEM_LIMIT: [0]' not in result_set.stdout
     # Negative tests for set
     # use : instead of =
-    args = '-q "set default_order_by_limit:10"'
+    args = '-q "set batch_size:10"'
     run_impala_shell_cmd(args, expect_success=False)
     # use 2 = signs
-    args = '-q "set default_order_by_limit=10=50"'
+    args = '-q "set batch_size=10=50"'
     run_impala_shell_cmd(args, expect_success=False)
     # describe and desc should return the same result.
     args = '-q "describe %s" -B' % empty_table

http://git-wip-us.apache.org/repos/asf/impala/blob/acfd169c/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index a104809..7f7f955 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -367,10 +367,12 @@ class TestImpalaShellInteractive(object):
     assert "Advanced Query Options:" in result.stdout
     assert "APPX_COUNT_DISTINCT" in result.stdout
     assert "SUPPORT_START_OVER" in result.stdout
+    # Development, deprecated and removed options should not be shown.
     assert "Development Query Options:" not in result.stdout
     assert "DEBUG_ACTION" not in result.stdout
     assert "Deprecated Query Options:" not in result.stdout
-    assert "ABORT_ON_DEFAULT_LIMIT_EXCEEDED" not in result.stdout
+    assert "ALLOW_UNSUPPORTED_FORMATS" not in result.stdout
+    assert "MAX_IO_BUFFERS" not in result.stdout
 
     shell2 = ImpalaShell()
     shell2.send_cmd("set all")
@@ -388,7 +390,9 @@ class TestImpalaShellInteractive(object):
     assert "APPX_COUNT_DISTINCT" in advanced_part
     assert "SUPPORT_START_OVER" in advanced_part
     assert "DEBUG_ACTION" in development_part
-    assert "ABORT_ON_DEFAULT_LIMIT_EXCEEDED" in result.stdout[deprecated_part_start_idx:]
+    assert "ALLOW_UNSUPPORTED_FORMATS" in result.stdout[deprecated_part_start_idx:]
+    # Removed options should not be shown.
+    assert "MAX_IO_BUFFERS" not in result.stdout
 
   def check_command_case_sensitivity(self, command, expected):
     shell = ImpalaShell()


[2/4] impala git commit: IMPALA-3282: Adds regexp_escape built-in function

Posted by mi...@apache.org.
IMPALA-3282: Adds regexp_escape built-in function

Escapes the following special characters used by the RE2 library:
.\+*?[^]$(){}=!<>|:-
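
A quick way to see the effect (a sketch using RE2 directly rather than Impala's test harness; assumes the RE2 headers are available): once the metacharacters are escaped, the pattern matches only the literal text.

#include <iostream>
#include <re2/re2.h>

int main() {
  // The raw pattern "Hello.world" also matches "HelloXworld"; the escaped
  // form, which regexp_escape() would produce, matches only the literal dot.
  std::cout << RE2::FullMatch("HelloXworld", "Hello.world") << "\n";      // 1
  std::cout << RE2::FullMatch("HelloXworld", R"(Hello\.world)") << "\n";  // 0
  std::cout << RE2::FullMatch("Hello.world", R"(Hello\.world)") << "\n";  // 1
  return 0;
}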

Testing:
Added unit tests to ExprTest.StringRegexpFunctions
Added E2E tests to exprs.test

Change-Id: I84c3e0ded26f6eb20794c38b75be9b25cd111e4b
Reviewed-on: http://gerrit.cloudera.org:8080/8900
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1b1087eb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1b1087eb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1b1087eb

Branch: refs/heads/master
Commit: 1b1087eb0546d81574e8c7fdcaf3fc11d6db3cea
Parents: 99234d2
Author: Jinchul <ji...@gmail.com>
Authored: Tue Dec 19 11:29:16 2017 +0900
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 1 05:14:14 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc                       | 40 +++++++++++++++++++-
 be/src/exprs/string-functions-ir.cc             | 23 +++++++++++
 be/src/exprs/string-functions.h                 |  1 +
 common/function-registry/impala_functions.py    |  1 +
 .../queries/QueryTest/exprs.test                | 28 ++++++++++++++
 5 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1b1087eb/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 877127d..21911a1 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -4244,10 +4244,46 @@ TEST_F(ExprTest, StringRegexpFunctions) {
   TestIsNull("regexp_match_count(NULL, '.*')", TYPE_INT);
   TestIsNull("regexp_match_count('a123', NULL)", TYPE_INT);
   TestIsNull("regexp_match_count(NULL, NULL)", TYPE_INT);
+
+  TestIsNull("regexp_escape(NULL)", TYPE_STRING);
+  TestStringValue("regexp_escape('')", "");
+  // Test special character escape
+  // .\+*?[^]$(){}=!<>|:-
+  TestStringValue("regexp_escape('Hello.world')", R"(Hello\.world)");
+  TestStringValue(R"(regexp_escape('Hello\\world'))", R"(Hello\\world)");
+  TestStringValue("regexp_escape('Hello+world')", R"(Hello\+world)");
+  TestStringValue("regexp_escape('Hello*world')", R"(Hello\*world)");
+  TestStringValue("regexp_escape('Hello?world')", R"(Hello\?world)");
+  TestStringValue("regexp_escape('Hello[world')", R"(Hello\[world)");
+  TestStringValue("regexp_escape('Hello^world')", R"(Hello\^world)");
+  TestStringValue("regexp_escape('Hello]world')", R"(Hello\]world)");
+  TestStringValue("regexp_escape('Hello$world')", R"(Hello\$world)");
+  TestStringValue("regexp_escape('Hello(world')", R"(Hello\(world)");
+  TestStringValue("regexp_escape('Hello)world')", R"(Hello\)world)");
+  TestStringValue("regexp_escape('Hello{world')", R"(Hello\{world)");
+  TestStringValue("regexp_escape('Hello}world')", R"(Hello\}world)");
+  TestStringValue("regexp_escape('Hello=world')", R"(Hello\=world)");
+  TestStringValue("regexp_escape('Hello!world')", R"(Hello\!world)");
+  TestStringValue("regexp_escape('Hello<world')", R"(Hello\<world)");
+  TestStringValue("regexp_escape('Hello>world')", R"(Hello\>world)");
+  TestStringValue("regexp_escape('Hello|world')", R"(Hello\|world)");
+  TestStringValue("regexp_escape('Hello:world')", R"(Hello\:world)");
+  TestStringValue("regexp_escape('Hello-world')", R"(Hello\-world)");
+  // Mixed case
+  TestStringValue(R"(regexp_escape('a.b\\c+d*e?f[g]h$i(j)k{l}m=n!o<p>q|r:s-t'))",
+      R"(a\.b\\c\+d\*e\?f\[g\]h\$i\(j\)k\{l\}m\=n\!o\<p\>q\|r\:s\-t)");
+  // Mixed case with other regexp_* functions
+  TestStringValue(R"(regexp_extract(regexp_escape('Hello\\world'),)"
+      R"('([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)', 0))", R"(Hello\\world)");
+  TestStringValue(R"(regexp_extract(regexp_escape('Hello\\world'),)"
+      R"('([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)', 1))", "Hello");
+  TestStringValue(R"(regexp_extract(regexp_escape('Hello\\world'),)"
+      R"('([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)', 2))", R"(\\)");
+  TestStringValue(R"(regexp_extract(regexp_escape('Hello\\world'),)"
+      R"('([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)', 3))", "world");
 }
 
-TEST_F(ExprTest, StringParseUrlFunction) {
-  // TODO: For now, our parse_url my not behave exactly like Hive
+TEST_F(ExprTest, StringParseUrlFunction) { // TODO: For now, our parse_url my not behave exactly like Hive
   // when given malformed URLs.
   // If necessary, we can closely follow Java's URL implementation
   // to behave exactly like Hive.

http://git-wip-us.apache.org/repos/asf/impala/blob/1b1087eb/be/src/exprs/string-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 50378bd..50c37b2 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -26,6 +26,7 @@
 
 #include "exprs/anyval-util.h"
 #include "exprs/scalar-expr.h"
+#include "gutil/strings/charset.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"
@@ -670,6 +671,28 @@ void StringFunctions::RegexpClose(
   context->SetFunctionState(scope, nullptr);
 }
 
+StringVal StringFunctions::RegexpEscape(FunctionContext* context, const StringVal& str) {
+  if (str.is_null) return StringVal::null();
+  if (str.len == 0) return str;
+
+  static const strings::CharSet REGEX_ESCAPE_CHARACTERS(".\\+*?[^]$(){}=!<>|:-");
+  const uint8_t* const start_ptr = str.ptr;
+  const uint8_t* const end_ptr = start_ptr + str.len;
+  StringVal result(context, str.len * 2);
+  if (UNLIKELY(result.is_null)) return StringVal::null();
+  uint8_t* dest_ptr = result.ptr;
+  for (const uint8_t* c = start_ptr; c < end_ptr; ++c) {
+    if (REGEX_ESCAPE_CHARACTERS.Test(*c)) {
+      *dest_ptr++ = '\\';
+    }
+    *dest_ptr++ = *c;
+  }
+  result.len = dest_ptr - result.ptr;
+  DCHECK_GE(result.len, str.len);
+
+  return result;
+}
+
 StringVal StringFunctions::RegexpExtract(FunctionContext* context, const StringVal& str,
     const StringVal& pattern, const BigIntVal& index) {
   if (str.is_null || pattern.is_null || index.is_null) return StringVal::null();
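
RegexpEscape() above works in a single pass over the bytes and sizes the output buffer at twice the input length, since each byte expands to at most two bytes. A standalone sketch of the same strategy using std::string instead of StringVal (illustrative, not the actual Impala code path):

#include <iostream>
#include <string>

std::string EscapeForRe2(const std::string& in) {
  // Same character set as the commit message: .\+*?[^]$(){}=!<>|:-
  static const std::string kMeta = ".\\+*?[^]$(){}=!<>|:-";
  std::string out;
  out.reserve(in.size() * 2);  // worst case: every byte gets a backslash
  for (char c : in) {
    if (kMeta.find(c) != std::string::npos) out.push_back('\\');
    out.push_back(c);
  }
  return out;
}

int main() {
  std::cout << EscapeForRe2("Hello.world") << "\n";  // prints Hello\.world
  return 0;
}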

http://git-wip-us.apache.org/repos/asf/impala/blob/1b1087eb/be/src/exprs/string-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index 91ad2cc..45876f8 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -117,6 +117,7 @@ class StringFunctions {
       re2::RE2::Options* opts);
   static void RegexpPrepare(FunctionContext*, FunctionContext::FunctionStateScope);
   static void RegexpClose(FunctionContext*, FunctionContext::FunctionStateScope);
+  static StringVal RegexpEscape(FunctionContext*, const StringVal& str);
   static StringVal RegexpExtract(FunctionContext*, const StringVal& str,
       const StringVal& pattern, const BigIntVal& index);
   static StringVal RegexpReplace(FunctionContext*, const StringVal& str,

http://git-wip-us.apache.org/repos/asf/impala/blob/1b1087eb/common/function-registry/impala_functions.py
----------------------------------------------------------------------
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index b78062b..8174abb 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -462,6 +462,7 @@ visible_functions = [
   [['locate'], 'INT', ['STRING', 'STRING'], 'impala::StringFunctions::Locate'],
   [['locate'], 'INT', ['STRING', 'STRING', 'BIGINT'],
    'impala::StringFunctions::LocatePos'],
+  [['regexp_escape'], 'STRING', ['STRING'], 'impala::StringFunctions::RegexpEscape'],
   [['regexp_extract'], 'STRING', ['STRING', 'STRING', 'BIGINT'],
    'impala::StringFunctions::RegexpExtract',
    '_ZN6impala15StringFunctions13RegexpPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',

http://git-wip-us.apache.org/repos/asf/impala/blob/1b1087eb/testdata/workloads/functional-query/queries/QueryTest/exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exprs.test b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
index 1cb803b..71759f3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
@@ -2451,6 +2451,34 @@ select regexp_match_count(tmp.str, tmp.pattern, tmp.start_pos, tmp.params) from
 Illegal match parameter x
 ====
 ---- QUERY
+select regexp_escape(tmp.str) from (values
+('a.b\\c+d*e?f[g]h$i(j)k{l}m=n!o<p>q|r:s-t' as str)) as tmp
+---- RESULTS
+'a\\.b\\\\c\\+d\\*e\\?f\\[g\\]h\\$i\\(j\\)k\\{l\\}m\\=n\\!o\\<p\\>q\\|r\\:s\\-t'
+---- TYPES
+string
+====
+---- QUERY
+select regexp_extract(regexp_escape(tmp.str),
+tmp.pattern, tmp.index) from (values
+('Hello\\world' as str, '([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)' as pattern, 2 as index)
+) as tmp
+---- RESULTS
+'\\\\'
+---- TYPES
+string
+====
+---- QUERY
+select regexp_extract(regexp_escape(tmp.str),
+tmp.pattern, tmp.index) from (values
+('Hello\\world' as str, '([[:alpha:]]+)(\\\\\\\\)([[:alpha:]]+)' as pattern, 3 as index)
+) as tmp
+---- RESULTS
+'world'
+---- TYPES
+string
+====
+---- QUERY
 # IMPALA-2147: IS [NOT] DISTINCT FROM and "<=>"
 select NULL <=> NULL
 ---- RESULTS