Posted to commits@impala.apache.org by ar...@apache.org on 2019/05/29 23:28:22 UTC

[impala] branch 2.x updated (2413c6c -> ac13cb6)

This is an automated email from the ASF dual-hosted git repository.

arodoni pushed a change to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 2413c6c  IMPALA-7186: [DOCS] Documented the KUDU_READ_MODE query option
     new 537a464  cleanup: extract RowBatchQueue into its own file
     new 9f372f0  IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count
     new 10a65ef  IMPALA-7014: Disable stacktrace symbolisation by default
     new ac13cb6  IMPALA-7059: Inconsistent privilege between DESCRIBE and DESCRIBE DATABASE

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/init.cc                              |  2 +
 be/src/exec/blocking-join-node.h                   |  1 +
 be/src/exec/exec-node.cc                           | 31 -----------
 be/src/exec/exec-node.h                            | 35 ------------
 be/src/exec/hdfs-scan-node.cc                      |  1 +
 be/src/exec/kudu-scan-node.cc                      |  1 +
 be/src/exec/scan-node.cc                           |  1 +
 be/src/exec/scan-node.h                            |  1 +
 be/src/exec/scanner-context.cc                     |  1 +
 be/src/runtime/CMakeLists.txt                      |  1 +
 be/src/runtime/data-stream-recvr.cc                |  1 +
 be/src/runtime/krpc-data-stream-recvr.cc           |  1 +
 .../{string-value.cc => row-batch-queue.cc}        | 34 ++++++++---
 be/src/runtime/row-batch-queue.h                   | 65 ++++++++++++++++++++++
 .../java/org/apache/impala/catalog/HdfsTable.java  | 22 +++++---
 .../java/org/apache/impala/service/Frontend.java   |  5 +-
 .../impala/analysis/AuthorizationStmtTest.java     | 55 ++++++++----------
 tests/query_test/test_tablesample.py               | 20 +++++--
 18 files changed, 158 insertions(+), 120 deletions(-)
 copy be/src/runtime/{string-value.cc => row-batch-queue.cc} (52%)
 create mode 100644 be/src/runtime/row-batch-queue.h


[impala] 03/04: IMPALA-7014: Disable stacktrace symbolisation by default

Posted by ar...@apache.org.

arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 10a65efd5d14b3368718bc9ff394833921d91f9b
Author: Zoram Thanga <zo...@cloudera.com>
AuthorDate: Tue Jul 17 15:01:39 2018 -0700

    IMPALA-7014: Disable stacktrace symbolisation by default
    
    Stacktrace symbolization has been shown to be 2500x slower
    than just printing the un-symbolized stacktrace.
    
    This has burned us a few times now, so let's disable it by
    default.
    
    Change-Id: If3af209890ccc242beb742145c63eb6836d4bfbb
    Reviewed-on: http://gerrit.cloudera.org:8080/10964
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/init.cc | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index c763c91..2a9f14f 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -67,6 +67,7 @@ DECLARE_int32(max_log_files);
 DECLARE_int32(max_minidumps);
 DECLARE_string(redaction_rules_file);
 DECLARE_string(reserved_words_version);
+DECLARE_bool(symbolize_stacktrace);
 
 DEFINE_int32(max_audit_event_log_files, 0, "Maximum number of audit event log files "
     "to retain. The most recent audit event log files are retained. If set to 0, "
@@ -187,6 +188,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   // Set the default hostname. The user can override this with the hostname flag.
   ABORT_IF_ERROR(GetHostname(&FLAGS_hostname));
 
+  FLAGS_symbolize_stacktrace = false;
   google::SetVersionString(impala::GetBuildVersion());
   google::ParseCommandLineFlags(&argc, &argv, true);
   if (!FLAGS_redaction_rules_file.empty()) {
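Note on the hunk above: the new default is assigned before the command line is parsed, so an explicit flag passed by the operator should still take precedence. A rough Python analogue of that ordering, using argparse, is sketched below. The flag name mirrors the diff; the parser, the `parse` helper, and the assumed library default of True are illustrative assumptions, not Impala or gflags code.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser()
    # Stand-in for a flag that some library defines with a default of True
    # (assumed here for illustration).
    parser.add_argument(
        "--symbolize_stacktrace",
        type=lambda s: s.lower() == "true",
        default=True,
    )
    return parser


def parse(argv):
    parser = build_parser()
    # Override the library default *before* parsing, so that an explicit
    # command-line value still wins over the new default.
    parser.set_defaults(symbolize_stacktrace=False)
    return parser.parse_args(argv)


if __name__ == "__main__":
    print(parse([]).symbolize_stacktrace)
    print(parse(["--symbolize_stacktrace=true"]).symbolize_stacktrace)
```

If the assignment were made after parsing instead, it would silently discard whatever the operator passed on the command line.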


[impala] 04/04: IMPALA-7059: Inconsistent privilege between DESCRIBE and DESCRIBE DATABASE

Posted by ar...@apache.org.

arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ac13cb667edff8d9b91d8bf48940dbe5b2eb19db
Author: Fredy Wijaya <fw...@cloudera.com>
AuthorDate: Wed Jul 11 11:18:05 2018 -0700

    IMPALA-7059: Inconsistent privilege between DESCRIBE and DESCRIBE DATABASE
    
    In DESCRIBE DATABASE, having VIEW_METADATA privilege allows seeing the
    metadata information on the target database. Similarly, other SQL show
    commands require VIEW_METADATA privilege on the target database/table.
    In the prior code, DESCRIBE required SELECT privilege on the target table,
    which was inconsistent with the other SQL metadata commands. The
    patch fixes the inconsistency by requiring DESCRIBE to use VIEW_METADATA
    privilege.
    
    Testing:
    - Updated authorization tests
    - Ran all FE tests
    
    Change-Id: I37d1610a922741a6c95059c3beb7d04eb507783f
    Reviewed-on: http://gerrit.cloudera.org:8080/10923
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/service/Frontend.java   |  5 +-
 .../impala/analysis/AuthorizationStmtTest.java     | 55 +++++++++-------------
 2 files changed, 26 insertions(+), 34 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a363458..ac65171 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -813,7 +813,8 @@ public class Frontend {
     if (authzConfig_.isEnabled()) {
       // First run a table check
       PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder()
-          .allOf(Privilege.SELECT).onTable(table.getDb().getName(), table.getName())
+          .allOf(Privilege.VIEW_METADATA).onTable(table.getDb().getName(),
+              table.getName())
           .toRequest();
       if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
         // Filter out columns that the user is not authorized to see.
@@ -821,7 +822,7 @@ public class Frontend {
         for (Column col: table.getColumnsInHiveOrder()) {
           String colName = col.getName();
           privilegeRequest = new PrivilegeRequestBuilder()
-              .allOf(Privilege.SELECT)
+              .allOf(Privilege.VIEW_METADATA)
               .onColumn(table.getDb().getName(), table.getName(), colName)
               .toRequest();
           if (authzChecker_.get().hasAccess(user, privilegeRequest)) {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index 52e0952..df46899 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -1041,20 +1041,19 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     TTableName tableName = new TTableName("functional", "alltypes");
     TDescribeOutputStyle style = TDescribeOutputStyle.MINIMAL;
     authzTest = authorize("describe functional.alltypes");
-    for (TPrivilegeLevel privilege: new TPrivilegeLevel[]{
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT}) {
+    for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
       authzTest.okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onServer(privilege))
           .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onDatabase("functional",
               privilege))
           .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onTable("functional",
               "alltypes", privilege));
     }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(allExcept(
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
+        allExcept(viewMetadataPrivileges())))
         .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+            allExcept(viewMetadataPrivileges())))
         .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onTable("functional",
-            "alltypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+            "alltypes", allExcept(viewMetadataPrivileges())))
         // In this test, since we only have column level privileges on "id", then
         // only the "id" column should show and the others should not.
         .okDescribe(tableName, style, new String[]{"id"}, ALLTYPES_COLUMNS_WITHOUT_ID,
@@ -1064,26 +1063,24 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     // Describe table extended.
     tableName = new TTableName("functional", "alltypes");
     style = TDescribeOutputStyle.EXTENDED;
-    String[] locationString = new String[]{"Location:"};
     String[] checkStrings = (String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS,
-        locationString);
+        new String[]{"Location:"});
     authzTest = authorize("describe functional.alltypes");
-    for (TPrivilegeLevel privilege: new TPrivilegeLevel[]{
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT}) {
+    for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
       authzTest.okDescribe(tableName, style, checkStrings, null, onServer(privilege))
           .okDescribe(tableName, style, checkStrings, null, onDatabase("functional",
               privilege))
           .okDescribe(tableName, style, checkStrings, null, onTable("functional",
               "alltypes", privilege));
     }
-    authzTest.okDescribe(tableName, style, locationString, ALLTYPES_COLUMNS,
-        onServer(allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .okDescribe(tableName, style, locationString, ALLTYPES_COLUMNS,
-            onDatabase("functional", allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.SELECT)))
-        .okDescribe(tableName, style, locationString, ALLTYPES_COLUMNS,
-            onTable("functional", "alltypes", allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.SELECT)))
+    // Describe table without VIEW_METADATA privilege should not show all columns and
+    // location.
+    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+            onServer(allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+            onDatabase("functional", allExcept(viewMetadataPrivileges())))
+        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS,
+            onTable("functional", "alltypes", allExcept(viewMetadataPrivileges())))
         // Location should not appear with only column level auth.
         .okDescribe(tableName, style, new String[]{"id"},
             (String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS_WITHOUT_ID,
@@ -1095,20 +1092,17 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     tableName = new TTableName("functional", "alltypes_view");
     style = TDescribeOutputStyle.MINIMAL;
     authzTest = authorize("describe functional.alltypes_view");
-    for (TPrivilegeLevel privilege: new TPrivilegeLevel[]{
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT}) {
+    for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
       authzTest.okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onServer(privilege))
           .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onDatabase("functional",
               privilege))
           .okDescribe(tableName, style, ALLTYPES_COLUMNS, null, onTable("functional",
               "alltypes_view", privilege));
     }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(allExcept(
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
+        allExcept(viewMetadataPrivileges())))
         .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onTable("functional",
-            "alltypes_view", TPrivilegeLevel.INSERT))
+            allExcept(viewMetadataPrivileges())))
         .error(accessError("functional.alltypes_view"));
 
     // Describe view extended.
@@ -1118,20 +1112,17 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     String[] viewStrings = new String[]{"View Original Text:", "View Expanded Text:"};
     checkStrings = (String[]) ArrayUtils.addAll(ALLTYPES_COLUMNS, viewStrings);
     authzTest = authorize("describe functional.alltypes_view");
-    for (TPrivilegeLevel privilege: new TPrivilegeLevel[]{
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT}) {
+    for (TPrivilegeLevel privilege: viewMetadataPrivileges()) {
       authzTest.okDescribe(tableName, style, checkStrings, null, onServer(privilege))
           .okDescribe(tableName, style, checkStrings, null, onDatabase("functional",
               privilege))
           .okDescribe(tableName, style, checkStrings, null, onTable("functional",
               "alltypes_view", privilege));
     }
-    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(allExcept(
-        TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+    authzTest.okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onServer(
+        allExcept(viewMetadataPrivileges())))
         .okDescribe(tableName, style, null, ALLTYPES_COLUMNS, onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .okDescribe(tableName, style, viewStrings, ALLTYPES_COLUMNS, onTable("functional",
-            "alltypes_view", TPrivilegeLevel.INSERT))
+            allExcept(viewMetadataPrivileges())))
         .error(accessError("functional.alltypes_view"));
 
     // Describe specific column on a table.
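Note on the Frontend.java change in this commit: the check runs at table level first and falls back to per-column filtering only if the table-level check fails. A minimal Python sketch of that two-step shape follows. The `user_privs` set, the scope tuples, and the `visible_columns` helper are hypothetical names for illustration. The sketch also ignores server and database scopes, any implication between privilege levels, and what Impala does when no column is visible.

```python
VIEW_METADATA = "VIEW_METADATA"


def visible_columns(user_privs, db, table, columns):
    """Return the columns of db.table that a DESCRIBE would show.

    user_privs is a hypothetical set of (scope, privilege) pairs, where scope
    is ("table", db, table) or ("column", db, table, column).
    """
    # Table-level check first: VIEW_METADATA on the table exposes every column.
    if (("table", db, table), VIEW_METADATA) in user_privs:
        return list(columns)
    # Otherwise keep only the columns the user may see individually.
    return [
        col for col in columns
        if (("column", db, table, col), VIEW_METADATA) in user_privs
    ]


if __name__ == "__main__":
    cols = ["id", "bool_col", "int_col"]
    only_id = {(("column", "functional", "alltypes", "id"), VIEW_METADATA)}
    print(visible_columns(only_id, "functional", "alltypes", cols))
```

This mirrors the test comment in the diff: with only a column-level privilege on "id", only the "id" column is returned.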


[impala] 01/04: cleanup: extract RowBatchQueue into its own file

Posted by ar...@apache.org.

arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 537a4646dc1bf0a204354f23ed0905674c71eae5
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Jul 12 23:10:38 2018 -0700

    cleanup: extract RowBatchQueue into its own file
    
    While looking at IMPALA-7096, I noticed that RowBatchQueue was
    implemented in a strange place.
    
    Change-Id: I3577c1c6920b8cf858c8d49f8812ccc305d833f6
    Reviewed-on: http://gerrit.cloudera.org:8080/10943
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-join-node.h         |  1 +
 be/src/exec/exec-node.cc                 | 31 ---------------
 be/src/exec/exec-node.h                  | 35 -----------------
 be/src/exec/hdfs-scan-node.cc            |  1 +
 be/src/exec/kudu-scan-node.cc            |  1 +
 be/src/exec/scan-node.cc                 |  1 +
 be/src/exec/scan-node.h                  |  1 +
 be/src/exec/scanner-context.cc           |  1 +
 be/src/runtime/CMakeLists.txt            |  1 +
 be/src/runtime/data-stream-recvr.cc      |  1 +
 be/src/runtime/krpc-data-stream-recvr.cc |  1 +
 be/src/runtime/row-batch-queue.cc        | 55 +++++++++++++++++++++++++++
 be/src/runtime/row-batch-queue.h         | 65 ++++++++++++++++++++++++++++++++
 13 files changed, 129 insertions(+), 66 deletions(-)

diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index b7dd79a..8198ad0 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -25,6 +25,7 @@
 
 #include "exec/exec-node.h"
 #include "util/promise.h"
+#include "util/stopwatch.h"
 
 #include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
 
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index a44a5c1..5dca184 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -80,37 +80,6 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
   return p->metadata();
 }
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {
-}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
-  DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (!BlockingPut(move(batch))) {
-    lock_guard<SpinLock> l(lock_);
-    cleanup_queue_.push_back(move(batch));
-  }
-}
-
-unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
-  unique_ptr<RowBatch> result;
-  if (BlockingGet(&result)) return result;
-  return unique_ptr<RowBatch>();
-}
-
-void ExecNode::RowBatchQueue::Cleanup() {
-  unique_ptr<RowBatch> batch = NULL;
-  while ((batch = GetBatch()) != NULL) {
-    batch.reset();
-  }
-
-  lock_guard<SpinLock> l(lock_);
-  cleanup_queue_.clear();
-}
-
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : id_(tnode.node_id),
     type_(tnode.node_type),
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 9a87a56..a62ed6c 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -30,7 +30,6 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "runtime/reservation-manager.h"
-#include "util/blocking-queue.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
@@ -243,40 +242,6 @@ class ExecNode {
     return reservation_manager_.ReleaseUnusedReservation();
   }
 
-  /// Extends blocking queue for row batches. Row batches have a property that
-  /// they must be processed in the order they were produced, even in cancellation
-  /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-  /// and we need to make sure those ptrs stay valid.
-  /// Row batches that are added after Shutdown() are queued in another queue, which can
-  /// be cleaned up during Close().
-  /// All functions are thread safe.
-  class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
-   public:
-    /// max_batches is the maximum number of row batches that can be queued.
-    /// When the queue is full, producers will block.
-    RowBatchQueue(int max_batches);
-    ~RowBatchQueue();
-
-    /// Adds a batch to the queue. This is blocking if the queue is full.
-    void AddBatch(std::unique_ptr<RowBatch> batch);
-
-    /// Gets a row batch from the queue. Returns NULL if there are no more.
-    /// This function blocks.
-    /// Returns NULL after Shutdown().
-    std::unique_ptr<RowBatch> GetBatch();
-
-    /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-    /// after this is called.
-    void Cleanup();
-
-   private:
-    /// Lock protecting cleanup_queue_
-    SpinLock lock_;
-
-    /// Queue of orphaned row batches
-    std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
-  };
-
   /// Unique within a single plan tree.
   int id_;
   TPlanNodeType::type type_;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1ceaf2c..5d4f9b0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -31,6 +31,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 48816f9..30194f9 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 01aa269..4d59eed 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 63bb59b..1d0728c 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class RowBatchQueue;
 class TScanRange;
 
 /// Abstract base class of all scan nodes. Subclasses support different storage layers
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 75aacee..a9fad6a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,6 +26,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2fea5bd..e09b27c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(Runtime
   reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
+  row-batch-queue.cc
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 8d9047f..c9a9ab9 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -22,6 +22,7 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index be51f32..3933e02 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -31,6 +31,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "service/data-stream-service.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
new file mode 100644
index 0000000..1fd5555
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/row-batch-queue.h"
+
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+
+RowBatchQueue::~RowBatchQueue() {
+  DCHECK(cleanup_queue_.empty());
+}
+
+void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (!BlockingPut(move(batch))) {
+    lock_guard<SpinLock> l(lock_);
+    cleanup_queue_.push_back(move(batch));
+  }
+}
+
+unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> result;
+  if (BlockingGet(&result)) return result;
+  return unique_ptr<RowBatch>();
+}
+
+void RowBatchQueue::Cleanup() {
+  unique_ptr<RowBatch> batch = nullptr;
+  while ((batch = GetBatch()) != nullptr) {
+    batch.reset();
+  }
+
+  lock_guard<SpinLock> l(lock_);
+  cleanup_queue_.clear();
+}
+}
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
new file mode 100644
index 0000000..bd2f551
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BLOCKING_QUEUE_H
+#define IMPALA_RUNTIME_BLOCKING_QUEUE_H
+
+#include <list>
+#include <memory>
+
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Extends blocking queue for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close().
+/// All functions are thread safe.
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+ public:
+  /// max_batches is the maximum number of row batches that can be queued.
+  /// When the queue is full, producers will block.
+  RowBatchQueue(int max_batches);
+  ~RowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Gets a row batch from the queue. Returns NULL if there are no more.
+  /// This function blocks.
+  /// Returns NULL after Shutdown().
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
+  /// after this is called.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+};
+}
+#endif
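Note on the RowBatchQueue contract described in the header comment: producers block while the queue is full, batches added after shutdown are parked in a separate cleanup list, and consumers get a null result once nothing more will arrive. A simplified Python sketch of those semantics is below. The class and method names are illustrative, not the Impala API. In particular, letting `get_batch` drain batches that were queued before shutdown is an assumption of this sketch, not something the diff confirms about BlockingQueue.

```python
import threading
from collections import deque


class RowBatchQueueSketch:
    def __init__(self, max_batches):
        self._max = max_batches
        self._items = deque()
        self._cleanup = []       # batches that arrived after shutdown
        self._shutdown = False
        self._cv = threading.Condition()

    def shutdown(self):
        with self._cv:
            self._shutdown = True
            self._cv.notify_all()  # wake blocked producers and consumers

    def add_batch(self, batch):
        with self._cv:
            # Block while the queue is full, unless it has been shut down.
            while not self._shutdown and len(self._items) >= self._max:
                self._cv.wait()
            if self._shutdown:
                # Keep ownership so the batch can be released in cleanup().
                self._cleanup.append(batch)
                return
            self._items.append(batch)
            self._cv.notify_all()

    def get_batch(self):
        with self._cv:
            # Block while empty, unless the queue has been shut down.
            while not self._items and not self._shutdown:
                self._cv.wait()
            if not self._items:
                return None
            batch = self._items.popleft()
            self._cv.notify_all()
            return batch

    def cleanup(self):
        # Intended to be called after shutdown(); otherwise get_batch() blocks
        # once the queue is empty.
        while self.get_batch() is not None:
            pass
        with self._cv:
            self._cleanup.clear()
```

Batches are handed out in FIFO order, which matches the ordering requirement stated in the class comment.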


[impala] 02/04: IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count

Posted by ar...@apache.org.

arodoni pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9f372f0d5df734c682c4c4cb237fc17e3e3f7bb0
Author: Todd Lipcon <to...@cloudera.com>
AuthorDate: Thu Jul 12 17:07:09 2018 -0700

    IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count
    
    This changes HdfsTable.getFilesSample() to allocate its intermediate
    sampling array based on the number of files in the selected
    (post-pruning) partitions, rather than the total number of files in the
    table. While the former behavior was correct (the total file count is of
    course an upper bound on the pruned file count), it was an unnecessarily
    large allocation, which has some downsides around garbage collection.
    
    In addition, this is important for the LocalCatalog implementation of
    table sampling, since we do not want to have to load all partition file
    lists in order to compute a sample over a pruned subset of partitions.
    
    The original code indicated that this was an optimization to avoid
    looping over the partition list an extra time. However, typical
    partition lists are relatively small even in the worst case (order of
    100k) and looping over 100k in-memory Java objects is not likely to be
    the bottleneck in planning any query. This is especially true
    considering that we loop over that same list later in the function
    anyway, so we probably aren't saving page faults or LLC cache misses
    either.
    
    In testing this change I noticed that the existing test for TABLESAMPLE
    didn't test TABLESAMPLE when applied in conjunction with a predicate.
    I added a new dimension to the test which employs a predicate which
    prunes some partitions to ensure that the code works in that case.
    I also added coverage of the "100%" sampling parameter as a sanity check
    that it returns the same results as a non-sampled query.
    
    Change-Id: I0248d89bcd9dd4ff8b4b85fef282c19e3fe9bdd5
    Reviewed-on: http://gerrit.cloudera.org:8080/10936
    Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
    Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 22 ++++++++++++++--------
 tests/query_test/test_tablesample.py               | 20 +++++++++++++++-----
 2 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 5360960..5f2fc05 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2157,16 +2157,20 @@ public class HdfsTable extends Table implements FeFsTable {
     Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
     Preconditions.checkState(minSampleBytes >= 0);
 
+    long totalNumFiles = 0;
+    for (FeFsPartition part : inputParts) {
+      totalNumFiles += part.getNumFileDescriptors();
+    }
+
     // Conservative max size for Java arrays. The actual maximum varies
     // from JVM version and sometimes between configurations.
     final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (fileMetadataStats_.numFiles > JVM_MAX_ARRAY_SIZE) {
+    if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
       throw new IllegalStateException(String.format(
-          "Too many files to generate a table sample. " +
-          "Table '%s' has %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), fileMetadataStats_.numFiles, JVM_MAX_ARRAY_SIZE));
+          "Too many files to generate a table sample of table %s. " +
+          "Sample requested over %s files, but a maximum of %s files are supported.",
+          getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
     }
-    int totalNumFiles = (int) fileMetadataStats_.numFiles;
 
     // Ensure a consistent ordering of files for repeatable runs. The files within a
     // partition are already ordered based on how they are loaded in the catalog.
@@ -2176,12 +2180,11 @@ public class HdfsTable extends Table implements FeFsTable {
     // fileIdxs contains indexes into the file descriptor lists of all inputParts
     // parts[i] contains the partition corresponding to fileIdxs[i]
     // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
-    // Use max size to avoid looping over inputParts for the exact size.
     // The purpose of these arrays is to efficiently avoid selecting the same file
     // multiple times during the sampling, regardless of the sample percent. We purposely
     // avoid generating objects proportional to the number of files.
-    int[] fileIdxs = new int[totalNumFiles];
-    FeFsPartition[] parts = new FeFsPartition[totalNumFiles];
+    int[] fileIdxs = new int[(int)totalNumFiles];
+    FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
     int idx = 0;
     long totalBytes = 0;
     for (FeFsPartition part: orderedParts) {
@@ -2193,6 +2196,9 @@ public class HdfsTable extends Table implements FeFsTable {
         ++idx;
       }
     }
+    if (idx != totalNumFiles) {
+      throw new AssertionError("partition file counts changed during iteration");
+    }
 
     int numFilesRemaining = idx;
     double fracPercentBytes = (double) percentBytes / 100;
diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py
index 4bc7e1f..8652382 100644
--- a/tests/query_test/test_tablesample.py
+++ b/tests/query_test/test_tablesample.py
@@ -32,6 +32,7 @@ class TestTableSample(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestTableSample, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('repeatable', *[True, False]))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('filtered', *[True, False]))
     # Tablesample is only supported on HDFS tables.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
       v.get_value('table_format').file_format != 'kudu' and
@@ -48,15 +49,21 @@ class TestTableSample(ImpalaTestSuite):
     # 2. The results of queries without a repeatable clause could change due to
     # changes in data loading that affect the number or size of files.
     repeatable = vector.get_value('repeatable')
+    filtered = vector.get_value('filtered')
+
+    where_clause = ""
+    if filtered:
+      where_clause = "where month between 1 and 6"
+
     ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
-    result = self.client.execute("select count(*) from alltypes")
+    result = self.client.execute("select count(*) from alltypes %s" % where_clause)
     baseline_count = int(result.data[0])
     prev_count = None
-    for perc in [5, 20, 50]:
+    for perc in [5, 20, 50, 100]:
       rep_sql = ""
       if repeatable: rep_sql = " repeatable(1)"
-      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s" \
-                 % (perc, rep_sql)
+      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s %s" \
+                 % (perc, rep_sql, where_clause)
       handle = self.client.execute_async(sql_stmt)
       # IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 sec before
       # failing and logging the backtraces of all impalads.
@@ -76,7 +83,10 @@ class TestTableSample(ImpalaTestSuite):
       result = self.client.fetch(sql_stmt, handle)
       self.client.close_query(handle)
       count = int(result.data[0])
-      assert count < baseline_count
+      if perc < 100:
+        assert count < baseline_count
+      else:
+        assert count == baseline_count
       if prev_count and repeatable:
         # May not necessarily be true for non-repeatable samples
         assert count > prev_count
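Note on the HdfsTable change in this commit: the intermediate arrays are now sized from the file counts of the selected partitions, and a final check confirms that the fill loop wrote exactly that many entries. The Python sketch below illustrates the idea under stated assumptions. The `sample_files` function, its parameters, and the swap-with-last selection loop are illustrative and are not taken from the diff, which only shows the array sizing and the consistency check.

```python
import random


def sample_files(partitions, percent_bytes, min_sample_bytes, seed):
    """Pick files at random until the byte target is reached.

    partitions is a list of already-pruned partitions, each given as a list
    of file sizes in bytes. Returns (partition index, file index) pairs.
    """
    # Size the arrays from the selected partitions only, not the whole table.
    total_num_files = sum(len(files) for files in partitions)
    parts = [0] * total_num_files
    file_idxs = [0] * total_num_files

    idx = 0
    total_bytes = 0
    for p, files in enumerate(partitions):
        for f, size in enumerate(files):
            parts[idx] = p
            file_idxs[idx] = f
            total_bytes += size
            idx += 1
    # Same consistency check as the patch: counts must match what was filled.
    assert idx == total_num_files

    target = max(total_bytes * percent_bytes / 100.0, min_sample_bytes)
    rng = random.Random(seed)
    remaining = idx
    selected_bytes = 0
    result = []
    while selected_bytes < target and remaining > 0:
        pick = rng.randrange(remaining)
        p, f = parts[pick], file_idxs[pick]
        result.append((p, f))
        selected_bytes += partitions[p][f]
        # Move the last unpicked entry into the vacated slot so that no file
        # can be selected twice.
        parts[pick] = parts[remaining - 1]
        file_idxs[pick] = file_idxs[remaining - 1]
        remaining -= 1
    return result
```

With `percent_bytes=100` and non-empty files, the loop selects every file, which is the property the new "100%" case in test_tablesample.py checks at the query level.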