Posted to commits@impala.apache.org by as...@apache.org on 2021/03/23 00:04:14 UTC

[impala] branch master updated (c9d7bcb -> b28da05)

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c9d7bcb  IMPALA-9661: Avoid introducing unused columns in table masking view
     new a0f7768  IMPALA-10483: Support subqueries in Ranger masking policies
     new e5d5dbc  Update Paramiko to 2.4.2.
     new b28da05  IMPALA-10592: prevent pytest from hanging at exit.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/impala/analysis/InlineViewRef.java  |  19 +---
 .../apache/impala/analysis/StmtMetadataLoader.java |  46 +++++++-
 .../org/apache/impala/authorization/TableMask.java |  34 ++++--
 .../ranger/RangerAuthorizationChecker.java         |  14 +--
 .../java/org/apache/impala/service/Frontend.java   |   5 +-
 infra/python/deps/extended-test-requirements.txt   |   2 +-
 .../queries/QueryTest/ranger_column_masking.test   |  16 ++-
 .../queries/QueryTest/ranger_row_filtering.test    | 103 ++++++++++++++++--
 tests/authorization/test_ranger.py                 | 118 ++++++++++++++++++---
 tests/custom_cluster/test_admission_controller.py  |   1 +
 10 files changed, 299 insertions(+), 59 deletions(-)

[impala] 01/03: IMPALA-10483: Support subqueries in Ranger masking policies


asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a0f77680c53d4bd4b85aa2d80224dbd76dd15126
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Sun Mar 14 16:38:41 2021 +0800

    IMPALA-10483: Support subqueries in Ranger masking policies
    
    This patch adds support for using subqueries in Ranger masking policies,
    i.e. column-masking and row-filtering policies. The subquery can
    reference either the masked table itself or other tables. However,
    masking policies on the referenced tables are not applied recursively,
    which is consistent with Hive. One motivation is to avoid infinite
    masking when a policy references its own table. Another motivation, I
    think, is to keep the masking behavior simple: when the admin writes a
    masking expression, it can be treated as running from the admin's
    perspective (i.e. with no masking applied).
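The non-recursive rule can be illustrated with a toy Python model; it is not Impala code. It assumes tables are lists of row dicts and that each row filter is a predicate handed the *unmasked* catalog, so a policy's subquery always reads raw data, even when it references its own table. All table names and contents are hypothetical, loosely shaped after the functional.alltypestiny / alltypesagg tests in this commit.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]
Catalog = Dict[str, List[Row]]
# A row filter gets the row plus the *unmasked* catalog, so subqueries inside a
# policy never see other policies applied (no recursive masking).
RowFilter = Callable[[Row, Catalog], bool]


def scan(table: str, raw: Catalog, filters: Dict[str, RowFilter]) -> List[Row]:
    """Returns the rows of 'table' visible to the querying user."""
    row_filter = filters.get(table)
    if row_filter is None:
        return list(raw[table])
    # The predicate reads 'raw' directly, so a filter that references its own
    # table (or another filtered table) cannot trigger further filtering.
    return [row for row in raw[table] if row_filter(row, raw)]


# Hypothetical data.
raw: Catalog = {
    "tiny": [{"id": i} for i in range(8)],   # ids 0..7
    "agg": [{"id": i} for i in range(10)],   # ids 0..9
    "logs": [{"id": i} for i in (3, 3, 5, 9)],
}
filters: Dict[str, RowFilter] = {
    # tiny: id % 2 = 0
    "tiny": lambda row, cat: row["id"] % 2 == 0,
    # agg: id in (select id from tiny), evaluated on the unfiltered 'tiny'
    "agg": lambda row, cat: row["id"] in {r["id"] for r in cat["tiny"]},
    # logs: id = (select min(id) from logs), a self-referencing filter
    "logs": lambda row, cat: row["id"] == min(r["id"] for r in cat["logs"]),
}

print([r["id"] for r in scan("tiny", raw, filters)])  # [0, 2, 4, 6]
print([r["id"] for r in scan("agg", raw, filters)])   # [0, 1, 2, 3, 4, 5, 6, 7]
print([r["id"] for r in scan("logs", raw, filters)])  # [3, 3]
```

Note that "agg" keeps ids 1, 3, 5 and 7 even though the filter on "tiny" would hide them from a direct scan of that table.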
    
    Implementation
    Before analyzing the query, the coordinator loads the metadata of all
    possibly used tables into the query's StmtTableCache. Table masking
    takes place after the analysis phase. If a subquery in a masking policy
    introduces new tables, the analyzer fails to resolve them because their
    metadata is not loaded in the StmtTableCache. This patch modifies
    StmtMetadataLoader to also load the tables introduced by masking
    policies, so that they can be resolved correctly.
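The loading change amounts to a closure computation: keep adding referenced tables until the set stops growing. The sketch below is a simplified Python model, not the Java implementation. 'views' maps a view to the tables its definition references and 'policy_tables' maps a table to the tables referenced by its column-masking/row-filtering expressions; both are hypothetical inputs, whereas the real loader fetches missing tables from the catalog in rounds before inspecting them. Like the loader, this model also follows policies on policy-referenced tables, which may load more metadata than strictly needed even though those policies are never applied.

```python
from collections import deque
from typing import Dict, Iterable, Set


def tables_to_load(query_tables: Iterable[str],
                   views: Dict[str, Set[str]],
                   policy_tables: Dict[str, Set[str]]) -> Set[str]:
    """Returns every table whose metadata must be in the statement's table cache."""
    result: Set[str] = set()
    worklist = deque(query_tables)
    while worklist:
        name = worklist.popleft()
        if name in result:
            continue  # Already collected; also guards against cycles.
        result.add(name)
        # Tables referenced by a view definition.
        worklist.extend(views.get(name, ()))
        # Tables referenced by masking/filtering policies on this table.
        worklist.extend(policy_tables.get(name, ()))
    return result


# Loosely mirrors the test in this commit: the policy on tpch.customer references
# tpch.nation and an employee table (the database name here is hypothetical).
needed = tables_to_load(["tpch.customer"], views={},
                        policy_tables={"tpch.customer": {"tpch.nation", "mydb.employee"}})
print(sorted(needed))  # ['mydb.employee', 'tpch.customer', 'tpch.nation']
```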
    
    Tests
     - Add more complex tests in test_row_filtering
    
    Change-Id: I254df9f684c95c660f402abd99ca12dded7e764f
    Reviewed-on: http://gerrit.cloudera.org:8080/17185
    Reviewed-by: Aman Sinha <am...@cloudera.com>
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/InlineViewRef.java  |  19 +---
 .../apache/impala/analysis/StmtMetadataLoader.java |  46 +++++++-
 .../org/apache/impala/authorization/TableMask.java |  34 ++++--
 .../ranger/RangerAuthorizationChecker.java         |  14 +--
 .../java/org/apache/impala/service/Frontend.java   |   5 +-
 .../queries/QueryTest/ranger_column_masking.test   |  16 ++-
 .../queries/QueryTest/ranger_row_filtering.test    | 103 ++++++++++++++++--
 tests/authorization/test_ranger.py                 | 118 ++++++++++++++++++---
 8 files changed, 297 insertions(+), 58 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
index da7ced2..0f30a40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
@@ -171,21 +171,6 @@ public class InlineViewRef extends TableRef {
     Expr wherePredicate = tableMask.createRowFilter(authzCtx);
     SelectStmt tableMaskStmt = new SelectStmt(selectList, fromClause, wherePredicate,
         null, null, null, null);
-    // TODO(IMPALA-10483): Column-masking/Row-filtering expressions may have subqueries
-    //  which may introduce new tables. We should trigger StmtMetadataLoader#loadTables()
-    //  on 'tableMaskStmt'. Otherwise, they can't be resolved. Reject the query in this
-    //  case.
-    List<TableRef> tableRefsInView = tableMaskStmt.collectTableRefs();
-    // Should only contain the base table ref.
-    if (tableRefsInView.size() > 1) {
-      tableRefsInView.remove(tableRef);
-      throw new AnalysisException("Column-masking/Row-filtering expressions using " +
-          "subqueries are not supported (IMPALA-10483). Table(s) in the subquery: " +
-          tableRefsInView.stream()
-              .map(t -> String.join(".", t.getPath()))
-              .collect(Collectors.toList())
-      );
-    }
 
     InlineViewRef viewRef = new InlineViewRef(/*alias*/ null, tableMaskStmt,
         (TableSampleClause) null);
@@ -418,7 +403,9 @@ public class InlineViewRef extends TableRef {
     Preconditions.checkState(queryStmt_ instanceof SelectStmt);
     SelectStmt selectStmt = (SelectStmt) queryStmt_;
     Preconditions.checkNotNull(selectStmt.fromClause_);
-    Preconditions.checkState(selectStmt.fromClause_.size() == 1);
+    // FromClause could have several table refs due to subquery rewrite, i.e. subquery
+    // could be rewritten to joins. The first table ref is the original table ref.
+    Preconditions.checkState(selectStmt.fromClause_.size() > 0);
     return selectStmt.fromClause_.get(0);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index df7ccf9..b400f93 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -25,12 +25,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.impala.authorization.TableMask;
+import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeIncompleteTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.Frontend;
@@ -57,6 +61,7 @@ public class StmtMetadataLoader {
   private final Frontend fe_;
   private final String sessionDb_;
   private final EventSequence timeline_;
+  private final User user_;
 
   // Results of the loading process. See StmtTableCache.
   private final Set<String> dbs_ = new HashSet<>();
@@ -100,10 +105,16 @@ public class StmtMetadataLoader {
    * The 'fe' and 'sessionDb' arguments must be non-null. A null 'timeline' may be passed
    * if no events should be marked.
    */
-  public StmtMetadataLoader(Frontend fe, String sessionDb, EventSequence timeline) {
+  public StmtMetadataLoader(Frontend fe, String sessionDb, EventSequence timeline,
+      User user) {
     fe_ = Preconditions.checkNotNull(fe);
     sessionDb_ = Preconditions.checkNotNull(sessionDb);
     timeline_ = timeline;
+    user_ = user;
+  }
+
+  public StmtMetadataLoader(Frontend fe, String sessionDb, EventSequence timeline) {
+    this(fe, sessionDb, timeline, null);
   }
 
   // Getters for testing
@@ -297,6 +308,15 @@ public class StmtMetadataLoader {
       if (tbl instanceof FeView) {
         viewTbls.addAll(collectTableCandidates(((FeView) tbl).getQueryStmt()));
       }
+      // Adds tables/views introduced by column-masking/row-filtering policies.
+      if (fe_.getAuthzFactory().getAuthorizationConfig().isEnabled()
+          && fe_.getAuthzFactory().supportsTableMasking()) {
+        try {
+          viewTbls.addAll(collectPolicyTables(tbl));
+        } catch (Exception e) {
+          LOG.error("Failed to collect policy tables for {}", tblName, e);
+        }
+      }
     }
     // Recursively collect loaded/missing tables from loaded views.
     if (!viewTbls.isEmpty()) missingTbls.addAll(getMissingTables(catalog, viewTbls));
@@ -319,4 +339,28 @@ public class StmtMetadataLoader {
     }
     return tableNames;
   }
+
+  private Set<TableName> collectPolicyTables(FeTable tbl)
+      throws InternalException, AnalysisException {
+    Set<TableName> tableNames = new HashSet<>();
+    String dbName = tbl.getDb().getName();
+    String tblName = tbl.getName();
+    List<Column> columns = tbl.getColumnsInHiveOrder();
+    TableMask tableMask = new TableMask(fe_.getAuthzChecker(), dbName, tblName, columns,
+        user_);
+    if (tableMask.needsMaskingOrFiltering()) {
+      for (Column col : columns) {
+        if (col.getType().isComplexType()) continue;
+        // Use authzCtx=null to avoid audits and privilege checks.
+        SelectStmt stmt = tableMask.createColumnMaskStmt(
+            col.getName(), col.getType(), /*authzCtx*/ null);
+        if (stmt == null) continue;
+        tableNames.addAll(collectTableCandidates(stmt));
+      }
+      // Use authzCtx=null to avoid audits and privilege checks.
+      SelectStmt filterStmt = tableMask.createRowFilterStmt(/*authzCtx*/null);
+      if (filterStmt != null) tableNames.addAll(collectTableCandidates(filterStmt));
+    }
+    return tableNames;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/TableMask.java b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
index 440ecf0..44e954b 100644
--- a/fe/src/main/java/org/apache/impala/authorization/TableMask.java
+++ b/fe/src/main/java/org/apache/impala/authorization/TableMask.java
@@ -75,10 +75,7 @@ public class TableMask {
     return authChecker_.needsRowFiltering(user_, dbName_, tableName_);
   }
 
-  /**
-   * Return the masked Expr of the given column
-   */
-  public Expr createColumnMask(String colName, Type colType,
+  public SelectStmt createColumnMaskStmt(String colName, Type colType,
       AuthorizationContext authzCtx) throws InternalException,
       AnalysisException {
     Preconditions.checkState(!colType.isComplexType());
@@ -89,7 +86,7 @@ public class TableMask {
           dbName_, tableName_, colName, maskedValue);
     }
     if (maskedValue == null || maskedValue.equals(colName)) {  // Don't need masking.
-      return new SlotRef(Lists.newArrayList(colName));
+      return null;
     }
     SelectStmt maskStmt = (SelectStmt) Parser.parse(
         String.format("SELECT CAST(%s AS %s)", maskedValue, colType));
@@ -97,6 +94,17 @@ public class TableMask {
         || maskStmt.hasHavingClause() || maskStmt.hasWhereClause()) {
       throw new AnalysisException("Illegal column masked value: " + maskedValue);
     }
+    return maskStmt;
+  }
+
+  /**
+   * Return the masked Expr of the given column
+   */
+  public Expr createColumnMask(String colName, Type colType,
+      AuthorizationContext authzCtx) throws InternalException,
+      AnalysisException {
+    SelectStmt maskStmt = createColumnMaskStmt(colName, colType, authzCtx);
+    if (maskStmt == null) return new SlotRef(Lists.newArrayList(colName));
     Expr res = maskStmt.getSelectList().getItems().get(0).getExpr();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Returned Expr: " + res.toSql());
@@ -104,16 +112,22 @@ public class TableMask {
     return res;
   }
 
-  /**
-   * Return the row filter Expr
-   */
-  public Expr createRowFilter(AuthorizationContext authzCtx)
+  public SelectStmt createRowFilterStmt(AuthorizationContext authzCtx)
       throws InternalException, AnalysisException {
     String rowFilter = authChecker_.createRowFilter(user_, dbName_, tableName_, authzCtx);
     if (rowFilter == null) return null;
     // Parse the row filter string to AST by using it in a fake query.
     String stmtSql = String.format("SELECT 1 FROM foo WHERE %s", rowFilter);
-    SelectStmt selectStmt = (SelectStmt) Parser.parse(stmtSql);
+    return (SelectStmt) Parser.parse(stmtSql);
+  }
+
+  /**
+   * Return the row filter Expr
+   */
+  public Expr createRowFilter(AuthorizationContext authzCtx)
+      throws InternalException, AnalysisException {
+    SelectStmt selectStmt = createRowFilterStmt(authzCtx);
+    if (selectStmt == null) return null;
     return selectStmt.getWhereClause();
   }
 }
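TableMask now exposes the parsed statements (createColumnMaskStmt, createRowFilterStmt), returning null when no masking applies, and the older Expr-returning methods are built on top of them. That split lets StmtMetadataLoader scan the statements for table references without needing the expressions. A rough Python analogue of the contract follows, using SQL strings in place of Impala's parser and AST; the snake_case function names are hypothetical.

```python
from typing import Optional


def column_mask_stmt(col_name: str, col_type: str,
                     masked_value: Optional[str]) -> Optional[str]:
    """Returns SQL wrapping the masked value, or None if no masking is needed."""
    if masked_value is None or masked_value == col_name:
        return None
    return "SELECT CAST(%s AS %s)" % (masked_value, col_type)


def column_mask_expr(col_name: str, col_type: str,
                     masked_value: Optional[str]) -> str:
    """Returns the expression to project: the masked value or the plain column."""
    stmt = column_mask_stmt(col_name, col_type, masked_value)
    if stmt is None:
        return col_name  # Analogous to returning a plain SlotRef.
    # Strip the leading 'SELECT ' to mimic taking the first select-list item.
    return stmt[len("SELECT "):]


def row_filter_stmt(row_filter: Optional[str]) -> Optional[str]:
    """Wraps a row-filter predicate in a dummy query so it can be parsed."""
    if row_filter is None:
        return None
    return "SELECT 1 FROM foo WHERE %s" % row_filter


print(column_mask_expr("bigint_col", "BIGINT", "8"))  # CAST(8 AS BIGINT)
print(column_mask_expr("id", "INT", None))            # id
```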
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 876659c..337ff97 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -360,12 +360,13 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
   @Override
   public String createColumnMask(User user, String dbName, String tableName,
       String columnName, AuthorizationContext rangerCtx) throws InternalException {
-    RangerBufferAuditHandler auditHandler =
+    RangerBufferAuditHandler auditHandler = (rangerCtx == null) ? null:
         ((RangerAuthorizationContext) rangerCtx).getAuditHandler();
-    int numAuthzAuditEventsBefore = auditHandler.getAuthzEvents().size();
+    int numAuthzAuditEventsBefore = (auditHandler == null) ? 0 :
+        auditHandler.getAuthzEvents().size();
     RangerAccessResult accessResult = evalColumnMask(user, dbName, tableName, columnName,
         auditHandler);
-    if (!accessResult.isMaskEnabled()) {
+    if (!accessResult.isMaskEnabled() && auditHandler != null) {
       // No column masking policies, remove any possible stale audit events and
       // return the original column.
       removeStaleAudits(auditHandler, numAuthzAuditEventsBefore);
@@ -407,12 +408,13 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
   @Override
   public String createRowFilter(User user, String dbName, String tableName,
       AuthorizationContext rangerCtx) throws InternalException {
-    RangerBufferAuditHandler auditHandler =
+    RangerBufferAuditHandler auditHandler = (rangerCtx == null) ? null :
         ((RangerAuthorizationContext) rangerCtx).getAuditHandler();
-    int numAuthzAuditEventsBefore = auditHandler.getAuthzEvents().size();
+    int numAuthzAuditEventsBefore = (auditHandler == null) ? 0 :
+        auditHandler.getAuthzEvents().size();
     RangerAccessResult accessResult = evalRowFilter(user, dbName, tableName,
         auditHandler);
-    if (!accessResult.isRowFilterEnabled()) {
+    if (!accessResult.isRowFilterEnabled() && auditHandler != null) {
       // No row filtering policies, remove any possible stale audit events.
       removeStaleAudits(auditHandler, numAuthzAuditEventsBefore);
       return null;
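Accepting a null AuthorizationContext lets StmtMetadataLoader evaluate policies without producing or rolling back audit events. The Python sketch below models only the audit bookkeeping shown in this hunk, with an optional audit buffer; AuditBuffer, the evaluator functions and the event strings are hypothetical stand-ins for RangerBufferAuditHandler and the Ranger evaluation, and the rest of the Java method is outside this diff.

```python
from typing import Callable, List, Optional, Tuple


class AuditBuffer:
    """Hypothetical stand-in for RangerBufferAuditHandler."""

    def __init__(self) -> None:
        self.events: List[str] = []


# An evaluator returns (row_filter_enabled, filter_expr) and may log audit events.
Evaluator = Callable[[Optional[AuditBuffer]], Tuple[bool, str]]


def create_row_filter(evaluate: Evaluator,
                      audit: Optional[AuditBuffer]) -> Optional[str]:
    """Returns the row filter expression, or None if no policy applies."""
    # Without an audit buffer (e.g. during metadata loading) there is nothing to count.
    before = 0 if audit is None else len(audit.events)
    enabled, filter_expr = evaluate(audit)
    if not enabled:
        if audit is not None:
            # Roll back events logged by an evaluation that produced no filter.
            del audit.events[before:]
        return None
    return filter_expr


def no_policy(audit: Optional[AuditBuffer]) -> Tuple[bool, str]:
    if audit is not None:
        audit.events.append("stale")
    return False, ""


def even_ids_policy(audit: Optional[AuditBuffer]) -> Tuple[bool, str]:
    if audit is not None:
        audit.events.append("row-filter")
    return True, "id % 2 = 0"


buf = AuditBuffer()
print(create_row_filter(no_policy, buf), buf.events)        # None []
print(create_row_filter(even_ids_policy, buf), buf.events)  # id % 2 = 0 ['row-filter']
print(create_row_filter(even_ids_policy, None))             # id % 2 = 0
```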
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index ba7e89e..d871e4b 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -382,6 +382,8 @@ public class Frontend {
 
   public FeCatalog getCatalog() { return catalogManager_.getOrCreateCatalog(); }
 
+  public AuthorizationFactory getAuthzFactory() { return authzFactory_; }
+
   public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
 
   public AuthorizationManager getAuthzManager() { return authzManager_; }
@@ -1628,8 +1630,9 @@ public class Frontend {
     // Parse stmt and collect/load metadata to populate a stmt-local table cache
     StatementBase stmt = Parser.parse(
         queryCtx.client_request.stmt, queryCtx.client_request.query_options);
+    User user = new User(TSessionStateUtil.getEffectiveUser(queryCtx.session));
     StmtMetadataLoader metadataLoader =
-        new StmtMetadataLoader(this, queryCtx.session.database, timeline);
+        new StmtMetadataLoader(this, queryCtx.session.database, timeline, user);
     //TODO (IMPALA-8788): should load table write ids in transaction context.
     StmtTableCache stmtTableCache = metadataLoader.loadTables(stmt);
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 2d1aed6..c0e37a4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -400,7 +400,17 @@ show create view $UNIQUE_DB.masked_view;
 ====
 ---- QUERY
 select id, bigint_col from functional.alltypesagg order by id limit 10
----- CATCH
-AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
- supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypestiny]
+---- RESULTS
+0,8
+0,8
+1,8
+2,8
+3,8
+4,8
+5,8
+6,8
+7,8
+8,8
+---- TYPES
+INT,BIGINT
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
index 17be8d1..2e5add8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
@@ -340,19 +340,106 @@ select * from $UNIQUE_DB.masked_view
 INT
 ====
 ---- QUERY
-# TODO(IMPALA-10483): support using subquery on the same table as the row-filter expression
+# Test subquery row filter "id = (select min(id) from functional.alltypesagg)".
+# The row filter won't be recursively applied in the subquery.
+select id, bool_col, string_col from functional.alltypesagg
+---- RESULTS
+0,true,'0'
+0,true,'0'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test subquery row filter "id = (select min(id) from functional.alltypesagg)".
+# The row filter won't be recursively applied in the subquery.
 select count(*) from functional.alltypesagg
 ---- RESULTS
----- CATCH
-AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
- supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypesagg]
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test subquery row filter "id in (select id from functional.alltypestiny)".
+# Row filter (id % 2 = 0) of functional.alltypestiny won't be recursively applied.
+# This is consistent with Hive.
+select id, bool_col, string_col from functional_parquet.alltypesagg
+---- RESULTS
+0,true,'0'
+0,true,'0'
+1,false,'1'
+2,true,'2'
+3,false,'3'
+4,true,'4'
+5,false,'5'
+6,true,'6'
+7,false,'7'
+---- TYPES
+INT,BOOLEAN,STRING
 ====
 ---- QUERY
-# TODO(IMPALA-10483): support using subquery on other tables as the row-filter expression
+# Test subquery row filter "id in (select id from functional.alltypestiny)".
+# Row filter (id % 2 = 0) of functional.alltypestiny won't be recursively applied.
+# This is consistent with Hive.
 select count(*) from functional_parquet.alltypesagg
----- CATCH
-AnalysisException: Column-masking/Row-filtering expressions using subqueries are not
- supported (IMPALA-10483). Table(s) in the subquery: [functional.alltypestiny]
+---- RESULTS
+9
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test subquery row filter "id in (select id from functional.alltypestiny)".
+# Row filter (id % 2 = 0) of functional.alltypestiny will be applied on t. It won't be
+# recursively applied in the row filter of functional_parquet.alltypesagg.
+select t.id, t.bool_col, t.string_col
+from functional.alltypestiny t join functional_parquet.alltypesagg a on t.id = a.id
+---- RESULTS
+0,true,'0'
+0,true,'0'
+2,true,'0'
+4,true,'0'
+6,true,'0'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Same as the above query but adding a WHERE clause
+select t.id, t.bool_col, t.string_col
+from functional.alltypestiny t join functional_parquet.alltypesagg a on t.id = a.id
+where t.id != 0 and a.id < 3
+---- RESULTS
+2,true,'0'
+---- TYPES
+INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test subquery row filter "id in (select id from functional.alltypestiny)".
+# Row filter (id % 2 = 0) of functional.alltypestiny will be applied on t. It won't be
+# recursively applied in the row filter of functional_parquet.alltypesagg.
+select a.id, a.bool_col, a.string_col, t.id, t.bool_col, t.string_col
+from functional_parquet.alltypesagg a left join functional.alltypestiny t on t.id = a.id
+---- RESULTS
+0,true,'0',0,true,'0'
+0,true,'0',0,true,'0'
+1,false,'1',NULL,NULL,'NULL'
+2,true,'2',2,true,'0'
+3,false,'3',NULL,NULL,'NULL'
+4,true,'4',4,true,'0'
+5,false,'5',NULL,NULL,'NULL'
+6,true,'6',6,true,'0'
+7,false,'7',NULL,NULL,'NULL'
+---- TYPES
+INT,BOOLEAN,STRING,INT,BOOLEAN,STRING
+====
+---- QUERY
+# Test subquery row filter "id in (select id from functional.alltypestiny)" in local
+# views.
+with v1 as (select id, bool_col, string_col from functional_parquet.alltypesagg),
+  v2 as (select id, count(*) cnt from functional_parquet.alltypesagg group by id)
+select count(*) from v1 join v2 on v1.id = v2.id
+---- RESULTS
+9
+---- TYPES
+BIGINT
 ====
 ---- QUERY
 # Row-filtering policy keeps rows with "nested_struct.a is not NULL"
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 64927aa..0ebc850 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -720,21 +720,34 @@ class TestRanger(CustomClusterTestSuite):
   @staticmethod
   def _add_multiuser_row_filtering_policy(policy_name, db, table, users, filters):
     """Adds a row filtering policy on 'db'.'table' and returns the policy id.
+    If len(filters) == 1, all users use the same filter. Otherwise,
     users[0] has filters[0], users[1] has filters[1] and so on."""
     assert len(users) > 0
-    assert len(users) == len(filters)
+    assert len(filters) == 1 or len(users) == len(filters)
     items = []
-    for i in range(len(users)):
+    if len(filters) == 1:
       items.append({
-          "accesses": [
-            {
-              "type": "select",
-              "isAllowed": True
-            }
-          ],
-          "users": [users[i]],
-          "rowFilterInfo": {"filterExpr": filters[i]}
-        })
+        "accesses": [
+          {
+            "type": "select",
+            "isAllowed": True
+          }
+        ],
+        "users": users,
+        "rowFilterInfo": {"filterExpr": filters[0]}
+      })
+    else:
+      for i in range(len(users)):
+        items.append({
+            "accesses": [
+              {
+                "type": "select",
+                "isAllowed": True
+              }
+            ],
+            "users": [users[i]],
+            "rowFilterInfo": {"filterExpr": filters[i]}
+          })
     TestRanger._add_row_filtering_policy_with_items(policy_name, db, table, items)
 
   @staticmethod
@@ -1196,15 +1209,94 @@ class TestRanger(CustomClusterTestSuite):
       assert self.client.execute(query).get_data() == "0"
       assert non_owner_client.execute(query, user="non_owner").get_data() == "1"
       assert non_owner_2_client.execute(query, user="non_owner_2").get_data() == "2"
+
+      #######################################################
+      # Test row filters that contain complex subqueries
+      #######################################################
+      admin_client.execute("""create table %s.employee (
+          e_id bigint,
+          e_name string,
+          e_nation string)
+          stored as textfile""" % unique_database)
+      admin_client.execute("""insert into %s.employee values
+          (0, '%s', 'CHINA'),
+          (1, 'non_owner', 'PERU'),
+          (2, 'non_owner2', 'IRAQ')
+          """ % (unique_database, user))
+      admin_client.execute("grant select on table %s.employee to user %s"
+                           % (unique_database, user))
+      admin_client.execute("grant select on table %s.employee to user non_owner"
+                           % unique_database)
+      admin_client.execute("grant select on table %s.employee to user non_owner_2"
+                           % unique_database)
+      admin_client.execute("grant select on database tpch to user %s" % user)
+      admin_client.execute("grant select on database tpch to user non_owner")
+      admin_client.execute("grant select on database tpch to user non_owner_2")
+
+      # Each employee can only see customers in the same nation.
+      row_filter_tmpl = """c_nationkey in (
+            select n_nationkey from tpch.nation
+            where n_name in (
+              select e_nation from {db}.employee
+              where e_name = %s
+            )
+          )""".format(db=unique_database)
+      TestRanger._add_multiuser_row_filtering_policy(
+          unique_name + str(policy_cnt), "tpch", "customer",
+          [user, 'non_owner', 'non_owner_2'], [row_filter_tmpl % "current_user()"])
+      policy_cnt += 1
+      admin_client.execute("refresh authorization")
+
+      user_query = "select count(*) from tpch.customer"
+      admin_query_tmpl = user_query + " where " + row_filter_tmpl
+      self._verified_multiuser_results(
+          admin_client, admin_query_tmpl, user_query,
+          [user, 'non_owner', 'non_owner_2'],
+          [self.client, non_owner_client, non_owner_2_client])
+
+      tpch_q10_tmpl = """select c_custkey, c_name,
+          sum(l_extendedprice * (1 - l_discount)) as revenue,
+          c_acctbal, n_name, c_address, c_phone, c_comment
+        from {customer_place_holder}, tpch.orders, tpch.lineitem, tpch.nation
+        where
+          c_custkey = o_custkey and l_orderkey = o_orderkey
+          and o_orderdate >= '1993-10-01' and o_orderdate < '1994-01-01'
+          and l_returnflag = 'R' and c_nationkey = n_nationkey
+        group by c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
+        order by revenue desc, c_custkey
+        limit 20"""
+      user_query = tpch_q10_tmpl.format(customer_place_holder="tpch.customer")
+      admin_value = "(select * from tpch.customer where " + row_filter_tmpl + ") v"
+      admin_query_tmpl = tpch_q10_tmpl.format(customer_place_holder=admin_value)
+      self._verified_multiuser_results(
+          admin_client, admin_query_tmpl, user_query,
+          [user, 'non_owner', 'non_owner_2'],
+          [self.client, non_owner_client, non_owner_2_client])
     finally:
       for i in range(policy_cnt):
         TestRanger._remove_policy(unique_name + str(i))
       cleanup_statements = [
         "revoke select on table functional_parquet.alltypestiny from user non_owner",
-        "revoke select on table functional_parquet.alltypestiny from user non_owner_2"
+        "revoke select on table functional_parquet.alltypestiny from user non_owner_2",
+        "revoke select on database tpch from user non_owner",
+        "revoke select on database tpch from user non_owner_2",
+        "revoke select on table %s.employee from user %s" % (unique_database, user),
+        "revoke select on table %s.employee from user non_owner" % unique_database,
+        "revoke select on table %s.employee from user non_owner_2" % unique_database,
       ]
       for statement in cleanup_statements:
-        admin_client.execute(statement, user=ADMIN)
+        try:
+          admin_client.execute(statement, user=ADMIN)
+        except Exception as e:
+          LOG.error("Ignored exception in cleanup: " + str(e))
+
+  def _verified_multiuser_results(self, admin_client, admin_query_tmpl, user_query, users,
+                                 user_clients):
+    assert len(users) == len(user_clients)
+    for i in range(len(users)):
+      admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % users[i])).get_data()
+      user_res = user_clients[i].execute(user_query).get_data()
+      assert admin_res == user_res
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(

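The policy helper in test_ranger.py now accepts either one filter shared by all users or one filter per user, and the new check compares each user's automatically filtered query with an admin query that inlines the same filter, with current_user() replaced by the user's name as a literal. A condensed Python restatement of both ideas follows; row_filter_items and admin_equivalent are hypothetical names, not functions in the Impala test suite.

```python
from typing import Dict, List


def row_filter_items(users: List[str], filters: List[str]) -> List[Dict]:
    """Builds Ranger row-filter policy items: one shared item if a single filter
    is given, otherwise one item per user (users[i] gets filters[i])."""
    assert len(users) > 0
    assert len(filters) == 1 or len(users) == len(filters)

    def item(item_users: List[str], expr: str) -> Dict:
        return {
            "accesses": [{"type": "select", "isAllowed": True}],
            "users": item_users,
            "rowFilterInfo": {"filterExpr": expr},
        }

    if len(filters) == 1:
        return [item(list(users), filters[0])]
    return [item([u], f) for u, f in zip(users, filters)]


def admin_equivalent(user_query: str, row_filter_tmpl: str, user: str) -> str:
    """Inlines the row filter for 'user' into the query run by the (unfiltered) admin."""
    return user_query + " where " + row_filter_tmpl % ("'%s'" % user)


items = row_filter_items(["alice", "bob"], ["e_name = current_user()"])
print(len(items), items[0]["users"])  # 1 ['alice', 'bob']
print(admin_equivalent("select count(*) from tpch.customer", "c_name = %s", "bob"))
# select count(*) from tpch.customer where c_name = 'bob'
```

Because the admin template is filled with Python %-formatting, a literal % in a filter (for example id % 2 = 0) would have to be written as %% in such a template.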
[impala] 03/03: IMPALA-10592: prevent pytest from hanging at exit.


asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b28da054f3595bb92873433211438306fc22fbc7
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Fri Mar 19 18:00:53 2021 -0700

    IMPALA-10592: prevent pytest from hanging at exit.
    
    In TestAdmissionControllerStress mark worker threads as daemons so that
    an exception in teardown() will not cause pytest to hang just after
    printing the test results.
    https://stackoverflow.com/questions/19219596/py-test-hangs-after-showing-test-results
    
    TESTING:
    
    Simulated the failure in IMPALA-10596 by throwing an exception during
    teardown(). Without this fix the pytest invocation hangs.
    
    Change-Id: I74cca8f577c7fbc4d394311e2f039cf4f68b08df
    Reviewed-on: http://gerrit.cloudera.org:8080/17212
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_admission_controller.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 15a9c63..f012ae2 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1654,6 +1654,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       self.lock = threading.RLock()
       self.query_handle = None
       self.shutdown = False  # Set by the main thread when tearing down
+      self.setDaemon(True)
 
     def run(self):
       client = None
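The fix relies on Python's rule that the interpreter waits for all non-daemon threads before exiting, while daemon threads are abandoned at shutdown. A minimal sketch of a worker configured that way follows; the class name and loop body are hypothetical. It assigns the daemon attribute, which is equivalent to the setDaemon(True) call in the commit (setDaemon() has been deprecated since Python 3.10); either form must run before start().

```python
import threading
import time


class QueryWorker(threading.Thread):
    """Hypothetical stress-test worker that loops until told to shut down."""

    def __init__(self) -> None:
        super().__init__()
        self.lock = threading.RLock()
        self.shutdown = False  # Set by the main thread when tearing down.
        # Daemon threads don't block interpreter exit, so if teardown() raises
        # before setting 'shutdown', the process can still terminate.
        self.daemon = True

    def run(self) -> None:
        while True:
            with self.lock:
                if self.shutdown:
                    return
            time.sleep(0.01)  # Stand-in for submitting and polling a query.


worker = QueryWorker()
worker.start()
with worker.lock:
    worker.shutdown = True  # The normal teardown path.
worker.join(timeout=5)
print(worker.daemon, worker.is_alive())  # True False
```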

[impala] 02/03: Update Paramiko to 2.4.2.


asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e5d5dbc30a253b6385e5d84dcf53435166d55314
Author: Jim Apple <jb...@apache.org>
AuthorDate: Sun Mar 21 16:55:52 2021 -0700

    Update Paramiko to 2.4.2.
    
    See https://www.paramiko.org/changelog.html#2.4.2. The fixes in that
    release shouldn't directly affect Impala deployments, since Paramiko is
    only a test dependency, but it is best to pick them up in the test
    environment now.
    
    Change-Id: If9cc9ea4a0763c8b5303ca4e8482761ee2f53efa
    Reviewed-on: http://gerrit.cloudera.org:8080/17214
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 infra/python/deps/extended-test-requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/infra/python/deps/extended-test-requirements.txt b/infra/python/deps/extended-test-requirements.txt
index 3f70b8d..06238dc 100644
--- a/infra/python/deps/extended-test-requirements.txt
+++ b/infra/python/deps/extended-test-requirements.txt
@@ -25,4 +25,4 @@ Flask==1.0.2
 # Fabric depends on Paramiko. Additionally, the stress test uses
 # Paramiko directly to keep a persistent SSH connection open to each
 # Impalad host to run in-test monitoring.
-paramiko==2.4.1
+paramiko==2.4.2