Posted to commits@impala.apache.org by mi...@apache.org on 2018/08/22 23:20:20 UTC

[1/7] impala git commit: cleanup: remove RuntimeState::exec_env_

Repository: impala
Updated Branches:
  refs/heads/master bdd904922 -> 649f175df


cleanup: remove RuntimeState::exec_env_

Change-Id: I4a1b1bdd41e3d10982b3a4bdb0217e716b4df67f
Reviewed-on: http://gerrit.cloudera.org:8080/11269
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fccaa723
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fccaa723
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fccaa723

Branch: refs/heads/master
Commit: fccaa72366e972228178687adcb0ee1d8e8c5930
Parents: bdd9049
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sun Aug 19 00:08:16 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 06:07:51 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |  2 +-
 be/src/exec/hbase-scan-node.cc               |  4 +++-
 be/src/exec/hbase-table-writer.cc            |  3 ++-
 be/src/exec/hdfs-orc-scanner.h               |  3 ++-
 be/src/exec/hdfs-scan-node-base.cc           | 10 +++++----
 be/src/exec/kudu-scan-node-base.cc           |  2 +-
 be/src/exec/kudu-table-sink.cc               |  4 ++--
 be/src/exec/partitioned-hash-join-builder.cc |  4 ++--
 be/src/runtime/buffered-tuple-stream.cc      |  2 +-
 be/src/runtime/fragment-instance-state.cc    |  2 +-
 be/src/runtime/reservation-manager.cc        |  2 +-
 be/src/runtime/runtime-filter-bank.cc        |  4 ++--
 be/src/runtime/runtime-state.cc              | 26 ++---------------------
 be/src/runtime/runtime-state.h               |  8 -------
 14 files changed, 26 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
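The change is mechanical at every call site: instead of reaching process-wide
services through a pointer cached in RuntimeState, callers go through the
process-wide accessor ExecEnv::GetInstance(), as the TODO removed from
runtime-state.h already suggested. A minimal sketch of that singleton-accessor
idiom (illustrative Java with hypothetical names, not Impala's actual C++
ExecEnv, which is constructed once at daemon startup):

  // Illustrative sketch only; ExecEnvSketch and bufferPool are hypothetical
  // stand-ins, not Impala code.
  public final class ExecEnvSketch {
    private static ExecEnvSketch instance;
    private final Object bufferPool = new Object(); // stand-in shared service

    private ExecEnvSketch() {}

    // Called once during process startup.
    public static void init() { instance = new ExecEnvSketch(); }

    // Reachable from anywhere; no RuntimeState handle needed.
    public static ExecEnvSketch getInstance() { return instance; }

    public Object bufferPool() { return bufferPool; }

    public static void main(String[] args) {
      init();
      System.out.println(getInstance().bufferPool() != null); // prints true
    }
  }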


http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 42d6be8..0ac4926 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -210,7 +210,7 @@ Status GroupingAggregator::Open(RuntimeState* state) {
 
   if (ht_allocator_ == nullptr) {
     // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call.
-    ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
+    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
         buffer_pool_client(), resource_profile_.spillable_buffer_size));
 
     if (!is_streaming_preagg_ && needs_serialize_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 5aee999..148be66 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
@@ -60,7 +61,8 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
   hbase_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HBASE_READ_TIMER);
   AddBytesReadCounters();
 
-  hbase_scanner_.reset(new HBaseTableScanner(this, state->htable_factory(), state));
+  hbase_scanner_.reset(
+      new HBaseTableScanner(this, ExecEnv::GetInstance()->htable_factory(), state));
 
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
   if (tuple_desc_ == NULL) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hbase-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc
index 2723eb7..cf71522 100644
--- a/be/src/exec/hbase-table-writer.cc
+++ b/be/src/exec/hbase-table-writer.cc
@@ -23,6 +23,7 @@
 #include "common/logging.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/exec-env.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -53,7 +54,7 @@ HBaseTableWriter::HBaseTableWriter(HBaseTableDescriptor* table_desc,
     runtime_profile_(profile) { }
 
 Status HBaseTableWriter::Init(RuntimeState* state) {
-  RETURN_IF_ERROR(state->htable_factory()->GetTable(table_desc_->name(),
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->htable_factory()->GetTable(table_desc_->name(),
       &table_));
   encoding_timer_ = ADD_TIMER(runtime_profile_, "EncodingTimer");
   htable_put_timer_ = ADD_TIMER(runtime_profile_, "HTablePutTimer");

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 132442e..5b9be5b 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -21,6 +21,7 @@
 
 #include <orc/OrcFile.hh>
 
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "exec/hdfs-scanner.h"
@@ -81,7 +82,7 @@ class HdfsOrcScanner : public HdfsScanner {
     }
 
     uint64_t getNaturalReadSize() const {
-      return scanner_->state_->io_mgr()->max_buffer_size();
+      return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size();
     }
 
     void read(void* buf, uint64_t length, uint64_t offset);

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 6454652..09c2c0d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -340,7 +340,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   }
 
   RETURN_IF_ERROR(ClaimBufferReservation(state));
-  reader_context_ = runtime_state_->io_mgr()->RegisterContext();
+  reader_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext();
 
   // Initialize HdfsScanNode specific counters
   hdfs_read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
@@ -392,7 +392,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
       "MaxCompressedTextFileLength", TUnit::BYTES);
 
   hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters(
-      &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1);
+      &active_hdfs_read_thread_counter_,
+      ExecEnv::GetInstance()->disk_io_mgr()->num_total_disks() + 1);
 
   counters_running_ = true;
 
@@ -413,7 +414,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
   if (reader_context_ != nullptr) {
     // Need to wait for all the active scanner threads to finish to ensure there is no
     // more memory tracked by this scan node's mem tracker.
-    state->io_mgr()->UnregisterContext(reader_context_.get());
+    ExecEnv::GetInstance()->disk_io_mgr()->UnregisterContext(reader_context_.get());
   }
 
   StopAndFinalizeCounters();
@@ -569,7 +570,8 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
   DCHECK_GE(len, 0);
   DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
+  disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
+      file, disk_id, expected_local);
 
   ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange);
   range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 0e7cdfa..b57f4ee 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -92,7 +92,7 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   const KuduTableDescriptor* table_desc =
       static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
 
-  RETURN_IF_ERROR(runtime_state_->exec_env()->GetKuduClient(
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
       table_desc->kudu_master_addresses(), &client_));
 
   uint64_t latest_ts = static_cast<uint64_t>(

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 32ba2bb..b50394d 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -120,8 +120,8 @@ Status KuduTableSink::Open(RuntimeState* state) {
   }
   client_tracked_bytes_ = required_mem;
 
-  RETURN_IF_ERROR(
-      state->exec_env()->GetKuduClient(table_desc_->kudu_master_addresses(), &client_));
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
+      table_desc_->kudu_master_addresses(), &client_));
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index c627b98..c2980b5 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -149,8 +149,8 @@ Status PhjBuilder::Open(RuntimeState* state) {
   }
   if (ht_allocator_ == nullptr) {
     // Create 'ht_allocator_' on the first call to Open().
-    ht_allocator_.reset(new Suballocator(
-        state->exec_env()->buffer_pool(), buffer_pool_client_, spillable_buffer_size_));
+    ht_allocator_.reset(new Suballocator(ExecEnv::GetInstance()->buffer_pool(),
+        buffer_pool_client_, spillable_buffer_size_));
   }
   RETURN_IF_ERROR(CreateHashPartitions(0));
   AllocateRuntimeFilters();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 71175d1..e0cf854 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -54,7 +54,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
-    buffer_pool_(state->exec_env()->buffer_pool()),
+    buffer_pool_(ExecEnv::GetInstance()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
     read_page_reservation_(buffer_pool_client_),
     write_page_reservation_(buffer_pool_client_),

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 51ff13d..700a391 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -123,7 +123,7 @@ void FragmentInstanceState::Cancel() {
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
   if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
-  runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
+  ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
 Status FragmentInstanceState::Prepare() {

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/reservation-manager.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/reservation-manager.cc b/be/src/runtime/reservation-manager.cc
index 1712e09..c87f92c 100644
--- a/be/src/runtime/reservation-manager.cc
+++ b/be/src/runtime/reservation-manager.cc
@@ -44,7 +44,7 @@ void ReservationManager::Close(RuntimeState* state) {
     VLOG_FILE << name_ << " returning reservation " << resource_profile_.min_reservation;
     state->query_state()->initial_reservations()->Return(
         &buffer_pool_client_, resource_profile_.min_reservation);
-    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 64638a6..e1a2512 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -61,7 +61,7 @@ Status RuntimeFilterBank::ClaimBufferReservation() {
   DCHECK(!buffer_pool_client_.is_registered());
   string filter_bank_name = Substitute(
       "RuntimeFilterBank (Fragment Id: $0)", PrintId(state_->fragment_instance_id()));
-  RETURN_IF_ERROR(state_->exec_env()->buffer_pool()->RegisterClient(filter_bank_name,
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(filter_bank_name,
       state_->query_state()->file_group(), state_->instance_buffer_reservation(),
       filter_mem_tracker_.get(), total_bloom_filter_mem_required_,
       state_->runtime_profile(), &buffer_pool_client_));
@@ -268,7 +268,7 @@ void RuntimeFilterBank::Close() {
               << ") returning reservation " << total_bloom_filter_mem_required_;
     state_->query_state()->initial_reservations()->Return(
         &buffer_pool_client_, total_bloom_filter_mem_required_);
-    state_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
   DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
   filter_mem_tracker_->Close();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5bb5a2d..dedb5f5 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -73,7 +73,6 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(
         query_state->query_ctx().utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    exec_env_(exec_env),
     profile_(RuntimeProfile::Create(
           obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
     instance_buffer_reservation_(new ReservationTracker) {
@@ -91,7 +90,6 @@ RuntimeState::RuntimeState(
     now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    exec_env_(exec_env),
     profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
   // We may use execution resources while evaluating exprs, etc. Decremented in
   // ReleaseResources() to release resources.
@@ -111,7 +109,7 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
-  resource_pool_ = exec_env_->thread_mgr()->CreatePool();
+  resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
   DCHECK(resource_pool_ != nullptr);
   if (fragment_ctx_ != nullptr) {
     // Ensure that the planner correctly determined the required threads.
@@ -265,7 +263,7 @@ void RuntimeState::ReleaseResources() {
   DCHECK(!released_resources_);
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
-    exec_env_->thread_mgr()->DestroyPool(move(resource_pool_));
+    ExecEnv::GetInstance()->thread_mgr()->DestroyPool(move(resource_pool_));
   }
   // Release any memory associated with codegen.
   if (codegen_ != nullptr) codegen_->Close();
@@ -290,26 +288,6 @@ const std::string& RuntimeState::GetEffectiveUser() const {
   return impala::GetEffectiveUser(query_ctx().session);
 }
 
-ImpalaBackendClientCache* RuntimeState::impalad_client_cache() {
-  return exec_env_->impalad_client_cache();
-}
-
-CatalogServiceClientCache* RuntimeState::catalogd_client_cache() {
-  return exec_env_->catalogd_client_cache();
-}
-
-io::DiskIoMgr* RuntimeState::io_mgr() {
-  return exec_env_->disk_io_mgr();
-}
-
-KrpcDataStreamMgr* RuntimeState::stream_mgr() {
-  return exec_env_->stream_mgr();
-}
-
-HBaseTableFactory* RuntimeState::htable_factory() {
-  return exec_env_->htable_factory();
-}
-
 ObjectPool* RuntimeState::obj_pool() const {
   DCHECK(query_state_ != nullptr);
   return query_state_->obj_pool();

http://git-wip-us.apache.org/repos/asf/impala/blob/fccaa723/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 78a9864..6ca9574 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -106,12 +106,6 @@ class RuntimeState {
         ? instance_ctx_->fragment_instance_id
         : no_instance_id_;
   }
-  ExecEnv* exec_env() { return exec_env_; }
-  KrpcDataStreamMgr* stream_mgr();
-  HBaseTableFactory* htable_factory();
-  ImpalaBackendClientCache* impalad_client_cache();
-  CatalogServiceClientCache* catalogd_client_cache();
-  io::DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker();  // reference to the query_state_'s memtracker
   ReservationTracker* instance_buffer_reservation() {
@@ -327,8 +321,6 @@ class RuntimeState {
   /// Owned by a static storage member of TimezoneDatabase class. It cannot be nullptr.
   const Timezone* local_time_zone_;
 
-  /// TODO: get rid of this and use ExecEnv::GetInstance() instead
-  ExecEnv* exec_env_;
   boost::scoped_ptr<LlvmCodeGen> codegen_;
 
   /// Contains all ScalarFnCall expressions which need to be codegen'd.


[2/7] impala git commit: IMPALA-7466: Improve readability of describe authorization tests

Posted by mi...@apache.org.
IMPALA-7466: Improve readability of describe authorization tests

This patch improves the readability and usability of the describe
authorization tests. It removes the ambiguity of the positional function
parameters used to validate the describe output.

Testing:
- Refactored describe authorization tests
- Ran AuthorizationStmtTests

Change-Id: I6244b6dbafbd3827d488588a16e66dbf15d0e115
Reviewed-on: http://gerrit.cloudera.org:8080/11278
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c0c3de20
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c0c3de20
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c0c3de20

Branch: refs/heads/master
Commit: c0c3de2057307883fb258f00d1aa6871b88906e0
Parents: fccaa72
Author: Adam Holley <gi...@holleyism.com>
Authored: Mon Aug 20 16:38:18 2018 -0500
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 09:06:58 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/AuthorizationStmtTest.java  | 183 ++++++++++++-------
 1 file changed, 118 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
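The refactor replaces okDescribe()'s positional nullable string arrays with a
named DescribeOutput builder, so each expectation states what it asserts. A
hedged sketch of the new call style (a fragment, not standalone code: it
assumes the AuthorizationStmtTest fixtures such as authorize(),
describeOutput(), onColumn(), accessError() and the ALLTYPES_* constants are
in scope, as in the diff below):

  // Old style: okDescribe(tableName, style, new String[]{"id"},
  //     ALLTYPES_COLUMNS_WITHOUT_ID, ...) with positional nullable arrays.
  // New style: the builder names each expectation explicitly.
  authorize("describe functional.alltypes")
      .okDescribe(tableName,
          describeOutput(TDescribeOutputStyle.MINIMAL)
              .includeStrings(new String[]{"id"})           // must appear
              .excludeStrings(ALLTYPES_COLUMNS_WITHOUT_ID), // must not appear
          onColumn("functional", "alltypes", "id", TPrivilegeLevel.SELECT))
      .error(accessError("functional.alltypes"));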


http://git-wip-us.apache.org/repos/asf/impala/blob/c0c3de20/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index a750c6f..8da44c1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -1210,22 +1210,26 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     TDescribeOutputStyle style = TDescribeOutputStyle.MINIMAL;
     authzTest = authorize("describe functional.alltypes");
     for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
-      authzTest.okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onServer(privilege))
-          .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onDatabase("functional",
-              privilege))
-          .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onTable("functional",
-              "alltypes", privilege));
-    }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
-        allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onTable("functional",
-            "alltypes", allExcept(viewMetadataPrivileges())))
+      authzTest
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onServer(privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onDatabase("functional", privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onTable("functional", "alltypes", privilege));
+    }
+    authzTest
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onServer(allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onDatabase("functional",allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onTable("functional", "alltypes", allExcept(viewMetadataPrivileges())))
         // In this test, since we only have column level privileges on "id", then
         // only the "id" column should show and the others should not.
-        .okDescribe(tableName, style, new String[]{"id"}, ALLTYPES_COLUMNS_WITHOUT_ID,
-            onColumn("functional", "alltypes", "id", TPrivilegeLevel.SELECT))
+        .okDescribe(tableName, describeOutput(style).includeStrings(new String[]{"id"})
+            .excludeStrings(ALLTYPES_COLUMNS_WITHOUT_ID), onColumn("functional",
+            "alltypes", "id", TPrivilegeLevel.SELECT))
         .error(accessError("functional.alltypes"));
 
     // Describe table extended.
@@ -1235,24 +1239,27 @@ public class AuthorizationStmtTest extends FrontendTestBase {
         new String[]{"Location:"});
     authzTest = authorize("describe functional.alltypes");
     for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
-      authzTest.okDescribe(tableName, style, checkStrings, null, onServer(privilege))
-          .okDescribe(tableName, style, checkStrings, null, onDatabase("functional",
-              privilege))
-          .okDescribe(tableName, style, checkStrings, null, onTable("functional",
-              "alltypes", privilege));
+      authzTest
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onServer(privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onDatabase("functional", privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onTable("functional", "alltypes", privilege));
     }
     // Describe table without VIEW_METADATA privilege should not show all columns and
     // location.
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+    authzTest
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
             onServer(allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
             onDatabase("functional", allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
             onTable("functional", "alltypes", allExcept(viewMetadataPrivileges())))
         // Location should not appear with only column level auth.
-        .okDescribe(tableName, style, new String[]{"id"},
-            (String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS_WITHOUT_ID,
-            new String[]{"Location:"}), onColumn("functional", "alltypes", "id",
+        .okDescribe(tableName, describeOutput(style).includeStrings(new String[]{"id"})
+            .excludeStrings((String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS_WITHOUT_ID,
+            new String[]{"Location:"})), onColumn("functional", "alltypes", "id",
             TPrivilegeLevel.SELECT))
         .error(accessError("functional.alltypes"));
 
@@ -1261,16 +1268,19 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     style = TDescribeOutputStyle.MINIMAL;
     authzTest = authorize("describe functional.alltypes_view");
     for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
-      authzTest.okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onServer(privilege))
-          .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onDatabase("functional",
-              privilege))
-          .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onTable("functional",
-              "alltypes_view", privilege));
-    }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
-        allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(viewMetadataPrivileges())))
+      authzTest
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onServer(privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onDatabase("functional", privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(ALLTYPES_COLUMNS),
+              onTable("functional", "alltypes_view", privilege));
+    }
+    authzTest
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onServer(allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onDatabase("functional",allExcept(viewMetadataPrivileges())))
         .error(accessError("functional.alltypes_view"));
 
     // Describe view extended.
@@ -1281,16 +1291,19 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     checkStrings = (String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS, viewStrings);
     authzTest = authorize("describe functional.alltypes_view");
     for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
-      authzTest.okDescribe(tableName, style, checkStrings, null, onServer(privilege))
-          .okDescribe(tableName, style, checkStrings, null, onDatabase("functional",
-              privilege))
-          .okDescribe(tableName, style, checkStrings, null, onTable("functional",
-              "alltypes_view", privilege));
-    }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
-        allExcept(viewMetadataPrivileges())))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(viewMetadataPrivileges())))
+      authzTest
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onServer(privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onDatabase("functional", privilege))
+          .okDescribe(tableName, describeOutput(style).includeStrings(checkStrings),
+              onTable("functional", "alltypes_view", privilege));
+    }
+    authzTest
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onServer(allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, describeOutput(style).excludeStrings(ALLTYPES_COLUMNS),
+            onDatabase("functional",allExcept(viewMetadataPrivileges())))
         .error(accessError("functional.alltypes_view"));
 
     // Describe specific column on a table.
@@ -2677,6 +2690,63 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     public String getName() { return test_.role_; }
   }
 
+
+  private class DescribeOutput {
+    private String[] excludedStrings_ = new String[0];
+    private String[] includedStrings_ = new String[0];
+    private final TDescribeOutputStyle outputStyle_;
+
+    public DescribeOutput(TDescribeOutputStyle style) {
+      outputStyle_ = style;
+    }
+
+    /**
+     * Indicates which strings must not appear in the output of the describe statement.
+     * During validation, if any of these strings is present, an AssertionError is thrown.
+     *
+     * @param excluded - Array of strings that must not exist in the output.
+     * @return DescribeOutput instance.
+     */
+    public DescribeOutput excludeStrings(String[] excluded) {
+      excludedStrings_ = excluded;
+      return this;
+    }
+
+    /**
+     * Indicates which strings are required to appear in the output of the describe
+     * statement. During validation, if any of these strings is missing, an
+     * AssertionError is thrown.
+     *
+     * @param included - Array of strings that must exist in the output.
+     * @return DescribeOutput instance.
+     */
+    public DescribeOutput includeStrings(String[] included) {
+      includedStrings_ = included;
+      return this;
+    }
+
+    public void validate(TTableName table) throws ImpalaException {
+      Preconditions.checkArgument(includedStrings_.length != 0 ||
+          excludedStrings_.length != 0,
+          "One or both of included or excluded strings must be defined.");
+      List<String> result = resultToStringList(authzFrontend_.describeTable(table,
+          outputStyle_, USER));
+      for (String str: includedStrings_) {
+        assertTrue(String.format("\"%s\" is not in the describe output.\n" +
+            "Expected : %s\n Actual   : %s", str, Arrays.toString(includedStrings_),
+            result), result.contains(str));
+      }
+      for (String str: excludedStrings_) {
+        assertTrue(String.format("\"%s\" should not be in the describe output.", str),
+            !result.contains(str));
+      }
+    }
+  }
+
+  private DescribeOutput describeOutput(TDescribeOutputStyle style) {
+    return new DescribeOutput(style);
+  }
+
   private class AuthzTest {
     private final AnalysisContext context_;
     private final String stmt_;
@@ -2756,9 +2826,8 @@ public class AuthorizationStmtTest extends FrontendTestBase {
      * into the new role/user. The new role/user will be dropped once this method
      * finishes.
      */
-    public AuthzTest okDescribe(TTableName table, TDescribeOutputStyle style,
-        String[] requiredStrings, String[] excludedStrings, TPrivilege[]... privileges)
-        throws ImpalaException {
+    public AuthzTest okDescribe(TTableName table, DescribeOutput output,
+        TPrivilege[]... privileges) throws ImpalaException {
       for (WithPrincipal withPrincipal: new WithPrincipal[]{
           new WithRole(this), new WithUser(this)}) {
         try {
@@ -2768,23 +2837,7 @@ public class AuthorizationStmtTest extends FrontendTestBase {
           } else {
             authzOk(stmt_, withPrincipal);
           }
-          List<String> result = resultToStringList(authzFrontend_.describeTable(table,
-              style, USER));
-          if (requiredStrings != null) {
-            for (String str : requiredStrings) {
-              assertTrue(String.format("\"%s\" is not in the describe output.\n" +
-                  "Expected : %s\n" +
-                  "Actual   : %s", str, Arrays.toString(requiredStrings), result),
-                  result.contains(str));
-            }
-          }
-          if (excludedStrings != null) {
-            for (String str : excludedStrings) {
-              assertTrue(String.format(
-                  "\"%s\" should not be in the describe output.", str),
-                  !result.contains(str));
-            }
-          }
+          output.validate(table);
         } finally {
           withPrincipal.drop();
         }


[7/7] impala git commit: IMPALA-7344: Restrict ALTER DATABASE/TABLE SET OWNER statements

Posted by mi...@apache.org.
IMPALA-7344: Restrict ALTER DATABASE/TABLE SET OWNER statements

Prior to this patch, any user with the ALTER privilege could transfer
database/table ownership from one user/role to another. This is
undesirable because altering an object's ownership grants full access to
that object. This patch restricts the ALTER DATABASE/TABLE SET OWNER
statements to require ALL/OWNER with GRANT OPTION when authorization
is enabled.

Testing:
- Added FE authorization tests
- Ran all FE tests
- Ran core tests

Change-Id: I2485933c02b5384950b7c882ba1eb0fd703db5a3
Reviewed-on: http://gerrit.cloudera.org:8080/11279
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/649f175d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/649f175d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/649f175d

Branch: refs/heads/master
Commit: 649f175df08cf4708fe48ab5d4364d1634c16d22
Parents: 632e0e2
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Mon Aug 20 16:02:28 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 21:27:01 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh                            |   2 +-
 .../impala/analysis/AlterDbSetOwnerStmt.java    |   5 +-
 .../analysis/AlterTableOrViewSetOwnerStmt.java  |   4 +-
 .../org/apache/impala/analysis/Analyzer.java    |  67 +++++++--
 .../apache/impala/analysis/BaseTableRef.java    |   3 +-
 .../impala/analysis/CollectionTableRef.java     |   3 +-
 .../apache/impala/analysis/InlineViewRef.java   |   4 +-
 .../org/apache/impala/analysis/TableRef.java    |  15 +-
 .../authorization/AuthorizationChecker.java     |  31 +++--
 .../impala/authorization/PrivilegeRequest.java  |  31 +++--
 .../authorization/PrivilegeRequestBuilder.java  |  11 +-
 .../impala/catalog/PrincipalPrivilege.java      |   1 +
 .../apache/impala/util/SentryPolicyService.java |   2 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   2 +-
 .../impala/analysis/AuthorizationStmtTest.java  | 139 ++++++++++++++-----
 15 files changed, 236 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
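The enforcement works by threading a new grant-option bit from analysis down
to the authorization check: TableRef and Analyzer.getDb() can now demand the
grant option, PrivilegeRequestBuilder.grantOption() sets it on the request,
and AuthorizationChecker folds it into the hasAccess() call and the error
messages. A hedged sketch of the request the analyzer registers for
ALTER TABLE ... SET OWNER (a fragment using the builder calls shown in the
diff below; the table name is illustrative):

  // ALL privilege on the target table, with GRANT OPTION required.
  PrivilegeRequest request = new PrivilegeRequestBuilder()
      .onTable("functional", "alltypes")  // illustrative target
      .allOf(Privilege.ALL)
      .grantOption()
      .toRequest();
  analyzer.registerPrivReq(request);      // tracked and checked post-analysis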


http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 656861d..facb7f4 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -162,7 +162,7 @@ unset IMPALA_KUDU_URL
 : ${CDH_DOWNLOAD_HOST:=native-toolchain.s3.amazonaws.com}
 export CDH_DOWNLOAD_HOST
 export CDH_MAJOR_VERSION=6
-export CDH_BUILD_NUMBER=533940
+export CDH_BUILD_NUMBER=537982
 export IMPALA_HADOOP_VERSION=3.0.0-cdh6.x-SNAPSHOT
 export IMPALA_HBASE_VERSION=2.0.0-cdh6.x-SNAPSHOT
 export IMPALA_HIVE_VERSION=2.1.1-cdh6.x-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
index 379a806..87c0edd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.Privilege;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
@@ -38,7 +39,9 @@ public class AlterDbSetOwnerStmt extends AlterDbStmt {
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
-    super.analyze(analyzer);
+    // Require ALL with GRANT OPTION privilege.
+    analyzer.getDb(dbName_, Privilege.ALL, /* throw if does not exist */ true,
+        /* grant option */ true);
     String ownerName = owner_.getOwnerName();
     if (ownerName.length() > MetaStoreUtil.MAX_OWNER_LENGTH) {
       throw new AnalysisException(String.format("Owner name exceeds maximum length of " +

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
index c508413..25470de 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
@@ -46,7 +46,9 @@ public abstract class AlterTableOrViewSetOwnerStmt extends AlterTableStmt {
           MetaStoreUtil.MAX_OWNER_LENGTH, ownerName.length()));
     }
     tableName_ = analyzer.getFqTableName(tableName_);
-    TableRef tableRef = new TableRef(tableName_.toPath(), null, Privilege.ALTER);
+    // Require ALL with GRANT OPTION privilege.
+    TableRef tableRef = new TableRef(tableName_.toPath(), null, Privilege.ALL,
+        /* grant option */ true);
     tableRef = analyzer.resolveTableRef(tableRef);
     Preconditions.checkNotNull(tableRef);
     tableRef.analyze(analyzer);

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 6468513..94375a4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -569,13 +569,21 @@ public class Analyzer {
       // an analysis error. We should not accidentally reveal the non-existence of a
       // table/database if the user is not authorized.
       if (rawPath.size() > 1) {
-        registerPrivReq(new PrivilegeRequestBuilder()
+        PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder()
             .onTable(rawPath.get(0), rawPath.get(1))
-            .allOf(tableRef.getPrivilege()).toRequest());
+            .allOf(tableRef.getPrivilege());
+        if (tableRef.requireGrantOption()) {
+          builder.grantOption();
+        }
+        registerPrivReq(builder.toRequest());
       }
-      registerPrivReq(new PrivilegeRequestBuilder()
+      PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder()
           .onTable(getDefaultDb(), rawPath.get(0))
-          .allOf(tableRef.getPrivilege()).toRequest());
+          .allOf(tableRef.getPrivilege());
+      if (tableRef.requireGrantOption()) {
+        builder.grantOption();
+      }
+      registerPrivReq(builder.toRequest());
       throw e;
     } catch (TableLoadingException e) {
       throw new AnalysisException(String.format(
@@ -2448,21 +2456,39 @@ public class Analyzer {
   }
 
   /**
-   * Returns the Catalog Db object for the given database at the given
-   * Privilege level. The privilege request is tracked in the analyzer
-   * and authorized post-analysis.
-   *
-   * Registers a new access event if the catalog lookup was successful.
-   *
    * If the database does not exist in the catalog an AnalysisError is thrown.
+   * This method does not require the grant option permission.
    */
   public FeDb getDb(String dbName, Privilege privilege) throws AnalysisException {
     return getDb(dbName, privilege, true);
   }
 
+  /**
+   * This method does not require the grant option permission.
+   */
   public FeDb getDb(String dbName, Privilege privilege, boolean throwIfDoesNotExist)
       throws AnalysisException {
+    return getDb(dbName, privilege, throwIfDoesNotExist, false);
+  }
+
+  /**
+   * Returns the Catalog Db object for the given database at the given
+   * Privilege level. The privilege request is tracked in the analyzer
+   * and authorized post-analysis.
+   *
+   * Registers a new access event if the catalog lookup was successful.
+   *
+   * If throwIfDoesNotExist is set to true and the database does not exist in the catalog
+   * an AnalysisError is thrown.
+   * If requireGrantOption is set to true, the grant option permission is required for
+   * the specified privilege.
+   */
+  public FeDb getDb(String dbName, Privilege privilege, boolean throwIfDoesNotExist,
+      boolean requireGrantOption) throws AnalysisException {
     PrivilegeRequestBuilder pb = new PrivilegeRequestBuilder();
+    if (requireGrantOption) {
+      pb.grantOption();
+    }
     if (privilege == Privilege.ANY) {
       registerPrivReq(
           pb.any().onAnyColumn(dbName, AuthorizeableTable.ANY_TABLE_NAME).toRequest());
@@ -2593,11 +2619,20 @@ public class Analyzer {
   }
 
   /**
+   * This method does not require the grant option permission.
+   */
+  public void registerAuthAndAuditEvent(FeTable table, Privilege priv) {
+    registerAuthAndAuditEvent(table, priv, false);
+  }
+
+  /**
    * Registers a table-level privilege request and an access event for auditing
    * for the given table and privilege. The table must be a base table or a
-   * catalog view (not a local view).
+   * catalog view (not a local view). If requireGrantOption is set to true,
+   * the grant option permission is required for the specified privilege.
    */
-  public void registerAuthAndAuditEvent(FeTable table, Privilege priv) {
+  public void registerAuthAndAuditEvent(FeTable table, Privilege priv,
+      boolean requireGrantOption) {
     // Add access event for auditing.
     if (table instanceof FeView) {
       FeView view = (FeView) table;
@@ -2612,8 +2647,12 @@ public class Analyzer {
     }
     // Add privilege request.
     TableName tableName = table.getTableName();
-    registerPrivReq(new PrivilegeRequestBuilder()
+    PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder()
         .onTable(tableName.getDb(), tableName.getTbl())
-        .allOf(priv).toRequest());
+        .allOf(priv);
+    if (requireGrantOption) {
+      builder.grantOption();
+    }
+    registerPrivReq(builder.toRequest());
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
index 5b80712..f6ecdb8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
@@ -60,7 +60,8 @@ public class BaseTableRef extends TableRef {
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     if (isAnalyzed_) return;
-    analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_);
+    analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_,
+        requireGrantOption_);
     desc_ = analyzer.registerTableRef(this);
     isAnalyzed_ = true;
     analyzeTableSample(analyzer);

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
index 92015f5..a6f66ab 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CollectionTableRef.java
@@ -103,7 +103,8 @@ public class CollectionTableRef extends TableRef {
       // Register a table-level privilege request as well as a column-level privilege request
       // for the collection-typed column.
       Preconditions.checkNotNull(resolvedPath_.getRootTable());
-      analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_);
+      analyzer.registerAuthAndAuditEvent(resolvedPath_.getRootTable(), priv_,
+          requireGrantOption_);
       analyzer.registerPrivReq(new PrivilegeRequestBuilder().
           allOf(Privilege.SELECT).onColumn(desc_.getTableName().getDb(),
           desc_.getTableName().getTbl(), desc_.getPath().getRawPath().get(0))

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
index dbcb59a..494b4af 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
@@ -95,7 +95,7 @@ public class InlineViewRef extends TableRef {
    */
   public InlineViewRef(FeView view, TableRef origTblRef) {
     super(view.getTableName().toPath(), origTblRef.getExplicitAlias(),
-        origTblRef.getPrivilege());
+        origTblRef.getPrivilege(), origTblRef.requireGrantOption());
     queryStmt_ = view.getQueryStmt().clone();
     queryStmt_.reset();
     if (view.isLocalView()) queryStmt_.reset();
@@ -145,7 +145,7 @@ public class InlineViewRef extends TableRef {
     // Catalog views refs require special analysis settings for authorization.
     boolean isCatalogView = (view_ != null && !view_.isLocalView());
     if (isCatalogView) {
-      analyzer.registerAuthAndAuditEvent(view_, priv_);
+      analyzer.registerAuthAndAuditEvent(view_, priv_, requireGrantOption_);
       if (inlineViewAnalyzer_.isExplain()) {
         // If the user does not have privileges on the view's definition
         // then we report a masked authorization error so as not to reveal

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/analysis/TableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index 8ccc9ab..0cd65ae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -79,6 +79,7 @@ public class TableRef implements ParseNode {
 
   // Analysis registers privilege and/or audit requests based on this privilege.
   protected final Privilege priv_;
+  protected final boolean requireGrantOption_;
 
   // Optional TABLESAMPLE clause. Null if not specified.
   protected TableSampleClause sampleParams_;
@@ -135,15 +136,20 @@ public class TableRef implements ParseNode {
   }
 
   public TableRef(List<String> path, String alias, TableSampleClause tableSample) {
-    this(path, alias, tableSample, Privilege.SELECT);
+    this(path, alias, tableSample, Privilege.SELECT, false);
   }
 
   public TableRef(List<String> path, String alias, Privilege priv) {
-    this(path, alias, null, priv);
+    this(path, alias, null, priv, false);
+  }
+
+  public TableRef(List<String> path, String alias, Privilege priv,
+      boolean requireGrantOption) {
+    this(path, alias, null, priv, requireGrantOption);
   }
 
   public TableRef(List<String> path, String alias, TableSampleClause sampleParams,
-      Privilege priv) {
+      Privilege priv, boolean requireGrantOption) {
     rawPath_ = path;
     if (alias != null) {
       aliases_ = new String[] { alias.toLowerCase() };
@@ -153,6 +159,7 @@ public class TableRef implements ParseNode {
     }
     sampleParams_ = sampleParams;
     priv_ = priv;
+    requireGrantOption_ = requireGrantOption;
     isAnalyzed_ = false;
     replicaPreference_ = null;
     randomReplica_ = false;
@@ -168,6 +175,7 @@ public class TableRef implements ParseNode {
     hasExplicitAlias_ = other.hasExplicitAlias_;
     sampleParams_ = other.sampleParams_;
     priv_ = other.priv_;
+    requireGrantOption_ = other.requireGrantOption_;
     joinOp_ = other.joinOp_;
     joinHints_ = Lists.newArrayList(other.joinHints_);
     onClause_ = (other.onClause_ != null) ? other.onClause_.clone() : null;
@@ -274,6 +282,7 @@ public class TableRef implements ParseNode {
   }
   public TableSampleClause getSampleParams() { return sampleParams_; }
   public Privilege getPrivilege() { return priv_; }
+  public boolean requireGrantOption() { return requireGrantOption_; }
   public List<PlanHint> getJoinHints() { return joinHints_; }
   public List<PlanHint> getTableHints() { return tableHints_; }
   public Expr getOnClause() { return onClause_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index 82dcbc8..326ffa4 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -19,7 +19,6 @@ package org.apache.impala.authorization;
 
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -104,20 +103,22 @@ public class AuthorizationChecker {
       Privilege privilege = privilegeRequest.getPrivilege();
       if (privilegeRequest.getAuthorizeable() instanceof AuthorizeableFn) {
         throw new AuthorizationException(String.format(
-            "User '%s' does not have privileges to %s functions in: %s",
-            user.getName(), privilege, privilegeRequest.getName()));
+            "User '%s' does not have privileges%s to %s functions in: %s",
+            user.getName(), grantOption(privilegeRequest.hasGrantOption()), privilege,
+            privilegeRequest.getName()));
       }
 
       if (EnumSet.of(Privilege.ANY, Privilege.ALL, Privilege.VIEW_METADATA)
           .contains(privilege)) {
         throw new AuthorizationException(String.format(
-            "User '%s' does not have privileges to access: %s",
-            user.getName(), privilegeRequest.getName()));
+            "User '%s' does not have privileges%s to access: %s",
+            user.getName(), grantOption(privilegeRequest.hasGrantOption()),
+            privilegeRequest.getName()));
       } else if (privilege == Privilege.REFRESH) {
         throw new AuthorizationException(String.format(
-            "User '%s' does not have privileges to execute " +
+            "User '%s' does not have privileges%s to execute " +
             "'INVALIDATE METADATA/REFRESH' on: %s", user.getName(),
-            privilegeRequest.getName()));
+            grantOption(privilegeRequest.hasGrantOption()), privilegeRequest.getName()));
       } else if (privilege == Privilege.CREATE &&
           privilegeRequest.getAuthorizeable() instanceof AuthorizeableTable) {
         // Creating a table requires CREATE on a database and we shouldn't
@@ -125,16 +126,22 @@ public class AuthorizationChecker {
         AuthorizeableTable authorizeableTable =
             (AuthorizeableTable) privilegeRequest.getAuthorizeable();
           throw new AuthorizationException(String.format(
-              "User '%s' does not have privileges to execute '%s' on: %s",
-              user.getName(), privilege, authorizeableTable.getDbName()));
+              "User '%s' does not have privileges%s to execute '%s' on: %s",
+              user.getName(), grantOption(privilegeRequest.hasGrantOption()), privilege,
+              authorizeableTable.getDbName()));
       } else {
         throw new AuthorizationException(String.format(
-            "User '%s' does not have privileges to execute '%s' on: %s",
-            user.getName(), privilege, privilegeRequest.getName()));
+            "User '%s' does not have privileges%s to execute '%s' on: %s",
+            user.getName(), grantOption(privilegeRequest.hasGrantOption()), privilege,
+            privilegeRequest.getName()));
       }
     }
   }
 
+  private static String grantOption(boolean hasGrantOption) {
+    return hasGrantOption ? " with 'GRANT OPTION'" : "";
+  }
+
   /*
    * Returns true if the given user has permission to execute the given
    * request, false otherwise. Always returns true if authorization is disabled.
@@ -180,6 +187,6 @@ public class AuthorizationChecker {
       authorizeables.remove(authorizeables.size() - 1);
     }
     return provider_.hasAccess(new Subject(user.getShortName()), authorizeables, actions,
-        ActiveRoleSet.ALL);
+        request.hasGrantOption(), ActiveRoleSet.ALL);
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequest.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequest.java b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequest.java
index d206908..e6335d3 100644
--- a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequest.java
+++ b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequest.java
@@ -19,6 +19,8 @@ package org.apache.impala.authorization;
 
 import com.google.common.base.Preconditions;
 
+import java.util.Objects;
+
 /**
  * Represents a privilege request in the context of an Authorizeable object. If no
  * Authorizeable object is provided, it represents a privilege request on the server.
@@ -27,18 +29,26 @@ import com.google.common.base.Preconditions;
 public class PrivilegeRequest {
   private final Authorizeable authorizeable_;
   private final Privilege privilege_;
+  private final boolean grantOption_;
 
   public PrivilegeRequest(Authorizeable authorizeable, Privilege privilege) {
+    this(authorizeable, privilege, false);
+  }
+
+  public PrivilegeRequest(Authorizeable authorizeable, Privilege privilege,
+      boolean grantOption) {
     Preconditions.checkNotNull(authorizeable);
     Preconditions.checkNotNull(privilege);
     authorizeable_ = authorizeable;
     privilege_ = privilege;
+    grantOption_ = grantOption;
   }
 
   public PrivilegeRequest(Privilege privilege) {
     Preconditions.checkNotNull(privilege);
     authorizeable_ = null;
     privilege_ = privilege;
+    grantOption_ = false;
   }
 
   /*
@@ -53,25 +63,28 @@ public class PrivilegeRequest {
    */
   public Privilege getPrivilege() { return privilege_; }
 
-
   /*
    * Returns Authorizeable object. Null if the request is for server-level permission.
    */
   public Authorizeable getAuthorizeable() { return authorizeable_; }
 
+  /**
+   * Returns whether the grant option is required or not.
+   */
+  public boolean hasGrantOption() { return grantOption_; }
+
   @Override
   public int hashCode() {
-    return (authorizeable_ == null ? 0 : authorizeable_.hashCode()) * 37 +
-        privilege_.hashCode();
+    return Objects.hash(authorizeable_, privilege_, grantOption_);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (!(o instanceof PrivilegeRequest)) return false;
-    if (authorizeable_ == null) {
-      return ((PrivilegeRequest) o).getPrivilege().equals(privilege_);
-    }
-    return ((PrivilegeRequest) o).getAuthorizeable().equals(authorizeable_) &&
-        ((PrivilegeRequest) o).getPrivilege().equals(privilege_);
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    PrivilegeRequest that = (PrivilegeRequest) o;
+    return grantOption_ == that.grantOption_ &&
+        Objects.equals(authorizeable_, that.authorizeable_) &&
+        privilege_ == that.privilege_;
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
index efe043d..6aa71e7 100644
--- a/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
+++ b/fe/src/main/java/org/apache/impala/authorization/PrivilegeRequestBuilder.java
@@ -35,6 +35,7 @@ import com.google.common.base.Preconditions;
 public class PrivilegeRequestBuilder {
   Authorizeable authorizeable_;
   Privilege privilege_;
+  boolean grantOption_ = false;
 
   /**
    * Sets the authorizeable object to be a column.
@@ -108,12 +109,20 @@ public class PrivilegeRequestBuilder {
   }
 
   /**
+   * Specifies that grant option is required.
+   */
+  public PrivilegeRequestBuilder grantOption() {
+    grantOption_ = true;
+    return this;
+  }
+
+  /**
    * Builds a PrivilegeRequest object based on the current Authorizeable object
    * and privilege settings.
    */
   public PrivilegeRequest toRequest() {
     Preconditions.checkNotNull(authorizeable_);
     Preconditions.checkNotNull(privilege_);
-    return new PrivilegeRequest(authorizeable_, privilege_);
+    return new PrivilegeRequest(authorizeable_, privilege_, grantOption_);
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
index 033b9c9..bb7500b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PrincipalPrivilege.java
@@ -116,6 +116,7 @@ public class PrincipalPrivilege extends CatalogObjectImpl {
         authorizable.add(KV_JOINER.join("action",
             privilege.getPrivilege_level().toString()));
       }
+      authorizable.add(KV_JOINER.join("grantoption", privilege.isHas_grant_opt()));
       return AUTHORIZABLE_JOINER.join(authorizable);
     } catch (Exception e) {
       // Should never make it here unless the privilege is malformed.

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/main/java/org/apache/impala/util/SentryPolicyService.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryPolicyService.java b/fe/src/main/java/org/apache/impala/util/SentryPolicyService.java
index 5d3ee33..3a5c11c 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryPolicyService.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryPolicyService.java
@@ -493,7 +493,6 @@ public class SentryPolicyService {
       privilege.setPrivilege_level(Enum.valueOf(TPrivilegeLevel.class,
           sentryPriv.getAction().toUpperCase()));
     }
-    privilege.setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(privilege));
     privilege.setCreate_time_ms(sentryPriv.getCreateTime());
     if (sentryPriv.isSetGrantOption() &&
         sentryPriv.getGrantOption() == TSentryGrantOption.TRUE) {
@@ -501,6 +500,7 @@ public class SentryPolicyService {
     } else {
       privilege.setHas_grant_opt(false);
     }
+    privilege.setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(privilege));
     privilege.setPrincipal_id(principal.getId());
     privilege.setPrincipal_type(principal.getPrincipalType());
     return privilege;

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 5249826..520b88f 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -3782,7 +3782,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     testNumberOfMembers(ValuesStmt.class, 0);
 
     // Also check TableRefs.
-    testNumberOfMembers(TableRef.class, 20);
+    testNumberOfMembers(TableRef.class, 21);
     testNumberOfMembers(BaseTableRef.class, 0);
     testNumberOfMembers(InlineViewRef.class, 8);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/649f175d/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index 8da44c1..1edb066 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -1958,15 +1958,15 @@ public class AuthorizationStmtTest extends FrontendTestBase {
             "delimited fields terminated by ' '"),
         authorize("alter table functional.alltypes add partition(year=1, month=1)"),
         authorize("alter table functional.alltypes drop partition(" +
-            "year=2009, month=1)"),
-        authorize("alter table functional.alltypes set owner user foo_owner"),
-        authorize("alter table functional.alltypes set owner role foo_owner")}) {
+            "year=2009, month=1)")}) {
       test.ok(onServer(TPrivilegeLevel.ALL))
           .ok(onServer(TPrivilegeLevel.OWNER))
           .ok(onServer(TPrivilegeLevel.ALTER))
           .ok(onDatabase("functional", TPrivilegeLevel.ALL))
           .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
           .ok(onDatabase("functional", TPrivilegeLevel.ALTER))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.OWNER))
           .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALTER))
           .error(alterError("functional.alltypes"))
           .error(alterError("functional.alltypes"), onServer(allExcept(
@@ -1978,6 +1978,32 @@ public class AuthorizationStmtTest extends FrontendTestBase {
               TPrivilegeLevel.ALTER)));
     }
 
+    // Alter table set owner.
+    for (AuthzTest test: new AuthzTest[]{
+        authorize("alter table functional.alltypes set owner user foo_owner"),
+        authorize("alter table functional.alltypes set owner role foo_owner")}) {
+      test.ok(onServer(true, TPrivilegeLevel.ALL))
+          .ok(onServer(true, TPrivilegeLevel.OWNER))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.OWNER))
+          .ok(onTable(true, "functional", "alltypes", TPrivilegeLevel.ALL))
+          .ok(onTable(true, "functional", "alltypes", TPrivilegeLevel.OWNER))
+          .error(accessError(true, "functional.alltypes"), onServer(
+              TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes"), onDatabase("functional",
+              TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes"), onTable("functional",
+              "alltypes", TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes"))
+          .error(accessError(true, "functional.alltypes"), onServer(true, allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+          .error(accessError(true, "functional.alltypes"), onDatabase(true, "functional",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+          .error(accessError(true, "functional.alltypes"),
+              onTable(true, "functional", "alltypes", allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
+    }
+
     // Alter table rename.
     authorize("alter table functional.alltypes rename to functional.new_table")
         .ok(onServer(TPrivilegeLevel.ALL))
@@ -2178,24 +2204,25 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     for (AuthzTest test: new AuthzTest[]{
         authorize("alter view functional.alltypes_view set owner user foo_owner"),
         authorize("alter view functional.alltypes_view set owner role foo_owner")}) {
-      test.ok(onServer(TPrivilegeLevel.ALL))
-          .ok(onServer(TPrivilegeLevel.OWNER))
-          .ok(onServer(TPrivilegeLevel.ALTER))
-          .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-          .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
-          .ok(onDatabase("functional", TPrivilegeLevel.ALTER))
-          .ok(onTable("functional", "alltypes_view", TPrivilegeLevel.ALL))
-          .ok(onTable("functional", "alltypes_view", TPrivilegeLevel.OWNER))
-          .ok(onTable("functional", "alltypes_view", TPrivilegeLevel.ALTER))
-          .error(alterError("functional.alltypes_view"))
-          .error(alterError("functional.alltypes_view"), onServer(allExcept(
-              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)))
-          .error(alterError("functional.alltypes_view"), onDatabase("functional",
-              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
-              TPrivilegeLevel.ALTER)))
-          .error(alterError("functional.alltypes_view"), onTable("functional",
-              "alltypes_view", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
-              TPrivilegeLevel.ALTER)));
+      test.ok(onServer(true, TPrivilegeLevel.ALL))
+          .ok(onServer(true, TPrivilegeLevel.OWNER))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.OWNER))
+          .ok(onTable(true, "functional", "alltypes_view", TPrivilegeLevel.ALL))
+          .ok(onTable(true, "functional", "alltypes_view", TPrivilegeLevel.OWNER))
+          .error(accessError(true, "functional.alltypes_view"), onServer(
+              TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes_view"), onDatabase("functional",
+              TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes_view"), onTable("functional",
+              "alltypes_view", TPrivilegeLevel.values()))
+          .error(accessError(true, "functional.alltypes_view"))
+          .error(accessError(true, "functional.alltypes_view"), onServer(allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+          .error(accessError(true, "functional.alltypes_view"), onDatabase("functional",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+          .error(accessError(true, "functional.alltypes_view"), onTable("functional",
+              "alltypes_view", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
     }
 
     // Database does not exist.
@@ -2215,25 +2242,27 @@ public class AuthorizationStmtTest extends FrontendTestBase {
 
   @Test
   public void testAlterDatabase() throws ImpalaException {
+    // Alter database set owner.
     for (String ownerType: new String[]{"user", "role"}) {
       authorize(String.format("alter database functional set owner %s foo", ownerType))
-          .ok(onServer(TPrivilegeLevel.ALL))
-          .ok(onServer(TPrivilegeLevel.OWNER))
-          .ok(onServer(TPrivilegeLevel.ALTER))
-          .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-          .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
-          .ok(onDatabase("functional", TPrivilegeLevel.ALTER))
-          .error(alterError("functional"))
-          .error(alterError("functional"), onServer(allExcept(
-              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)))
-          .error(alterError("functional"), onDatabase("functional", allExcept(
-              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)));
+          .ok(onServer(true, TPrivilegeLevel.ALL))
+          .ok(onServer(true, TPrivilegeLevel.OWNER))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase(true, "functional", TPrivilegeLevel.OWNER))
+          .error(accessError(true, "functional"), onServer(TPrivilegeLevel.values()))
+          .error(accessError(true, "functional"), onDatabase("functional",
+              TPrivilegeLevel.values()))
+          .error(accessError(true, "functional"))
+          .error(accessError(true, "functional"), onServer(true, allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+          .error(accessError(true, "functional"), onDatabase(true, "functional",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
 
       // Database does not exist.
       authorize(String.format("alter database nodb set owner %s foo", ownerType))
-          .error(alterError("nodb"))
-          .error(alterError("nodb"), onServer(allExcept(
-              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)));
+          .error(accessError(true, "nodb"))
+          .error(accessError(true, "nodb"), onServer(true, allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
     }
   }
 
@@ -2580,7 +2609,12 @@ public class AuthorizationStmtTest extends FrontendTestBase {
   }
 
   private static String accessError(String object) {
-    return "User '%s' does not have privileges to access: " + object;
+    return accessError(false, object);
+  }
+
+  private static String accessError(boolean grantOption, String object) {
+    return "User '%s' does not have privileges" +
+        (grantOption ? " with 'GRANT OPTION'" : "") + " to access: " + object;
   }
 
   private static String refreshError(String object) {
@@ -2880,10 +2914,15 @@ public class AuthorizationStmtTest extends FrontendTestBase {
   }
 
   private TPrivilege[] onServer(TPrivilegeLevel... levels) {
+    return onServer(false, levels);
+  }
+
+  private TPrivilege[] onServer(boolean grantOption, TPrivilegeLevel... levels) {
     TPrivilege[] privileges = new TPrivilege[levels.length];
     for (int i = 0; i < levels.length; i++) {
       privileges[i] = new TPrivilege("", levels[i], TPrivilegeScope.SERVER, false);
       privileges[i].setServer_name(SENTRY_SERVER);
+      privileges[i].setHas_grant_opt(grantOption);
       privileges[i].setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(
           privileges[i]));
     }
@@ -2891,11 +2930,17 @@ public class AuthorizationStmtTest extends FrontendTestBase {
   }
 
   private TPrivilege[] onDatabase(String db, TPrivilegeLevel... levels) {
+    return onDatabase(false, db, levels);
+  }
+
+  private TPrivilege[] onDatabase(boolean grantOption, String db,
+      TPrivilegeLevel... levels) {
     TPrivilege[] privileges = new TPrivilege[levels.length];
     for (int i = 0; i < levels.length; i++) {
       privileges[i] = new TPrivilege("", levels[i], TPrivilegeScope.DATABASE, false);
       privileges[i].setServer_name(SENTRY_SERVER);
       privileges[i].setDb_name(db);
+      privileges[i].setHas_grant_opt(grantOption);
       privileges[i].setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(
           privileges[i]));
     }
@@ -2903,12 +2948,18 @@ public class AuthorizationStmtTest extends FrontendTestBase {
   }
 
   private TPrivilege[] onTable(String db, String table, TPrivilegeLevel... levels) {
+    return onTable(false, db, table, levels);
+  }
+
+  private TPrivilege[] onTable(boolean grantOption, String db, String table,
+      TPrivilegeLevel... levels) {
     TPrivilege[] privileges = new TPrivilege[levels.length];
     for (int i = 0; i < levels.length; i++) {
       privileges[i] = new TPrivilege("", levels[i], TPrivilegeScope.TABLE, false);
       privileges[i].setServer_name(SENTRY_SERVER);
       privileges[i].setDb_name(db);
       privileges[i].setTable_name(table);
+      privileges[i].setHas_grant_opt(grantOption);
       privileges[i].setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(
           privileges[i]));
     }
@@ -2920,8 +2971,18 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     return onColumn(db, table, new String[]{column}, levels);
   }
 
+  private TPrivilege[] onColumn(boolean grantOption, String db, String table,
+      String column, TPrivilegeLevel... levels) {
+    return onColumn(grantOption, db, table, new String[]{column}, levels);
+  }
+
   private TPrivilege[] onColumn(String db, String table, String[] columns,
       TPrivilegeLevel... levels) {
+    return onColumn(false, db, table, columns, levels);
+  }
+
+  private TPrivilege[] onColumn(boolean grantOption, String db, String table,
+      String[] columns, TPrivilegeLevel... levels) {
     int size = columns.length * levels.length;
     TPrivilege[] privileges = new TPrivilege[size];
     int idx = 0;
@@ -2932,6 +2993,7 @@ public class AuthorizationStmtTest extends FrontendTestBase {
         privileges[idx].setDb_name(db);
         privileges[idx].setTable_name(table);
         privileges[idx].setColumn_name(column);
+        privileges[idx].setHas_grant_opt(grantOption);
         privileges[idx].setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(
             privileges[idx]));
         idx++;
@@ -2941,11 +3003,16 @@ public class AuthorizationStmtTest extends FrontendTestBase {
   }
 
   private TPrivilege[] onUri(String uri, TPrivilegeLevel... levels) {
+    return onUri(false, uri, levels);
+  }
+
+  private TPrivilege[] onUri(boolean grantOption, String uri, TPrivilegeLevel... levels) {
     TPrivilege[] privileges = new TPrivilege[levels.length];
     for (int i = 0; i < levels.length; i++) {
       privileges[i] = new TPrivilege("", levels[i], TPrivilegeScope.URI, false);
       privileges[i].setServer_name(SENTRY_SERVER);
       privileges[i].setUri(uri);
+      privileges[i].setHas_grant_opt(grantOption);
       privileges[i].setPrivilege_name(PrincipalPrivilege.buildPrivilegeName(
           privileges[i]));
     }


[3/7] impala git commit: IMPALA-7457. statestore: allow filtering by key prefix

Posted by mi...@apache.org.
IMPALA-7457. statestore: allow filtering by key prefix

This adds the ability for a statestore subscriber to specify a key
prefix that acts as a filter: only topic entries whose keys match the
specified prefix are transmitted to the subscriber.

This patch makes use of the new feature for a small optimization: the
catalogd subscribes to the catalog topic with the key prefix "!", which
we know doesn't match any actual topic key. This saves the statestore
from reflecting the catalog contents back to the catalogd, which
ignores that information anyway.

A later patch will make use of this to publish lightweight catalog
object version numbers in the same topic as the catalog objects
themselves.

The modification to catalogd's topic subscription is covered by existing
tests. A new specific test is added to verify the filtering mechanism.
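
As a quick illustration (not part of the patch), the sketch below shows
how a subscriber might opt into a filtered subscription. It is modeled
on the Python test harness in tests/statestore/test_statestore.py; the
import paths, the StatestoreSubscriber helper, and the function name
are assumptions for illustration, not a public API.

  # Minimal sketch, assuming the test harness from
  # tests/statestore/test_statestore.py and the generated Thrift types;
  # both import paths below are assumptions.
  import uuid

  from StatestoreService.ttypes import TTopicRegistration
  from tests.statestore.test_statestore import StatestoreSubscriber

  def subscribe_with_prefix(prefix, update_cb):
    """Registers a subscriber that only receives topic entries whose
    keys start with 'prefix'. Updates in which every new entry was
    filtered out still arrive, with an empty entry list and an advanced
    version number, so the subscriber can keep reporting progress."""
    topic_name = "filtered_topic_%s" % uuid.uuid1()
    reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
                             filter_prefix=prefix)
    sub = StatestoreSubscriber(update_cb=update_cb)
    sub.start().register(topics=[reg])
    return sub, topic_name

The catalogd applies the same mechanism with the prefix "!" purely to
suppress the reflected catalog contents, as described above.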

Change-Id: I6ddcf3bfaf16bc3cd1ba01100e948ff142a67620
Reviewed-on: http://gerrit.cloudera.org:8080/11253
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da01f29d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da01f29d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da01f29d

Branch: refs/heads/master
Commit: da01f29d303dca1dbc2be30bc75a72d698a9f4d2
Parents: 0782321
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Aug 16 13:18:28 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 22 16:05:55 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc           |  9 ++++-
 be/src/scheduling/admission-controller.cc  |  4 +-
 be/src/scheduling/scheduler.cc             |  4 +-
 be/src/service/impala-server.cc            | 10 +++--
 be/src/statestore/statestore-subscriber.cc |  8 +++-
 be/src/statestore/statestore-subscriber.h  |  6 ++-
 be/src/statestore/statestore.cc            | 13 +++++--
 be/src/statestore/statestore.h             | 22 +++++++++--
 common/thrift/StatestoreService.thrift     |  6 +++
 tests/statestore/test_statestore.py        | 49 +++++++++++++++++++++++++
 10 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 96646fb..bd754cf 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -204,7 +204,14 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, false, cb);
+  // The catalogd never needs to read any entries from the topic. It only publishes
+  // entries. So, we set a prefix to some random character that we know won't be a
+  // prefix of any key. This saves a bit of network communication from the statestore
+  // back to the catalog.
+  string filter_prefix = "!";
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
+      /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
+      filter_prefix, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3960528..2e1f7d9 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -246,7 +246,9 @@ Status AdmissionController::Init() {
       const StatestoreSubscriber::TopicDeltaMap& state,
       vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); };
   Status status =
-      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, false, cb);
+      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
+          /* is_transient=*/ true, /* populate_min_subscriber_topic_version=*/ false,
+          /* filter_prefix=*/"", cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 20acc43..ae4f049 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -84,7 +84,9 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
     Status status = statestore_subscriber_->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb);
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+        /* populate_min_subscriber_topic_version=*/ false,
+        /* filter_prefix= */"", cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 23a09f5..6b37d49 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -362,15 +362,19 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       this->MembershipCallback(state, topic_updates);
     };
     ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb));
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+        /* populate_min_subscriber_topic_version=*/ false,
+        /* filter_prefix=*/"", cb));
 
-    if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
+    if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
           vector<TTopicDelta>* topic_updates) {
         this->CatalogUpdateCallback(state, topic_updates);
       };
       ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-          CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb));
+          CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
+          /* populate_min_subscriber_topic_version=*/ true, /* filter_prefix=*/ "",
+          catalog_cb));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 443a0e5..c83b520 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -43,6 +43,8 @@ using boost::posix_time::seconds;
 using boost::shared_lock;
 using boost::shared_mutex;
 using boost::try_to_lock;
+using std::string;
+
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
 using namespace strings;
@@ -105,7 +107,7 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
   StatestoreSubscriber* subscriber_;
 };
 
-StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
+StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
     const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
     MetricGroup* metrics)
     : subscriber_id_(subscriber_id),
@@ -139,7 +141,7 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     bool is_transient, bool populate_min_subscriber_topic_version,
-    const UpdateCallback& callback) {
+    string filter_prefix, const UpdateCallback& callback) {
   lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
   TopicRegistration& registration = topic_registrations_[topic_id];
@@ -154,6 +156,7 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
   registration.is_transient = is_transient;
   registration.populate_min_subscriber_topic_version =
       populate_min_subscriber_topic_version;
+  registration.filter_prefix = std::move(filter_prefix);
   return Status::OK();
 }
 
@@ -169,6 +172,7 @@ Status StatestoreSubscriber::Register() {
     thrift_topic.is_transient = registration.second.is_transient;
     thrift_topic.populate_min_subscriber_topic_version =
         registration.second.populate_min_subscriber_topic_version;
+    thrift_topic.__set_filter_prefix(registration.second.filter_prefix);
     request.topic_registrations.push_back(thrift_topic);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 05d4489..016343f 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,7 +112,8 @@ class StatestoreSubscriber {
   /// Must be called before Start(), in which case it will return
   /// Status::OK. Otherwise an error will be returned.
   Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
-      bool populate_min_subscriber_topic_version, const UpdateCallback& callback);
+      bool populate_min_subscriber_topic_version, std::string filter_prefix,
+      const UpdateCallback& callback);
 
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
@@ -220,6 +221,9 @@ class StatestoreSubscriber {
     /// in on updates.
     bool populate_min_subscriber_topic_version = false;
 
+    /// Only subscribe to keys with the provided prefix.
+    string filter_prefix;
+
     /// The last version of the topic this subscriber processed.
     /// -1 if no updates have been processed yet.
     int64_t current_topic_version = -1;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index a208d97..8749825 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -25,6 +25,7 @@
 #include <boost/thread.hpp>
 #include <thrift/Thrift.h>
 #include <gutil/strings/substitute.h>
+#include <gutil/strings/util.h>
 
 #include "common/status.h"
 #include "gen-cpp/StatestoreService_types.h"
@@ -232,7 +233,8 @@ void Statestore::Topic::ClearAllEntries() {
 }
 
 void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
-    TopicEntry::Version last_processed_version, TTopicDelta* delta) {
+    TopicEntry::Version last_processed_version,
+    const string& filter_prefix, TTopicDelta* delta) {
   // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
   // a new subscriber so send them a non-delta update that includes all entries in the
   // topic.
@@ -253,6 +255,9 @@ void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
       if (!delta->is_delta && topic_entry.is_deleted()) {
         continue;
       }
+      // Skip any entries that don't match the requested prefix.
+      if (!HasPrefixString(itr->first, filter_prefix)) continue;
+
       delta->topic_entries.push_back(TTopicItem());
       TTopicItem& delta_entry = delta->topic_entries.back();
       delta_entry.key = itr->first;
@@ -314,7 +319,8 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     GetTopicsMapForId(topic.topic_name)
         ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
             forward_as_tuple(
-                topic.is_transient, topic.populate_min_subscriber_topic_version));
+                topic.is_transient, topic.populate_min_subscriber_topic_version,
+                topic.filter_prefix));
   }
 }
 
@@ -752,7 +758,8 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind upd
       TTopicDelta& topic_delta =
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
-      topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
+      topic_it->second.BuildDelta(subscriber.id(), last_processed_version,
+          subscribed_topic.second.filter_prefix, &topic_delta);
       if (subscribed_topic.second.populate_min_subscriber_topic_version) {
         deltas_needing_min_version.push_back(&topic_delta);
       }

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 1d7f1a2..9326492 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -97,6 +97,13 @@ typedef TUniqueId RegistrationId;
 /// subscriber, rather than all items in the topic.  For non-delta updates, the statestore
 /// will send an update that includes all values in the topic.
 ///
+/// Subscribers may filter the keys within a subscribed topic by an optional prefix. If
+/// a key filter prefix is specified, only entries matching that prefix will be sent to
+/// the subscriber in updates. Note that this may result in empty updates being sent
+/// to subscribers in the case that all updated keys have been excluded by the filter.
+/// These empty updates are important so that subscribers can keep track of the current
+/// version number and report back their progress in receiving the topic contents.
+///
 /// +================+
 /// | Implementation |
 /// +================+
@@ -262,12 +269,14 @@ class Statestore : public CacheLineAligned {
     void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);
 
     /// Build a delta update to send to 'subscriber_id' including the deltas greater
-    /// than 'last_processed_version' (not inclusive).
+    /// than 'last_processed_version' (not inclusive). Only those items whose keys
+    /// start with 'filter_prefix' are included in the update.
     ///
     /// Safe to call concurrently from multiple threads (for different subscribers).
     /// Acquires a shared read lock for the topic.
     void BuildDelta(const SubscriberId& subscriber_id,
-        TopicEntry::Version last_processed_version, TTopicDelta* delta);
+        TopicEntry::Version last_processed_version, const std::string& filter_prefix,
+        TTopicDelta* delta);
 
     /// Adds entries representing the current topic state to 'topic_json'.
     void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);
@@ -335,9 +344,11 @@ class Statestore : public CacheLineAligned {
 
     /// Information about a subscriber's subscription to a specific topic.
     struct TopicSubscription {
-      TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version)
+      TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version,
+          std::string filter_prefix)
         : is_transient(is_transient),
-          populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {}
+          populate_min_subscriber_topic_version(populate_min_subscriber_topic_version),
+          filter_prefix(std::move(filter_prefix)) {}
 
       /// Whether entries written by this subscriber should be considered transient.
       const bool is_transient;
@@ -346,6 +357,9 @@ class Statestore : public CacheLineAligned {
       /// subscription.
       const bool populate_min_subscriber_topic_version;
 
+      /// The prefix for which the subscriber wants to see updates.
+      const std::string filter_prefix;
+
       /// The last topic entry version successfully processed by this subscriber. Only
       /// written by a single thread at a time but can be read concurrently.
       AtomicInt64 last_version{TOPIC_INITIAL_VERSION};

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 783bea7..1c82170 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -145,6 +145,12 @@ struct TTopicRegistration {
   // actually required - computing the version is relatively expensive compared to
   // other aspects of preparing topic updates - see IMPALA-6816.
   3: required bool populate_min_subscriber_topic_version = false;
+
+  // Restrict the items to receive on this subscription to only those items
+  // starting with the given prefix.
+  //
+  // If this is not specified, all items will be subscribed to.
+  4: optional string filter_prefix
 }
 
 struct TRegisterSubscriberRequest {

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 682a306..d45deeb 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -378,6 +378,55 @@ class TestStatestore():
          .wait_for_update(topic_name, 3)
     )
 
+  def test_filter_prefix(self):
+    topic_name = "topic_delta_%s" % uuid.uuid1()
+
+    def topic_update_correct(sub, args):
+      foo_delta = self.make_topic_update(topic_name, num_updates=1)
+      bar_delta = self.make_topic_update(topic_name, num_updates=2, key_template='bar')
+
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
+        # Send some values with both prefixes.
+        return TUpdateStateResponse(status=STATUS_OK,
+                                    topic_updates=[foo_delta, bar_delta],
+                                    skipped=False)
+      elif update_count == 2:
+        # We should only get the 'bar' entries back.
+        assert len(args.topic_deltas) == 1, args.topic_deltas
+        assert args.topic_deltas[topic_name].topic_entries == bar_delta.topic_entries
+        assert args.topic_deltas[topic_name].topic_name == bar_delta.topic_name
+      elif update_count == 3:
+        # Send some more updates that only have 'foo' prefixes.
+        return TUpdateStateResponse(status=STATUS_OK,
+                                    topic_updates=[foo_delta],
+                                    skipped=False)
+      elif update_count == 4:
+        # We shouldn't see any entries from the above update, but we should still see
+        # the version number change due to the new entries in the topic.
+        assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert args.topic_deltas[topic_name].from_version == 3
+        assert args.topic_deltas[topic_name].to_version == 4
+      elif update_count == 5:
+        # After the content-bearing update was processed, the next delta should be empty
+        assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert args.topic_deltas[topic_name].from_version == 4
+        assert args.topic_deltas[topic_name].to_version == 4
+
+      return DEFAULT_UPDATE_STATE_RESPONSE
+
+    sub = StatestoreSubscriber(update_cb=topic_update_correct)
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
+                             filter_prefix="bar")
+    (
+      sub.start()
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 5)
+    )
+
   def test_update_is_delta(self):
     """Test that the 'is_delta' flag is correctly set. The first update for a topic should
     always not be a delta, and so should all subsequent updates until the subscriber says


[5/7] impala git commit: IMPALA-6709: Simplify tests that copy local files to tables

Posted by mi...@apache.org.
IMPALA-6709: Simplify tests that copy local files to tables

We had quite a few tests that created a table and used
"hdfs dfs -copyFromLocal" to copy data files to the
warehouse directory for this table.

This operation needs some boilerplate code, which I refactored into
the new helper functions create_table_from_parquet() and
create_table_and_copy_files().
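
For example, a test that previously spelled out the create-table /
copyFromLocal / load boilerplate by hand can now be reduced to calls
like the sketch below; the enclosing test method is illustrative,
while the helper calls mirror the converted tests in this patch.

  # Sketch using the new helpers from tests/common/file_utils.py. The
  # test method itself is illustrative; the helper signatures come
  # from the diff below.
  from tests.common.file_utils import (
    create_table_from_parquet, create_table_and_copy_files)

  def test_example(self, vector, unique_database):
    # Creates <unique_database>.huge_num_rows with a schema inferred
    # from testdata/data/huge_num_rows.parquet and loads that file.
    create_table_from_parquet(self.client, unique_database, 'huge_num_rows')

    # Creates the table from an explicit DDL template, then loads
    # several local files into it.
    create_table_and_copy_files(
      self.client, 'create table {db}.{tbl} (c bigint) stored as parquet',
      unique_database, 'bad_rle_counts',
      ['testdata/data/bad_rle_literal_count.parquet',
       'testdata/data/bad_rle_repeat_count.parquet'])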

Change-Id: Ie00a4561825facf8abe2e8e74a6b6e93194f416f
Reviewed-on: http://gerrit.cloudera.org:8080/11127
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e27954a5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e27954a5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e27954a5

Branch: refs/heads/master
Commit: e27954a5aa585db23fe3c97726aa89305efa306d
Parents: da01f29
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Thu Aug 2 15:13:04 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 18:08:20 2018 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/parquet-def-levels.test   |   8 +-
 tests/common/file_utils.py                      |  58 ++++++
 tests/query_test/test_parquet_stats.py          |  32 ++--
 tests/query_test/test_scanners.py               | 178 +++++--------------
 4 files changed, 115 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
index e55fc4d..0145fca 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
@@ -54,14 +54,14 @@ INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT
 ---- QUERY
 # IMPALA-6077: unsupported BIT_PACKED encoding fails when materializing columns.
 select id
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====
 ---- QUERY
 # IMPALA-6077: do not need to decode BIT_PACKED encoding when not materializing columns.
 select count(*)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- RESULTS
 11000
 ---- TYPES
@@ -72,7 +72,7 @@ BIGINT
 # case it should either work or fail gracefully. For now it still requires materialising
 # levels.
 select count(id)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====
@@ -81,7 +81,7 @@ deprecated BIT_PACKED encoding for rep or def levels.
 # case it should either work or fail gracefully. For now it still requires materialising
 # levels.
 select min(int_col)
-from alltypesagg_bitpacked
+from alltypes_agg_bitpacked_def_levels
 ---- CATCH
 deprecated BIT_PACKED encoding for rep or def levels.
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/common/file_utils.py
----------------------------------------------------------------------
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
new file mode 100644
index 0000000..d3fa61c
--- /dev/null
+++ b/tests/common/file_utils.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module contains utility functions for testing Parquet files
+
+import os
+from subprocess import check_call
+
+from tests.util.filesystem_utils import get_fs_path
+
+
+def create_table_from_parquet(impala_client, unique_database, table_name):
+  """Utility function to create a database table from a Parquet file. A Parquet file must
+  exist in $IMPALA_HOME/testdata/data with the name 'table_name'.parquet"""
+  filename = '{0}.parquet'.format(table_name)
+  local_file = os.path.join(os.environ['IMPALA_HOME'],
+                            'testdata/data/{0}'.format(filename))
+  assert os.path.isfile(local_file)
+  hdfs_file = get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database, filename))
+  check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
+
+  qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
+  impala_client.execute('create table {0} like parquet "{1}" stored as parquet'.format(
+    qualified_table_name, hdfs_file))
+  impala_client.execute('load data inpath "{0}" into table {1}'.format(
+    hdfs_file, qualified_table_name))
+
+
+def create_table_and_copy_files(impala_client, create_stmt, unique_database, table_name,
+                                files):
+  create_stmt = create_stmt.format(db=unique_database, tbl=table_name)
+  impala_client.execute(create_stmt)
+  for local_file in files:
+    # Cut off leading '/' to make os.path.join() happy
+    local_file = local_file if local_file[0] != '/' else local_file[1:]
+    local_file = os.path.join(os.environ['IMPALA_HOME'], local_file)
+    assert os.path.isfile(local_file)
+    basename = os.path.basename(local_file)
+    hdfs_file = get_fs_path('/test-warehouse/{0}.db/{1}'.format(unique_database,
+                                                                basename))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
+    qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
+    impala_client.execute('load data inpath "{0}" into table {1}'.format(
+      hdfs_file, qualified_table_name))

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 3f8cd2f..cb35653 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -20,6 +20,8 @@ import pytest
 import shlex
 from subprocess import check_call
 
+from tests.common.file_utils import (
+  create_table_from_parquet, create_table_and_copy_files)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.util.filesystem_utils import get_fs_path
@@ -52,19 +54,14 @@ class TestParquetStats(ImpalaTestSuite):
     """Test that reading parquet files with statistics with deprecated 'min'/'max' fields
     works correctly. The statistics will be used for known-good types (boolean, integral,
     float) and will be ignored for all other types (string, decimal, timestamp)."""
-    table_name = 'deprecated_stats'
+
     # We use CTAS instead of "create table like" to convert the partition columns into
     # normal table columns.
-    self.client.execute('create table %s.%s stored as parquet as select * from '
-                        'functional.alltypessmall limit 0' %
-                        (unique_database, table_name))
-    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
-                                 (unique_database, table_name))
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/deprecated_statistics.parquet')
-    assert os.path.isfile(local_file)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
-    self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
+    create_table_and_copy_files(self.client, 'create table {db}.{tbl} stored as parquet '
+                                             'as select * from functional.alltypessmall '
+                                             'limit 0',
+                                unique_database, 'deprecated_stats',
+                                ['testdata/data/deprecated_statistics.parquet'])
     # The test makes assumptions about the number of row groups that are processed and
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
     vector.get_value('exec_option')['num_nodes'] = 1
@@ -74,14 +71,5 @@ class TestParquetStats(ImpalaTestSuite):
     """IMPALA-6538" Test that reading parquet files with statistics with invalid
     'min_value'/'max_value' fields works correctly. 'min_value' and 'max_value' are both
     NaNs, therefore we need to ignore them"""
-    table_name = 'min_max_is_nan'
-    self.client.execute('create table %s.%s (val double) stored as parquet' %
-                       (unique_database, table_name))
-    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
-                                 (unique_database, table_name))
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/min_max_is_nan.parquet')
-    assert os.path.isfile(local_file)
-    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
-    self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
-    self.run_test_case('QueryTest/parquet-invalid-minmax-stats', vector, unique_database)
\ No newline at end of file
+    create_table_from_parquet(self.client, unique_database, 'min_max_is_nan')
+    self.run_test_case('QueryTest/parquet-invalid-minmax-stats', vector, unique_database)

http://git-wip-us.apache.org/repos/asf/impala/blob/e27954a5/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 2e61d20..1cd883e 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -44,6 +44,9 @@ from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_exec_option_dimension,
     create_uncompressed_text_dimension)
+from tests.common.file_utils import (
+    create_table_from_parquet,
+    create_table_and_copy_files)
 from tests.common.test_result_verifier import (
     parse_column_types,
     parse_column_labels,
@@ -290,21 +293,6 @@ class TestParquet(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format == 'parquet')
 
-  def _create_table_from_file(self, table_name, unique_database):
-    filename = '%s.parquet' % table_name
-    local_file = os.path.join(os.environ['IMPALA_HOME'],
-                              'testdata/data/%s' % filename)
-    assert os.path.isfile(local_file)
-    hdfs_file = get_fs_path('/test-warehouse/{0}.db/{1}'.format(
-        unique_database, filename))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', '-f', local_file, hdfs_file])
-
-    qualified_table_name = '%s.%s' % (unique_database, table_name)
-    self.client.execute('create table %s like parquet "%s" stored as parquet' %
-                        (qualified_table_name, hdfs_file))
-    self.client.execute('load data inpath "%s" into table %s' %
-                        (hdfs_file, qualified_table_name))
-
   def test_parquet(self, vector):
     self.run_test_case('QueryTest/parquet', vector)
 
@@ -316,13 +304,7 @@ class TestParquet(ImpalaTestSuite):
 
   def test_timestamp_out_of_range(self, vector, unique_database):
     """IMPALA-4363: Test scanning parquet files with an out of range timestamp."""
-    self.client.execute(("create table {0}.out_of_range_timestamp (ts timestamp) "
-        "stored as parquet").format(unique_database))
-    out_of_range_timestamp_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, "out_of_range_timestamp"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/out_of_range_timestamp.parquet",
-        out_of_range_timestamp_loc])
+    create_table_from_parquet(self.client, unique_database, "out_of_range_timestamp")
 
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/out-of-range-timestamp-continue-on-error',
@@ -335,21 +317,9 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-3943: Tests that scanning files with num_rows=0 in the file footer
     succeeds without errors."""
     # Create test table with a file that has 0 rows and 0 row groups.
-    self.client.execute("create table %s.zero_rows_zero_row_groups (c int) "
-        "stored as parquet" % unique_database)
-    zero_rows_zero_row_groups_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "zero_rows_zero_row_groups"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/zero_rows_zero_row_groups.parquet",
-        zero_rows_zero_row_groups_loc])
+    create_table_from_parquet(self.client, unique_database, "zero_rows_zero_row_groups")
     # Create test table with a file that has 0 rows and 1 row group.
-    self.client.execute("create table %s.zero_rows_one_row_group (c int) "
-        "stored as parquet" % unique_database)
-    zero_rows_one_row_group_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "zero_rows_one_row_group"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/zero_rows_one_row_group.parquet",
-        zero_rows_one_row_group_loc])
+    create_table_from_parquet(self.client, unique_database, "zero_rows_one_row_group")
 
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-zero-rows', vector, unique_database)
@@ -359,13 +329,7 @@ class TestParquet(ImpalaTestSuite):
   def test_repeated_root_schema(self, vector, unique_database):
     """IMPALA-4826: Tests that running a scan on a schema where the root schema's
       repetition level is set to REPEATED succeeds without errors."""
-    self.client.execute("create table %s.repeated_root_schema (i int) "
-        "stored as parquet" % unique_database)
-    repeated_root_schema_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "repeated_root_schema"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/repeated_root_schema.parquet",
-        repeated_root_schema_loc])
+    create_table_from_parquet(self.client, unique_database, "repeated_root_schema")
 
     result = self.client.execute("select * from %s.repeated_root_schema" % unique_database)
     assert len(result.data) == 300
@@ -373,13 +337,7 @@ class TestParquet(ImpalaTestSuite):
   def test_huge_num_rows(self, vector, unique_database):
     """IMPALA-5021: Tests that a zero-slot scan on a file with a huge num_rows in the
     footer succeeds without errors."""
-    self.client.execute("create table %s.huge_num_rows (i int) stored as parquet"
-      % unique_database)
-    huge_num_rows_loc = get_fs_path(
-        "/test-warehouse/%s.db/%s" % (unique_database, "huge_num_rows"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/huge_num_rows.parquet",
-        huge_num_rows_loc])
+    create_table_from_parquet(self.client, unique_database, "huge_num_rows")
     result = self.client.execute("select count(*) from %s.huge_num_rows"
       % unique_database)
     assert len(result.data) == 1
@@ -407,16 +365,12 @@ class TestParquet(ImpalaTestSuite):
     check_call(['hive', '-e', hql_format.format(codec="snappy", year=2010, month=1)])
     check_call(['hive', '-e', hql_format.format(codec="gzip", year=2010, month=2)])
 
-    self.client.execute("create table %s.multi_compression (a string, b string)"
-        " stored as parquet" % unique_database)
-    multi_compression_tbl_loc =\
-        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, "multi_compression"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
-        multi_compression_tbl_loc])
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq",
-        multi_compression_tbl_loc])
+    test_files = ["testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
+                  "testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq"]
+    create_table_and_copy_files(self.client, "create table {db}.{tbl} "
+                                             "(a string, b string) stored as parquet",
+                                unique_database, "multi_compression",
+                                test_files)
 
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/hdfs_parquet_scan_node_profile',
@@ -429,16 +383,11 @@ class TestParquet(ImpalaTestSuite):
     - incorrect repeat count of 0 for the RLE encoded dictionary indexes
     """
     # Create test table and copy the corrupt files into it.
-    self.client.execute(
-        "create table %s.bad_rle_counts (c bigint) stored as parquet" % unique_database)
-    bad_rle_counts_tbl_loc =\
-        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, "bad_rle_counts"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/bad_rle_literal_count.parquet",
-        bad_rle_counts_tbl_loc])
-    check_call(['hdfs', 'dfs', '-copyFromLocal',
-        os.environ['IMPALA_HOME'] + "/testdata/data/bad_rle_repeat_count.parquet",
-        bad_rle_counts_tbl_loc])
+    test_files = ["testdata/data/bad_rle_literal_count.parquet",
+                  "testdata/data/bad_rle_repeat_count.parquet"]
+    create_table_and_copy_files(self.client,
+                                "create table {db}.{tbl} (c bigint) stored as parquet",
+                                unique_database, "bad_rle_counts", test_files)
     # Querying the corrupted files should not DCHECK or crash.
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-corrupt-rle-counts', vector, unique_database)
@@ -449,53 +398,34 @@ class TestParquet(ImpalaTestSuite):
   def test_bad_compressed_page_size(self, vector, unique_database):
     """IMPALA-6353: Tests that a parquet dict page with 0 compressed_page_size is
     gracefully handled. """
-    self.client.execute(
-        "create table %s.bad_compressed_dict_page_size (col string) stored as parquet"
-        % unique_database)
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "bad_compressed_dict_page_size"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/bad_compressed_dict_page_size.parquet", tbl_loc])
+    create_table_from_parquet(self.client, unique_database,
+                              "bad_compressed_dict_page_size")
     self.run_test_case('QueryTest/parquet-bad-compressed-dict-page-size', vector,
         unique_database)
 
   def test_def_levels(self, vector, unique_database):
     """Test that Impala behaves as expected when decoding def levels with different
        encodings - RLE, BIT_PACKED, etc."""
-    self.client.execute(("""CREATE TABLE {0}.alltypesagg_bitpacked (
-          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
-          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
-          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
-          year INT, month INT, day INT) STORED AS PARQUET""").format(unique_database))
-    alltypesagg_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, "alltypesagg_bitpacked"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", alltypesagg_loc])
-    self.client.execute("refresh {0}.alltypesagg_bitpacked".format(unique_database));
-
+    create_table_from_parquet(self.client, unique_database,
+                              "alltypes_agg_bitpacked_def_levels")
     self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
 
   def test_bad_compression_codec(self, vector, unique_database):
     """IMPALA-6593: test the bad compression codec is handled gracefully. """
-    self.client.execute(("""CREATE TABLE {0}.bad_codec (
+    test_files = ["testdata/data/bad_codec.parquet"]
+    create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (
           id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
           int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
           date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
-          year INT, month INT) STORED AS PARQUET""").format(unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "bad_codec"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/bad_codec.parquet", tbl_loc])
+          year INT, month INT) STORED AS PARQUET""",
+                                unique_database, "bad_codec",
+                                test_files)
     self.run_test_case('QueryTest/parquet-bad-codec', vector, unique_database)
 
   def test_num_values_def_levels_mismatch(self, vector, unique_database):
     """IMPALA-6589: test the bad num_values handled correctly. """
-    self.client.execute(("""CREATE TABLE {0}.num_values_def_levels_mismatch (_c0 BOOLEAN)
-        STORED AS PARQUET""").format(unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-        "num_values_def_levels_mismatch"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/num_values_def_levels_mismatch.parquet", tbl_loc])
+    create_table_from_parquet(self.client, unique_database,
+                              "num_values_def_levels_mismatch")
     self.run_test_case('QueryTest/parquet-num-values-def-levels-mismatch',
         vector, unique_database)
 
@@ -706,33 +636,20 @@ class TestParquet(ImpalaTestSuite):
   def test_decimal_encodings(self, vector, unique_database):
     # Create a table using an existing data file with dictionary-encoded, variable-length
     # physical encodings for decimals.
-    TABLE_NAME = "decimal_encodings"
-    self.client.execute('''create table if not exists %s.%s
-    (small_dec decimal(9,2), med_dec decimal(18,2), large_dec decimal(38,2))
-    STORED AS PARQUET''' % (unique_database, TABLE_NAME))
-
-    table_loc = get_fs_path(
-      "/test-warehouse/%s.db/%s" % (unique_database, TABLE_NAME))
-    for file_name in ["binary_decimal_dictionary.parquet",
-                      "binary_decimal_no_dictionary.parquet"]:
-      data_file_path = os.path.join(os.environ['IMPALA_HOME'],
-                                    "testdata/data/", file_name)
-      check_call(['hdfs', 'dfs', '-copyFromLocal', data_file_path, table_loc])
-
-    self._create_table_from_file('decimal_stored_as_int32', unique_database)
-    self._create_table_from_file('decimal_stored_as_int64', unique_database)
+    test_files = ["testdata/data/binary_decimal_dictionary.parquet",
+                  "testdata/data/binary_decimal_no_dictionary.parquet"]
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(9,2), med_dec decimal(18,2), large_dec decimal(38,2))
+         STORED AS PARQUET""", unique_database, "decimal_encodings", test_files)
+
+    create_table_from_parquet(self.client, unique_database, 'decimal_stored_as_int32')
+    create_table_from_parquet(self.client, unique_database, 'decimal_stored_as_int64')
 
     self.run_test_case('QueryTest/parquet-decimal-formats', vector, unique_database)
 
   def test_rle_encoded_bools(self, vector, unique_database):
     """IMPALA-6324: Test that Impala decodes RLE encoded booleans correctly."""
-    self.client.execute(("""CREATE TABLE {0}.rle_encoded_bool (b boolean, i int)
-        STORED AS PARQUET""").format(unique_database))
-    table_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, "rle_encoded_bool"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/rle_encoded_bool.parquet", table_loc])
-
+    create_table_from_parquet(self.client, unique_database, "rle_encoded_bool")
     self.run_test_case(
         'QueryTest/parquet-rle-encoded-bool', vector, unique_database)
 
@@ -741,13 +658,7 @@ class TestParquet(ImpalaTestSuite):
        dictionary index bit width is larger than the encoded byte's bit width.
     """
     TABLE_NAME = "dict_encoding_with_large_bit_width"
-    self.client.execute("CREATE TABLE {0}.{1} (i tinyint) STORED AS PARQUET".format(
-        unique_database, TABLE_NAME))
-    table_loc = get_fs_path(
-        "/test-warehouse/{0}.db/{1}".format(unique_database, TABLE_NAME))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-        "/testdata/data/{0}.parquet".format(TABLE_NAME), table_loc])
-
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
     result = self.execute_query(
         "select * from {0}.{1}".format(unique_database, TABLE_NAME))
     assert(len(result.data) == 33)
@@ -1002,12 +913,9 @@ class TestUncompressedText(ImpalaTestSuite):
 
   # IMPALA-5315: Test support for date/time in unpadded format
   def test_scan_lazy_timestamp(self, vector, unique_database):
-    self.client.execute(("""CREATE TABLE {0}.lazy_ts (ts TIMESTAMP)""").format
-          (unique_database))
-    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
-          "lazy_ts"))
-    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
-          "/testdata/data/lazy_timestamp.csv", tbl_loc])
+    test_files = ["testdata/data/lazy_timestamp.csv"]
+    create_table_and_copy_files(self.client, """CREATE TABLE {db}.{tbl} (ts TIMESTAMP)""",
+                                unique_database, "lazy_ts", test_files)
     self.run_test_case('QueryTest/select-lazy-timestamp', vector, unique_database)
 
 class TestOrc(ImpalaTestSuite):
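
A note on the helpers used throughout the refactor above: create_table_from_parquet()
and create_table_and_copy_files() live in the suite's shared test utilities and are not
shown in this diff. Below is a minimal sketch of create_table_and_copy_files(),
reconstructed from the inline code it replaces; create_table_from_parquet() is
presumably a thin wrapper over it for the single-file case (deriving the table schema
from the Parquet file itself). The names and call signatures come from the calls above;
everything else, including the get_fs_path import path, is an assumption.

import os
from subprocess import check_call
from tests.util.filesystem_utils import get_fs_path  # assumed import path

def create_table_and_copy_files(client, create_stmt, db, tbl, files):
  # Create the table, then copy each local data file into the table's
  # warehouse directory on HDFS, just like the removed inline code did.
  client.execute(create_stmt.format(db=db, tbl=tbl))
  tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
  for f in files:
    local_path = os.path.join(os.environ['IMPALA_HOME'], f)
    check_call(['hdfs', 'dfs', '-copyFromLocal', local_path, tbl_loc])

# e.g., mirroring test_scan_lazy_timestamp above:
# create_table_and_copy_files(client, "CREATE TABLE {db}.{tbl} (ts TIMESTAMP)",
#                             "some_db", "lazy_ts", ["testdata/data/lazy_timestamp.csv"])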


[4/7] impala git commit: IMPALA-7447. Evict LocalCatalog cache entries based on size

Posted by mi...@apache.org.
IMPALA-7447. Evict LocalCatalog cache entries based on size

This pulls in the 'sizeof' library from ehcache (Apache-licensed) and
uses it to implement size-based eviction of cache entries in
LocalCatalog.

This is difficult to test without the test being quite fragile to small
changes in the cached structures. However, I added a simple unit test as
a general sanity check that it computes a reasonable result.
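
For readers unfamiliar with weigher-based caches: the Guava cache below is
bounded by maximumWeight(), with each entry weighed via ehcache's SizeOf, and
entries also expire after a period without access. A rough Python analogue of
the size-based part of that policy is sketched here (illustration only;
Guava's real eviction is approximately-LRU per cache segment, and the actual
implementation is the Java code in CatalogdMetaProvider further down):

from collections import OrderedDict

class WeightBoundedCache:
  """LRU cache that evicts by cumulative entry weight, not entry count."""

  def __init__(self, max_weight):
    self.max_weight = max_weight
    self.total_weight = 0
    self.entries = OrderedDict()  # key -> (value, weight), oldest first

  def put(self, key, value, weight):
    # 'weight' stands in for SizeOf.deepSizeOf(key, value) plus the
    # per-entry overhead that the Java weigher adds.
    old = self.entries.pop(key, None)
    if old is not None:
      self.total_weight -= old[1]
    self.entries[key] = (value, weight)
    self.total_weight += weight
    while self.total_weight > self.max_weight and len(self.entries) > 1:
      _, (_, evicted_weight) = self.entries.popitem(last=False)  # drop LRU
      self.total_weight -= evicted_weight

  def get(self, key):
    entry = self.entries.get(key)
    if entry is None:
      return None
    self.entries.move_to_end(key)  # mark as most recently used
    return entry[0]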

Change-Id: Ia96af49b35c17e505b7b6785e78d140939085d91
Reviewed-on: http://gerrit.cloudera.org:8080/11231
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/07823211
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/07823211
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/07823211

Branch: refs/heads/master
Commit: 078232112469c238d5332453627eb37dcb10eb97
Parents: c0c3de2
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Aug 14 01:42:43 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 22 16:05:55 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/exec-env.cc                      | 15 ++++-
 be/src/util/backend-gflag-util.cc               |  5 ++
 common/thrift/BackendGflags.thrift              |  4 ++
 fe/pom.xml                                      |  6 ++
 .../catalog/local/CatalogdMetaProvider.java     | 66 +++++++++++++++++---
 .../impala/catalog/local/LocalCatalog.java      |  4 +-
 .../catalog/local/CatalogdMetaProviderTest.java | 25 +++++++-
 7 files changed, 113 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 319e948..924073a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -81,8 +81,19 @@ DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 
 DEFINE_bool_hidden(use_local_catalog, false,
-  "Use experimental implementation of a local catalog. If this is set, "
-  "the catalog service is not used and does not need to be started.");
+    "Use experimental implementation of a local catalog. If this is set, "
+    "the catalog service is not used and does not need to be started.");
+DEFINE_int32_hidden(local_catalog_cache_mb, -1,
+    "If --use_local_catalog is enabled, configures the size of the catalog "
+    "cache within each impalad. If this is set to -1, the cache is auto-"
+    "configured to 60% of the configured Java heap size. Note that the Java "
+    "heap size is distinct from and typically smaller than the overall "
+    "Impala memory limit.");
+DEFINE_int32_hidden(local_catalog_cache_expiration_s, 60 * 60,
+    "If --use_local_catalog is enabled, configures the expiration time "
+    "of the catalog cache within each impalad. Even if the configured "
+    "cache capacity has not been reached, items are removed from the cache "
+    "if they have not been accessed in this amount of time.");
 
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c7982bf..051d396 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -29,6 +29,8 @@ DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
 DECLARE_bool(enable_orc_scanner);
 DECLARE_bool(use_local_catalog);
+DECLARE_int32(local_catalog_cache_expiration_s);
+DECLARE_int32(local_catalog_cache_mb);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(max_hdfs_partitions_parallel_load);
@@ -63,6 +65,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
   cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner);
   cfg.__set_use_local_catalog(FLAGS_use_local_catalog);
+  cfg.__set_local_catalog_cache_mb(FLAGS_local_catalog_cache_mb);
+  cfg.__set_local_catalog_cache_expiration_s(
+    FLAGS_local_catalog_cache_expiration_s);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 6852d27..7ba6c7e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -81,4 +81,8 @@ struct TBackendGflags {
   27: required bool use_local_catalog
 
   28: required bool disable_catalog_data_ops_debug_only
+
+  29: required i32 local_catalog_cache_mb
+
+  30: required i32 local_catalog_cache_expiration_s
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index c695a5b..896dce5 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -467,6 +467,12 @@ under the License.
     </dependency>
 
     <dependency>
+        <groupId>org.ehcache</groupId>
+        <artifactId>sizeof</artifactId>
+        <version>0.3.0</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.googlecode.json-simple</groupId>
       <artifactId>json-simple</artifactId>
       <version>1.1.1</version>

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 4d2aa6b..5c1d820 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.local;
 
+import java.lang.management.ManagementFactory;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -55,15 +57,18 @@ import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.ehcache.sizeof.SizeOf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheStats;
+import com.google.common.cache.Weigher;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -121,13 +126,34 @@ public class CatalogdMetaProvider implements MetaProvider {
   // to the "direct" provider for now and circumvent catalogd.
   private DirectMetaProvider directProvider_ = new DirectMetaProvider();
 
-  // TODO(todd): hard-coded TTL is not the final solution here. We should implement
-  // memory estimation for all cached objects, and evict based on a configurable
-  // memory pressure.
-  final Cache<Object,Object> cache_ = CacheBuilder.newBuilder()
-      .expireAfterAccess(1, TimeUnit.HOURS)
-      .recordStats()
-      .build();
+
+  final Cache<Object,Object> cache_;
+
+  public CatalogdMetaProvider(TBackendGflags flags) {
+    Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s());
+    Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb());
+
+    long cacheSizeBytes;
+    if (flags.local_catalog_cache_mb < 0) {
+      long maxHeapBytes = ManagementFactory.getMemoryMXBean()
+          .getHeapMemoryUsage().getMax();
+      cacheSizeBytes = (long)(maxHeapBytes * 0.6);
+    } else {
+      cacheSizeBytes = flags.local_catalog_cache_mb * 1024 * 1024;
+    }
+    int expirationSecs = flags.local_catalog_cache_expiration_s;
+    LOG.info("Metadata cache configuration: capacity={} MB, expiration={} sec",
+        cacheSizeBytes/1024/1024, expirationSecs);
+
+    // TODO(todd) add end-to-end test cases which stress cache eviction (both time
+    // and size-triggered) and make sure results are still correct.
+    cache_ = CacheBuilder.newBuilder()
+        .maximumWeight(cacheSizeBytes)
+        .expireAfterAccess(expirationSecs, TimeUnit.SECONDS)
+        .weigher(new SizeOfWeigher())
+        .recordStats()
+        .build();
+  }
 
   public CacheStats getCacheStats() {
     return cache_.stats();
@@ -729,4 +755,30 @@ public class CatalogdMetaProvider implements MetaProvider {
       return super.equals(obj) && partId_ == other.partId_;
     }
   }
+
+  @VisibleForTesting
+  static class SizeOfWeigher implements Weigher<Object, Object> {
+    // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc.
+    private static final boolean BYPASS_FLYWEIGHT = true;
+    // Cache the reflected sizes of classes seen.
+    private static final boolean CACHE_SIZES = true;
+
+    private static SizeOf SIZEOF = SizeOf.newInstance(BYPASS_FLYWEIGHT, CACHE_SIZES);
+
+    private static final int BYTES_PER_WORD = 8; // Assume 64-bit VM.
+    // Guava cache overhead based on:
+    // http://code-o-matic.blogspot.com/2012/02/updated-memory-cost-per-javaguava.html
+    private static final int OVERHEAD_PER_ENTRY =
+        12 * BYTES_PER_WORD + // base cost per entry
+        4 * BYTES_PER_WORD;  // for use of 'maximumSize()'
+
+    @Override
+    public int weigh(Object key, Object value) {
+      long size = SIZEOF.deepSizeOf(key, value) + OVERHEAD_PER_ENTRY;
+      if (size > Integer.MAX_VALUE) {
+        return Integer.MAX_VALUE;
+      }
+      return (int)size;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 89f5345..c3c918e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -40,6 +40,7 @@ import org.apache.impala.catalog.Function.CompareMode;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TUniqueId;
@@ -71,7 +72,8 @@ public class LocalCatalog implements FeCatalog {
   private String nullPartitionKeyValue_;
   private final String defaultKuduMasterHosts_;
 
-  private static MetaProvider PROVIDER = new CatalogdMetaProvider();
+  private static MetaProvider PROVIDER = new CatalogdMetaProvider(
+      BackendConfig.INSTANCE.getBackendCfg());
 
   public static LocalCatalog create(String defaultKuduMasterHosts) {
     return new LocalCatalog(PROVIDER, defaultKuduMasterHosts);

http://git-wip-us.apache.org/repos/asf/impala/blob/07823211/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index f7c3abd..d5da6ad 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -17,18 +17,20 @@
 
 package org.apache.impala.catalog.local;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.local.CatalogdMetaProvider.SizeOfWeigher;
 import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
 import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
 import org.junit.Test;
@@ -53,7 +55,11 @@ public class CatalogdMetaProviderTest {
   }
 
   public CatalogdMetaProviderTest() throws Exception {
-    provider_ = new CatalogdMetaProvider();
+    // Set sufficient expiration/capacity for the test to not evict.
+    TBackendGflags flags = new TBackendGflags();
+    flags.setLocal_catalog_cache_expiration_s(3600);
+    flags.setLocal_catalog_cache_mb(100);
+    provider_ = new CatalogdMetaProvider(flags);
     Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", "alltypes");
     tableRef_ = tablePair.second;
     prevStats_ = provider_.getCacheStats();
@@ -138,4 +144,19 @@ public class CatalogdMetaProviderTest {
     assertEquals(2, stats.hitCount());
     assertEquals(0, stats.missCount());
   }
+
+  @Test
+  public void testWeights() throws Exception {
+    List<PartitionRef> refs = provider_.loadPartitionList(tableRef_);
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    provider_.loadPartitionsByRefs(tableRef_, /* ignored */null, hostIndex , refs);
+
+    // Unfortunately Guava doesn't provide a statistic on the total weight of cached
+    // elements. So, we'll just instantiate the weigher directly and sanity check
+    // the size loosely.
+    SizeOfWeigher weigher = new SizeOfWeigher();
+    assertTrue(weigher.weigh(refs, null) > 3000);
+    assertTrue(weigher.weigh(refs, null) < 4000);
+  }
+
 }
\ No newline at end of file


[6/7] impala git commit: IMPALA-7412: width_bucket() function overflows too easily

Posted by mi...@apache.org.
IMPALA-7412: width_bucket() function overflows too easily

While running the tests of https://gerrit.cloudera.org/#/c/10859/,
it turned out that the width_bucket() function overflows
very often.

A common problem is that the function tries to cast the
'num_buckets' parameter to the decimal type determined by the
Frontend. When the Frontend determined the precision and
scale of this decimal, it considered only the decimal
arguments and ignored everything else, so the resulting
precision and scale are often not suitable for the
'num_buckets' parameter.

WidthBucketImpl() has three decimal arguments, all of them
have the same byte size, precision, and scale. So it is
possible to interpret them as plain integers and still
calculate the proper bucket.

I included the Python test cases from IMPALA-7202, developed
by Taras Bobrovytsky.
I also extended the backend tests with new test cases.

For performance test I used the following query:

SELECT sum(width_bucket(cast(l_orderkey AS DECIMAL(30, 10)),
           0, 5500000, 1000000))
FROM tpch_parquet.lineitem;

The new implementation executed it in ~0.3 seconds.
The old implementation executed it in ~0.8 seconds.
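
The key observation, restated with concrete numbers (a sketch in Python, not
the committed C++ in math-functions-ir.cc below): because 'expr', 'min_range'
and 'max_range' share one scale, the common 10^-scale factor cancels out of
dist_from_min / range_size, so the unscaled integer representations yield the
same bucket as exact decimal arithmetic would:

def width_bucket_unscaled(expr, min_range, max_range, num_buckets):
  # All three range arguments are unscaled decimal representations
  # sharing the same scale.
  if expr < min_range:
    return 0
  if expr >= max_range:
    return num_buckets + 1
  range_size = max_range - min_range    # may need a wider type in C++
  dist_from_min = expr - min_range
  return dist_from_min * num_buckets // range_size + 1

# DECIMAL(10,2) values 1.00, 0.00 and 5.50 have unscaled forms 100, 0, 550;
# with 11 buckets of width 0.50, the value 1.00 falls into bucket 3.
assert width_bucket_unscaled(100, 0, 550, 11) == 3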

Change-Id: I68262698144029ef7f54e027e586eaf105f36ab3
Reviewed-on: http://gerrit.cloudera.org:8080/11282
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/632e0e2e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/632e0e2e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/632e0e2e

Branch: refs/heads/master
Commit: 632e0e2e36463fc8dce7a50ddf2defe8dae25def
Parents: e27954a
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Wed Aug 8 17:17:37 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 22 18:26:23 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc             |  51 +++++----
 be/src/exprs/math-functions-ir.cc     | 159 +++++++++++------------------
 be/src/util/bit-util.h                |  53 +++++++++-
 tests/query_test/test_decimal_fuzz.py |  63 +++++++++++-
 4 files changed, 201 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/632e0e2e/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index d0e3ff3..49d0ae2 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5348,16 +5348,11 @@ TEST_F(ExprTest, MathFunctions) {
   // Test when min > max
   TestErrorString("width_bucket(22, 50, 5, 4)",
       "UDF ERROR: Lower bound cannot be greater than or equal to the upper bound\n");
-  // Test max - min will overflow during width_bucket evaluation
-  TestErrorString("width_bucket(11, -9, 99999999999999999999999999999999999999, 4000)",
-      "UDF ERROR: Overflow while evaluating the difference between min_range: -9 and "
-      "max_range: 99999999999999999999999999999999999999\n");
-  // If expr - min overflows during width_bucket evaluation, max - min will also
-  // overflow. Since we evaluate max - min before evaluating expr - min, we will never
-  // end up overflowing expr - min.
-  TestErrorString("width_bucket(1, -99999999999999999999999999999999999999, 9, 40)",
-      "UDF ERROR: Overflow while evaluating the difference between min_range: "
-      "-99999999999999999999999999999999999999 and max_range: 9\n");
+  // IMPALA-7412: Test max - min should not overflow anymore
+  TestValue("width_bucket(11, -9, 99999999999999999999999999999999999999, 4000)",
+      TYPE_BIGINT, 1);
+  TestValue("width_bucket(1, -99999999999999999999999999999999999999, 9, 40)",
+      TYPE_BIGINT, 40);
   // Test when dist_from_min * buckets cannot be stored in a int128_t (overflows)
   // and needs to be stored in a int256_t
   TestValue("width_bucket(8000000000000000000000000000000000000,"
@@ -5380,16 +5375,34 @@ TEST_F(ExprTest, MathFunctions) {
   // max and min value that would require int256_t for evaluation
   TestValue("width_bucket(10000000000000000000000000000000000000, 1,"
             "99999999999999999999999999999999999999, 15)", TYPE_BIGINT, 2);
-  // IMPALA-7242/IMPALA-7243: check for overflow when converting IntVal to DecimalValue
-  TestErrorString("width_bucket(cast(-0.10 as decimal(37,30)), cast(-0.36028797018963968 "
+  // IMPALA-7412: These should not overflow anymore
+  TestValue("width_bucket(cast(-0.10 as decimal(37,30)), cast(-0.36028797018963968 "
       "as decimal(25,25)), cast(9151517.4969773200562764155787276999832"
-      "as decimal(38,31)), 1328180220)",
-      "UDF ERROR: Overflow while representing the num_buckets:1328180220 as a "
-      "DecimalVal\n");
-  TestErrorString("width_bucket(cast(9 as decimal(10,7)), cast(-60000 as decimal(11,6)), "
-      "cast(10 as decimal(7,5)), 249895273);",
-      "UDF ERROR: Overflow while representing the num_buckets:249895273 as a "
-      "DecimalVal\n");
+      "as decimal(38,31)), 1328180220)", TYPE_BIGINT, 38);
+  TestValue("width_bucket(cast(9 as decimal(10,7)), cast(-60000 as decimal(11,6)), "
+      "cast(10 as decimal(7,5)), 249895273);", TYPE_BIGINT, 249891109);
+  // max - min and expr - min needs bigger type than the underlying type of
+  // the deduced decimal. The calculation must succeed by using a bigger type.
+  TestValue("width_bucket(cast(0.9999 as decimal(35,35)), cast(-0.705408425140 as "
+      "decimal(23,23)), cast(0.999999999999999999999 as decimal(38,38)), 699997927)",
+      TYPE_BIGINT, 699956882ll);
+  // max - min needs bigger type, but expr - min and (expr - min) * num_buckets fits
+  // into deduced decimal
+  TestValue("width_bucket(cast(-0.7054084251 as decimal(23,23)), cast(-0.705408425140 "
+      "as decimal(23,23)), cast(0.999999999999999999999 as decimal(38,38)), 10)",
+      TYPE_BIGINT, 1);
+  // max - min fits into deduced decimal, (max - min) * num_buckets needs bigger type,
+  // but expr == min
+  TestValue("width_bucket(cast(1 as decimal(9,0)), cast(1 as decimal(9,0)), "
+      "cast(100000000 as decimal(9,0)), 100)", TYPE_BIGINT, 1);
+  // max - min fits into deduced decimal, (max - min) * num_buckets needs bigger type,
+  // but (expr - min) * num_buckets fits
+  TestValue("width_bucket(cast(2 as decimal(9,0)), cast(1 as decimal(9,0)), "
+      "cast(100000000 as decimal(9,0)), 100)", TYPE_BIGINT, 1);
+  // max - min fits into deduced decimal, but (expr - min) * num_buckets needs bigger type
+  TestValue("width_bucket(cast(100000000 as decimal(9,0)), cast(1 as decimal(9,0)), "
+      "cast(100000001 as decimal(9,0)), 100)", TYPE_BIGINT, 100);
+
   // Run twice to test deterministic behavior.
   for (uint32_t seed : {0, 1234}) {
     stringstream rand, random;

http://git-wip-us.apache.org/repos/asf/impala/blob/632e0e2e/be/src/exprs/math-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/math-functions-ir.cc b/be/src/exprs/math-functions-ir.cc
index 28c9e1e..efe429e 100644
--- a/be/src/exprs/math-functions-ir.cc
+++ b/be/src/exprs/math-functions-ir.cc
@@ -457,57 +457,32 @@ DoubleVal MathFunctions::FmodDouble(FunctionContext* ctx, const DoubleVal& a,
 
 // The bucket_number is evaluated using the following formula:
 //
-//   bucket_number = dist_from_min * num_buckets / range_size
+//   bucket_number = dist_from_min * num_buckets / range_size + 1
 //      where -
 //        dist_from_min = expr - min_range
 //        range_size = max_range - min_range
-//        buckets = number of buckets
+//        num_buckets = number of buckets
 //
-// The results of the above subtractions are stored in Decimal16Value to avoid an overflow
-// in the following cases:
-//   case 1:
-//      T1 is decimal8Value
-//         When evaluating this particular expression
-//            dist_from_min = expr - min_range
-//         If expr is a max positive value which can be represented in decimal8Value and
-//         min_range < 0 the resulting dist_from_min can be represented in decimal16Val
-//         without overflowing
-//   case 2:
-//      T1 is decimal16Value
-//         Subtracting a negative min_range from expr can result in an overflow in which
-//         case the function errors out. There is no decimal32Val to handle this. So
-//         storing it in decimal16Value.
-//   case 3:
-//      T1 is decimal4Value
-//         We can store the results in a decimal8Value. But this change hard codes to
-//         store the result in decimal16Val for now to be compatible with the other
-//         decimal*Vals
+// Since expr, min_range, and max_range are all decimals with the same
+// byte size, precision, and scale we can interpret them as plain integers
+// and still calculate the proper bucket.
 //
-// The result of this multiplication dist_from_min * buckets is stored as a int256_t
-// if storing it in a int128_t would overflow.
+// There are some possibilities of overflowing during the calculation:
+// range_size = max_range - min_range
+// dist_from_min = expr - min_range
+// dist_from_min * num_buckets
 //
-// To perform the division, range_size is scaled up. The scale and precision of the
-// numerator and denominator are adjusted to be the same. This avoids the need to compute
-// the resulting scale and precision.
+// For all the above cases we use a bigger integer type provided by the
+// BitUtil::DoubleWidth<> metafunction.
 template <class  T1>
 BigIntVal MathFunctions::WidthBucketImpl(FunctionContext* ctx,
     const T1& expr, const T1& min_range,
     const T1& max_range, const IntVal& num_buckets) {
-  // FE casts expr, min_range and max_range to be of the scale and precision
-  int input_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 1);
-  int input_precision = ctx->impl()->GetConstFnAttr(
-      FunctionContextImpl::ARG_TYPE_PRECISION, 1);
-
-  bool overflow = false;
-  Decimal16Value range_size = max_range.template Subtract<int128_t>(input_scale,
-      min_range, input_scale, input_precision, input_scale, false, &overflow);
-  if (UNLIKELY(overflow)) {
-    ostringstream error_msg;
-    error_msg << "Overflow while evaluating the difference between min_range: " <<
-        min_range.value() << " and max_range: " << max_range.value();
-    ctx->SetError(error_msg.str().c_str());
-    return BigIntVal::null();
-  }
+  auto expr_val = expr.value();
+  using ActualType = decltype(expr_val);
+  auto min_range_val = min_range.value();
+  auto max_range_val = max_range.value();
+  auto num_buckets_val = static_cast<ActualType>(num_buckets.val);
 
   if (UNLIKELY(num_buckets.val <= 0)) {
     ostringstream error_msg;
@@ -516,73 +491,61 @@ BigIntVal MathFunctions::WidthBucketImpl(FunctionContext* ctx,
     return BigIntVal::null();
   }
 
-  if (UNLIKELY(min_range >= max_range)) {
+  if (UNLIKELY(min_range_val >= max_range_val)) {
     ctx->SetError("Lower bound cannot be greater than or equal to the upper bound");
     return BigIntVal::null();
   }
 
-  if (UNLIKELY(expr < min_range)) return 0;
-
-  if (UNLIKELY(expr >= max_range)) {
-    BigIntVal result;
-    result.val = num_buckets.val;
-    ++result.val;
-    return result;
+  if (expr_val < min_range_val) return 0;
+  if (expr_val >= max_range_val) {
+    return BigIntVal(static_cast<int64_t>(num_buckets.val) + 1);
   }
 
-  Decimal16Value dist_from_min = expr.template Subtract<int128_t>(input_scale,
-      min_range, input_scale, input_precision, input_scale, false, &overflow);
-  DCHECK_EQ(overflow, false);
-
-  Decimal16Value buckets = Decimal16Value::FromInt(input_precision, input_scale,
-      num_buckets.val, &overflow);
-
-  if (UNLIKELY(overflow)) {
-    stringstream error_msg;
-    error_msg << "Overflow while representing the num_buckets:" << num_buckets.val
-        << " as a DecimalVal";
-    ctx->SetError(error_msg.str().c_str());
-    return BigIntVal::null();
-  }
-  bool needs_int256 = false;
-  // Check if dist_from_min * buckets would overflow and if there is a need to
-  // store the intermediate results in int256_t to avoid an overflows
-  // Check if scaling up range size overflows and if there is a need to store the
-  // intermediate results in int256_t to avoid the overflow
-  if (UNLIKELY(BitUtil::CountLeadingZeros(abs(buckets.value())) +
-      BitUtil::CountLeadingZeros(abs(dist_from_min.value())) <= 128 ||
-      BitUtil::CountLeadingZeros(range_size.value()) +
-      detail::MinLeadingZerosAfterScaling(BitUtil::CountLeadingZeros(
-      range_size.value()), input_scale) <= 128)) {
-    needs_int256 = true;
+  bool bigger_type_needed = false;
+  // It is likely that this if stmt can be evaluated during codegen because
+  // 'max_range' and 'min_range' are almost certainly constant expressions:
+  if (max_range_val >= 0 && min_range_val < 0) {
+    if (static_cast<UnsignedType<ActualType>>(max_range_val) +
+        static_cast<UnsignedType<ActualType>>(abs(min_range_val)) >=
+        static_cast<UnsignedType<ActualType>>(BitUtil::Max<ActualType>())) {
+      bigger_type_needed = true;
+    }
   }
 
-  int128_t result;
-  if (needs_int256) {
-    // resulting scale should be 2 * input_scale as per multiplication rules
-    int256_t x = ConvertToInt256(buckets.value()) * ConvertToInt256(
-        dist_from_min.value());
-
-    // Since "range_size" and "x" have different scales, the divison would require
-    // evaluating the resulting scale. To avoid this we scale up the denominator to
-    // match the scale of the numerator.
-    int256_t y = DecimalUtil::MultiplyByScale<int256_t>(ConvertToInt256(
-        range_size.value()), input_scale, false);
-    result = ConvertToInt128(x / y, DecimalUtil::MAX_UNSCALED_DECIMAL16,
-        &overflow);
-    DCHECK_EQ(overflow, false);
+  auto MultiplicationOverflows = [](ActualType lhs, ActualType rhs) {
+    DCHECK(lhs > 0 && rhs > 0);
+    using ActualType = decltype(lhs);
+    return BitUtil::CountLeadingZeros(lhs) + BitUtil::CountLeadingZeros(rhs) <=
+        BitUtil::UnsignedWidth<ActualType>() + 1;
+  };
+
+  // It is likely that this can be evaluated during codegen:
+  bool multiplication_can_overflow = bigger_type_needed ||  MultiplicationOverflows(
+      max_range_val - min_range_val, num_buckets_val);
+  // 'expr_val' is not likely to be a constant expression, so this almost certainly
+  // needs runtime evaluation if 'bigger_type_needed' is false and
+  // 'multiplication_can_overflow' is true:
+  bigger_type_needed = bigger_type_needed || (
+      multiplication_can_overflow &&
+      expr_val != min_range_val &&
+      MultiplicationOverflows(expr_val - min_range_val, num_buckets_val));
+
+  auto BucketFunc = [](auto element, auto min_rng, auto max_rng, auto buckets) {
+    auto range_size = max_rng - min_rng;
+    auto dist_from_min = element - min_rng;
+    auto ret = dist_from_min * buckets / range_size;
+    return BigIntVal(static_cast<int64_t>(ret) + 1);
+  };
+
+  if (bigger_type_needed) {
+    using BiggerType = typename DoubleWidth<ActualType>::type;
+
+    return BucketFunc(static_cast<BiggerType>(expr_val),
+        static_cast<BiggerType>(min_range_val), static_cast<BiggerType>(max_range_val),
+        static_cast<BiggerType>(num_buckets.val));
   } else {
-    // resulting scale should be 2 * input_scale as per multiplication rules
-    int128_t x = buckets.value() * dist_from_min.value();
-
-    // Since "range_size" and "x" have different scales, the divison would require
-    // evaluating the resulting scale. To avoid this we scale up the denominator to
-    // match the scale of the numerator.
-    int128_t y = DecimalUtil::MultiplyByScale<int128_t>(range_size.value(),
-        input_scale, false);
-    result = x / y; // NOLINT: clang-tidy thinks y may equal zero here.
+    return BucketFunc(expr_val, min_range_val, max_range_val, num_buckets.val);
   }
-  return (BigIntVal(abs(result) + 1));
 }
 
 BigIntVal MathFunctions::WidthBucket(FunctionContext* ctx, const DecimalVal& expr,

http://git-wip-us.apache.org/repos/asf/impala/blob/632e0e2e/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 8a65509..d07dd9d 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -28,8 +28,7 @@
 #include <climits>
 #include <limits>
 #include <typeinfo>
-
-#include <boost/type_traits/make_unsigned.hpp>
+#include <type_traits>
 
 #include "common/compiler-util.h"
 #include "gutil/bits.h"
@@ -39,7 +38,40 @@
 
 namespace impala {
 
-using boost::make_unsigned;
+/// Nested 'type' corresponds to the unsigned version of T.
+template <typename T>
+struct MakeUnsigned {
+  using type = std::make_unsigned_t<T>;
+};
+
+template <>
+struct MakeUnsigned<int128_t> {
+  using type = __uint128_t;
+};
+
+template <typename T>
+using UnsignedType = typename MakeUnsigned<T>::type;
+
+// Doubles the width of integer types (e.g. int32_t -> int64_t).
+// Currently only works with a few signed types.
+// Feel free to extend it to other types as well.
+template <typename T>
+struct DoubleWidth {};
+
+template <>
+struct DoubleWidth<int32_t> {
+  using type = int64_t;
+};
+
+template <>
+struct DoubleWidth<int64_t> {
+  using type = int128_t;
+};
+
+template <>
+struct DoubleWidth<int128_t> {
+  using type = int256_t;
+};
 
 /// Utility class to do standard bit tricks
 /// TODO: is this in boost or something else like that?
@@ -59,6 +91,17 @@ class BitUtil {
         std::is_same<CVR_REMOVED, __int128>::value ? 127 : -1;
   }
 
+  /// Returns the max value that can be represented in T.
+  template<typename T, typename CVR_REMOVED = typename std::decay<T>::type,
+      typename std::enable_if<std::is_integral<CVR_REMOVED> {}||
+                              std::is_same<CVR_REMOVED, __int128> {}, int>::type = 0>
+  constexpr static inline CVR_REMOVED Max() {
+    return std::is_integral<CVR_REMOVED>::value ?
+        std::numeric_limits<CVR_REMOVED>::max() :
+        std::is_same<CVR_REMOVED, __int128>::value ?
+            static_cast<UnsignedType<CVR_REMOVED>>(-1) / 2 : -1;
+  }
+
   /// Return an integer signifying the sign of the value, returning +1 for
   /// positive integers (and zero), -1 for negative integers.
   /// The extra shift is to silence GCC warnings about full width shift on
@@ -168,7 +211,7 @@ class BitUtil {
   template<typename T>
   static inline int PopcountSigned(T v) {
     // Converting to same-width unsigned then extending preserves the bit pattern.
-    return BitUtil::Popcount(static_cast<typename make_unsigned<T>::type>(v));
+    return BitUtil::Popcount(static_cast<UnsignedType<T>>(v));
   }
 
   /// Returns the 'num_bits' least-significant bits of 'v'.
@@ -249,7 +292,7 @@ class BitUtil {
   template <typename T>
   constexpr static T ShiftRightLogical(T v, int shift) {
     // Conversion to unsigned ensures most significant bits always filled with 0's
-    return static_cast<typename make_unsigned<T>::type>(v) >> shift;
+    return static_cast<UnsignedType<T>>(v) >> shift;
   }
 
   /// Get a specific bit of a numeric type
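
The MultiplicationOverflows lambda in the math-functions-ir.cc hunk above
combines CountLeadingZeros() with UnsignedWidth() (width minus the sign bit).
A Python rendering of the idea for positive operands of a 64-bit signed type
(a sketch of the predicate, not a drop-in translation):

def may_overflow(lhs, rhs, width=64):
  # clz(lhs) + clz(rhs) <= UnsignedWidth + 1, with UnsignedWidth = width - 1
  # for a signed type. The test is conservative: it never misses a real
  # overflow, but can flag a product that actually still fits.
  assert lhs > 0 and rhs > 0
  clz = lambda x: width - x.bit_length()  # leading zeros in a width-bit word
  return clz(lhs) + clz(rhs) <= (width - 1) + 1

assert may_overflow(2**32, 2**31)      # 2^63 exceeds int64 max: true positive
assert not may_overflow(2**31, 2**30)  # 2^61 fits and is not flagged
assert may_overflow(2**31, 2**31)      # 2^62 fits, but the check is conservative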

http://git-wip-us.apache.org/repos/asf/impala/blob/632e0e2e/tests/query_test/test_decimal_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_decimal_fuzz.py b/tests/query_test/test_decimal_fuzz.py
index a129e33..1433ec3 100644
--- a/tests/query_test/test_decimal_fuzz.py
+++ b/tests/query_test/test_decimal_fuzz.py
@@ -30,6 +30,9 @@ from tests.common.test_vector import ImpalaTestDimension, ImpalaTestMatrix
 
 class TestDecimalFuzz(ImpalaTestSuite):
 
+  # Impala's max precision for decimals is 38, so we should have the same in the tests
+  decimal.getcontext().prec = 38
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -200,7 +203,7 @@ class TestDecimalFuzz(ImpalaTestSuite):
         return True
     return False
 
-  def execute_one(self):
+  def execute_one_decimal_op(self):
     '''Executes a single query and compares the result to a result that we computed in
     Python.'''
     op = random.choice(['+', '-', '*', '/', '%'])
@@ -243,6 +246,60 @@ class TestDecimalFuzz(ImpalaTestSuite):
         expected_result = None
       assert self.result_equals(expected_result, result)
 
-  def test_fuzz(self, vector):
+  def test_decimal_ops(self, vector):
+    for _ in xrange(self.iterations):
+      self.execute_one_decimal_op()
+
+  def width_bucket(self, val, min_range, max_range, num_buckets):
+    # Multiplying the values by 10**40 guarantees that the numbers can be converted
+    # to int without losing information.
+    val_int = int(decimal.Decimal(val) * 10**40)
+    min_range_int = int(decimal.Decimal(min_range) * 10**40)
+    max_range_int = int(decimal.Decimal(max_range) * 10**40)
+
+    if min_range_int >= max_range_int:
+      return None
+    if val_int < min_range_int:
+      return 0
+    if val_int > max_range_int:
+      return num_buckets + 1
+
+    range_size = max_range_int - min_range_int
+    dist_from_min = val_int - min_range_int
+    return (num_buckets * dist_from_min) / range_size + 1
+
+  def execute_one_width_bucket(self):
+    val, val_prec, val_scale = self.get_decimal()
+    min_range, min_range_prec, min_range_scale = self.get_decimal()
+    max_range, max_range_prec, max_range_scale = self.get_decimal()
+    num_buckets = random.randint(1, 2147483647)
+
+    query = ('select width_bucket('
+        'cast({val} as decimal({val_prec},{val_scale})), '
+        'cast({min_range} as decimal({min_range_prec},{min_range_scale})), '
+        'cast({max_range} as decimal({max_range_prec},{max_range_scale})), '
+        '{num_buckets})')
+
+    query = query.format(val=val, val_prec=val_prec, val_scale=val_scale,
+        min_range=min_range, min_range_prec=min_range_prec,
+        min_range_scale=min_range_scale,
+        max_range=max_range, max_range_prec=max_range_prec,
+        max_range_scale=max_range_scale,
+        num_buckets=num_buckets)
+
+    expected_result = self.width_bucket(val, min_range, max_range, num_buckets)
+    if not expected_result:
+      return
+
+    try:
+      result = self.execute_scalar(query, query_options={'decimal_v2': 'true'})
+      assert int(result) == expected_result
+    except ImpalaBeeswaxException as e:
+      if "You need to wrap the arguments in a CAST" not in str(e):
+        # Sometimes the decimal inputs are incompatible with each other, so it's ok
+        # to ignore this error.
+        raise e
+
+  def test_width_bucket(self, vector):
     for _ in xrange(self.iterations):
-      self.execute_one()
+      self.execute_one_width_bucket()
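
As a quick standalone check of the reference implementation above (copied out
of the class without 'self', using '//' so the division is integral under both
Python 2 and 3; the inputs are made up): 7.5 bucketed over [0, 10) with 5
buckets of width 2 lands in bucket 4.

import decimal

def width_bucket(val, min_range, max_range, num_buckets):
  # Multiplying by 10**40 turns the decimal strings into exact integers.
  val_int = int(decimal.Decimal(val) * 10**40)
  min_range_int = int(decimal.Decimal(min_range) * 10**40)
  max_range_int = int(decimal.Decimal(max_range) * 10**40)
  if min_range_int >= max_range_int:
    return None
  if val_int < min_range_int:
    return 0
  if val_int > max_range_int:
    return num_buckets + 1
  range_size = max_range_int - min_range_int
  dist_from_min = val_int - min_range_int
  return (num_buckets * dist_from_min) // range_size + 1

assert width_bucket('7.5', '0', '10', 5) == 4  # buckets of width 2 over [0, 10)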