You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/09/12 07:21:42 UTC

[impala] branch master updated (41da3f54d -> 1642886b4)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 41da3f54d IMPALA-12425: Upgrade json-smart to fix CVE
     new 704ff7788 IMPALA-12383: Fix SingleNodePlanner aggregation limits
     new bd2df1170 IMPALA-12357: Skip scheduling bloom filter from full-build scan
     new 1642886b4 IMPALA-12432: Make LdapKerberosImpalaShellTest* work with Guava 28

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/aggregation-node-base.cc               |    3 +-
 be/src/exec/grouping-aggregator.cc                 |    6 +-
 be/src/exec/grouping-aggregator.h                  |    5 +-
 be/src/service/fe-support.cc                       |   12 +
 be/src/util/backend-gflag-util.cc                  |   10 +
 common/thrift/BackendGflags.thrift                 |    2 +
 .../java/org/apache/impala/common/TreeNode.java    |   10 +
 .../org/apache/impala/planner/ExchangeNode.java    |    1 +
 .../java/org/apache/impala/planner/PlanNode.java   |    5 +
 .../impala/planner/RuntimeFilterGenerator.java     |  201 +++-
 .../java/org/apache/impala/planner/ScanNode.java   |    6 +
 .../java/org/apache/impala/planner/UnionNode.java  |    2 +
 .../org/apache/impala/service/BackendConfig.java   |    4 +
 .../java/org/apache/impala/service/FeSupport.java  |   15 +
 .../customcluster/LdapKerberosImpalaShellTest.java |   44 +-
 .../LdapKerberosImpalaShellTestBase.java           |   20 +-
 .../PlannerTest/bloom-filter-assignment.test       | 1202 ++++++++++++++++++++
 tests/common/impala_test_suite.py                  |   12 +-
 tests/common/test_dimensions.py                    |    9 +-
 tests/query_test/test_aggregation.py               |   18 +-
 20 files changed, 1496 insertions(+), 91 deletions(-)


[impala] 02/03: IMPALA-12357: Skip scheduling bloom filter from full-build scan

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bd2df11709bf1b048e889be058fa758b51b97e76
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Aug 2 16:18:47 2023 -0700

    IMPALA-12357: Skip scheduling bloom filter from full-build scan
    
    PK-FK join between a dimension table and a fact table is common
    occurrences in a query. Such join often does not involve any predicate
    filter in the dimension table. Thus, bloom filter value from this kind
    of dimension table scan (PK) will most likely to have all values from
    the fact table column (FK). It is ineffective to generate this filter
    because it is unlikely to reject any rows, especially if the bloom
    filter size is large and has high false positive probability (fpp)
    estimate.
    
    This patch skip scheduling bloom filter from join node that has this
    characteristics:
    
    1. Build side is full table scan (has hard estimates).
    2. The build scan does not have any predicate filter nor consume any
       runtime filter.
    3. The join node is assumed to have PK-FK relationship.
    4. The planned bloom filter has resulting fpp estimate higher than
       max_filter_error_rate_from_full_scan flag (default to 0.9).
    
    The fourth criteria is an additional control to eliminate based on fpp
    threshold because low fpp filter sometimes is still effective in
    eliminating rows (i.e., rows with NULL value). Non-bloom filters remain
    unchanged as they are relatively lighter to build and evaluate than
    bloom filter.
    
    Testing:
    - Add testcase in testBloomFilterAssignment
    - Pass core tests
    - Ran TPC-DS 3TB with following query options:
      * RUNTIME_FILTER_MIN_SIZE=8192
      * RUNTIME_FILTER_MAX_SIZE=2097152
      * MAX_NUM_RUNTIME_FILTERS=50
      * RUNTIME_FILTER_WAIT_TIME_MS=10000
      19 out of 103 queries show reduction in number of runtime bloom
      filters without any notable performance regression.
    
    Change-Id: I494533bc06da84e606cbd1ae1619083333089a5e
    Reviewed-on: http://gerrit.cloudera.org:8080/20366
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/fe-support.cc                       |   12 +
 be/src/util/backend-gflag-util.cc                  |   10 +
 common/thrift/BackendGflags.thrift                 |    2 +
 .../java/org/apache/impala/common/TreeNode.java    |   10 +
 .../org/apache/impala/planner/ExchangeNode.java    |    1 +
 .../java/org/apache/impala/planner/PlanNode.java   |    5 +
 .../impala/planner/RuntimeFilterGenerator.java     |  201 +++-
 .../java/org/apache/impala/planner/ScanNode.java   |    6 +
 .../java/org/apache/impala/planner/UnionNode.java  |    2 +
 .../org/apache/impala/service/BackendConfig.java   |    4 +
 .../java/org/apache/impala/service/FeSupport.java  |   15 +
 .../PlannerTest/bloom-filter-assignment.test       | 1202 ++++++++++++++++++++
 12 files changed, 1433 insertions(+), 37 deletions(-)

diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 89e29c323..498c2e2df 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -652,6 +652,14 @@ Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter(
   return BloomFilter::MinLogSpace(ndv, fpp);
 }
 
+/// Returns the expected false positive rate for the given ndv and log_bufferpool_space.
+extern "C"
+JNIEXPORT jdouble JNICALL
+Java_org_apache_impala_service_FeSupport_FalsePositiveProbForBloomFilter(
+    JNIEnv* env, jclass fe_support_class, jlong ndv, jint log_bufferpool_space) {
+  return BloomFilter::FalsePositiveProb(ndv, log_bufferpool_space);
+}
+
 extern "C"
 JNIEXPORT jbyteArray JNICALL
 Java_org_apache_impala_service_FeSupport_nativeParseDateString(JNIEnv* env,
@@ -753,6 +761,10 @@ static JNINativeMethod native_methods[] = {
     const_cast<char*>("MinLogSpaceForBloomFilter"), const_cast<char*>("(JD)I"),
     (void*)::Java_org_apache_impala_service_FeSupport_MinLogSpaceForBloomFilter
   },
+  {
+    const_cast<char*>("FalsePositiveProbForBloomFilter"), const_cast<char*>("(JI)D"),
+    (void*)::Java_org_apache_impala_service_FeSupport_FalsePositiveProbForBloomFilter
+  },
   {
     const_cast<char*>("nativeParseDateString"),
     const_cast<char*>("(Ljava/lang/String;)[B"),
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ea09764d4..72fd2471b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -241,6 +241,14 @@ DEFINE_bool_hidden(skip_resource_checking_on_last_executor_group_set, true,
     "ensure that query will always get admitted into last executor group set if it does "
     "not fit in any other group set.");
 
+DEFINE_double_hidden(max_filter_error_rate_from_full_scan, 0.9,
+    "(Advance) Skip generating bloom runtime filter that is generated from "
+    "a full build scan and has resulting error rate estimation that is higher than "
+    "this value after filter size limit applied. This config may get ignored if "
+    "target error rate is set with higher value through RUNTIME_FILTER_ERROR_RATE "
+    "query option or max_filter_error_rate backend flag. Setting value less than 0 "
+    "will disable this runtime filter reduction feature.");
+
 using strings::Substitute;
 
 namespace impala {
@@ -427,6 +435,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
   cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
   cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
+  cfg.__set_max_filter_error_rate_from_full_scan(
+      FLAGS_max_filter_error_rate_from_full_scan);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index fa933424f..0ca30ca4a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -264,4 +264,6 @@ struct TBackendGflags {
   116: required bool enable_skipping_older_events;
 
   117: required bool enable_json_scanner
+
+  118: required double max_filter_error_rate_from_full_scan
 }
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index db6f3327c..f10465809 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -245,4 +245,14 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
     visitor.visit((C) this);
     for (NodeType p: children_) p.accept(visitor);
   }
+
+  /**
+   * A variant of visitor pattern accept method that do post-order traversal instead
+   * of pre-order.
+   */
+  @SuppressWarnings("unchecked")
+  public <C extends TreeNode<NodeType>> void postAccept(Visitor<C> visitor) {
+    for (NodeType p : children_) p.postAccept(visitor);
+    visitor.visit((C) this);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 62f2a50ca..e92fa906f 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -111,6 +111,7 @@ public class ExchangeNode extends PlanNode {
     cardinality_ = capCardinalityAtLimit(children_.get(0).getCardinality());
     // Apply the offset correction if there's a valid cardinality
     if (cardinality_ > -1) cardinality_ = Math.max(0, cardinality_ - offset_);
+    hasHardEstimates_ = children_.get(0).hasHardEstimates_;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 4fa1209c9..0f662fb81 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -158,6 +158,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // Gets set correctly in computeProcessingCost().
   protected ProcessingCost processingCost_ = ProcessingCost.invalid();
 
+  // True if this PlanNode and its children has very accurate cardinality estimates,
+  // assuming that the initial stats collection is also accurate and no runtime filter
+  // is involved.
+  protected boolean hasHardEstimates_ = false;
+
   protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
     this(id, displayName);
     tupleIds_.addAll(tupleIds);
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 0e4a6b36e..5034a61ad 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -19,6 +19,7 @@ package org.apache.impala.planner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
@@ -39,15 +41,14 @@ import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.Predicate;
-import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
-import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.PrimitiveType;
@@ -60,7 +61,6 @@ import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumnValue;
-import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -68,6 +68,7 @@ import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
 import org.apache.impala.thrift.TRuntimeFilterType;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.TColumnValueUtil;
+import org.apache.impala.util.Visitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,6 +128,12 @@ public final class RuntimeFilterGenerator {
   private final IdGenerator<RuntimeFilterId> filterIdGenerator =
       RuntimeFilterId.createGenerator();
 
+  // Map between a FragmentId to the height of plan subtree rooted at that FragmentId.
+  // Can also be interpreted as the distance between FragmentId to its furthest leaf
+  // fragment + 1. Leaf fragment has height 1.
+  // Note that RuntimeFilterGenerator operate over a distributed plan tree.
+  private final Map<PlanFragmentId, Integer> fragmentHeight_ = new HashMap<>();
+
   /**
    * Internal class that encapsulates the max, min and default sizes used for creating
    * bloom filter objects, and entry limit for in-list filters.
@@ -153,11 +160,13 @@ public final class RuntimeFilterGenerator {
       long maxLimit = tQueryOptions.getRuntime_filter_max_size();
       long minBufferSize = BackendConfig.INSTANCE.getMinBufferSize();
       maxVal = BitUtil.roundUpToPowerOf2(Math.max(maxLimit, minBufferSize));
+      Preconditions.checkState(maxVal <= MAX_BLOOM_FILTER_SIZE);
 
       long minLimit = tQueryOptions.getRuntime_filter_min_size();
       minLimit = Math.max(minLimit, minBufferSize);
       // Make sure minVal <= defaultVal <= maxVal
       minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal));
+      Preconditions.checkState(minVal >= MIN_BLOOM_FILTER_SIZE);
 
       long defaultValue = tQueryOptions.getRuntime_bloom_filter_size();
       defaultValue = Math.max(defaultValue, minVal);
@@ -220,6 +229,8 @@ public final class RuntimeFilterGenerator {
     private long filterSizeBytes_ = 0;
     // Size of the filter (in Bytes) before applying min/max filter sizes.
     private long originalFilterSizeBytes_ = 0;
+    // False positive probability of this filter. Only set for bloom filter.
+    private double est_fpp_ = 0;
     // If true, the filter is produced by a broadcast join and there is at least one
     // destination scan node which is in the same fragment as the join; set in
     // DistributedPlanner.createHashJoinFragment().
@@ -236,6 +247,10 @@ public final class RuntimeFilterGenerator {
     // If set, indicates that the filter is targeted for Kudu scan node with source
     // timestamp truncation.
     private boolean isTimestampTruncation_ = false;
+    // The level of this runtime filter.
+    // Runtime filter level is defined as the height of build side subtree of the
+    // join node that produce this filter.
+    private int level_ = 1;
 
     /**
      * Internal representation of a runtime filter target.
@@ -313,13 +328,13 @@ public final class RuntimeFilterGenerator {
       @Override
       public String toString() {
         StringBuilder output = new StringBuilder();
-        return output.append("Target Id: " + node.getId() + " ")
-            .append("Target expr: " + expr.debugString() + " ")
-            .append("Partition columns: " + isBoundByPartitionColumns)
-            .append("Is column stored in data files: " + isColumnInDataFile)
-            .append("Is local: " + isLocalTarget)
-            .append("lowValue: " + (lowValue != null ? lowValue.toString() : -1))
-            .append("highValue: " + (highValue != null ? highValue.toString() : -1))
+        return output.append("Target Id: " + node.getId())
+            .append(" Target expr: " + expr.debugString())
+            .append(" Partition columns: " + isBoundByPartitionColumns)
+            .append(" Is column stored in data files: " + isColumnInDataFile)
+            .append(" Is local: " + isLocalTarget)
+            .append(" lowValue: " + (lowValue != null ? lowValue.toString() : -1))
+            .append(" highValue: " + (highValue != null ? highValue.toString() : -1))
             .toString();
       }
     }
@@ -327,7 +342,7 @@ public final class RuntimeFilterGenerator {
     private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
         Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
         TRuntimeFilterType type, FilterSizeLimits filterSizeLimits,
-        boolean isTimestampTruncation) {
+        boolean isTimestampTruncation, int level) {
       id_ = filterId;
       src_ = filterSrcNode;
       srcExpr_ = srcExpr;
@@ -336,6 +351,7 @@ public final class RuntimeFilterGenerator {
       targetSlotsByTid_ = targetSlots;
       type_ = type;
       isTimestampTruncation_ = isTimestampTruncation;
+      level_ = level;
       computeNdvEstimate();
       calculateFilterSize(filterSizeLimits);
     }
@@ -387,7 +403,7 @@ public final class RuntimeFilterGenerator {
     public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
         Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
         TRuntimeFilterType type, FilterSizeLimits filterSizeLimits,
-        boolean isTimestampTruncation) {
+        boolean isTimestampTruncation, int level) {
       Preconditions.checkNotNull(idGen);
       Preconditions.checkNotNull(joinPredicate);
       Preconditions.checkNotNull(filterSrcNode);
@@ -522,7 +538,7 @@ public final class RuntimeFilterGenerator {
       }
       return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
           normalizedJoinConjunct.getOp(), targetSlots, type, filterSizeLimits,
-          isTimestampTruncation);
+          isTimestampTruncation, level);
     }
 
     /**
@@ -715,6 +731,9 @@ public final class RuntimeFilterGenerator {
       filterSizeBytes_ = originalFilterSizeBytes_;
       filterSizeBytes_ = Math.max(filterSizeBytes_, filterSizeLimits.minVal);
       filterSizeBytes_ = Math.min(filterSizeBytes_, filterSizeLimits.maxVal);
+      int actualLogFilterSize = (int) (Math.log(filterSizeBytes_) / Math.log(2));
+      est_fpp_ =
+          FeSupport.GetFalsePositiveProbForBloomFilter(ndvEstimate_, actualLogFilterSize);
     }
 
     /**
@@ -728,18 +747,21 @@ public final class RuntimeFilterGenerator {
 
     public String debugString() {
       StringBuilder output = new StringBuilder();
-      return output.append("FilterID: " + id_ + " ")
-          .append("Type: " + type_ + " ")
-          .append("Source: " + src_.getId() + " ")
-          .append("SrcExpr: " + getSrcExpr().debugString() +  " ")
-          .append("Target(s): ")
-          .append(Joiner.on(", ").join(targets_) + " ")
-          .append("Selectivity: " + getSelectivity() + " ")
-          .append("Build key NDV: " + buildKeyNdv_ + " ")
-          .append("NDV estimate " + ndvEstimate_ + " ")
-          .append("Filter size (bytes): " + filterSizeBytes_ + " ")
-          .append("Original filter size (bytes): " + originalFilterSizeBytes_ + " ")
-          .toString();
+      output.append("FilterID: " + id_)
+          .append(" Type: " + type_)
+          .append(" Source: " + src_.getId())
+          .append(" SrcExpr: " + getSrcExpr().debugString())
+          .append(" Target(s): " + Joiner.on(", ").join(targets_))
+          .append(" Selectivity: " + getSelectivity())
+          .append(" Build key NDV: " + buildKeyNdv_)
+          .append(" NDV estimate " + ndvEstimate_)
+          .append(" Filter size (bytes): " + filterSizeBytes_)
+          .append(" Original filter size (bytes): " + originalFilterSizeBytes_)
+          .append(" Level: " + level_);
+      if (type_ == TRuntimeFilterType.BLOOM) {
+        output.append(" Est fpp: " + est_fpp_);
+      }
+      return output.toString();
     }
   }
 
@@ -812,6 +834,92 @@ public final class RuntimeFilterGenerator {
     }
   }
 
+  /**
+   * Visitor class over PlanNode tree to check whether the build hand side of join is
+   * eligible for runtime filter pruning based on full scan assumption.
+   */
+  private class BuildVisitor implements Visitor<PlanNode> {
+    private boolean hasIncomingFilter_ = false;
+
+    @Override
+    public void visit(PlanNode a) {
+      if (!isEligibleForPrunning() || !(a instanceof ScanNode)) return;
+
+      ScanNode scan = (ScanNode) a;
+      TupleId sourceTid = scan.getTupleIds().get(0);
+      List<RuntimeFilter> incomingFilters = runtimeFiltersByTid_.get(sourceTid);
+      hasIncomingFilter_ |= (incomingFilters != null && !incomingFilters.isEmpty());
+    }
+
+    public boolean isEligibleForPrunning() { return !hasIncomingFilter_; }
+
+    public void reset() { hasIncomingFilter_ = false; }
+  }
+
+  /**
+   * Remove filters that are unlikely to be useful out of allFilters list.
+   */
+  private void reduceFilters(Collection<RuntimeFilter> allFilters) {
+    if (BackendConfig.INSTANCE.getMaxFilterErrorRateFromFullScan() < 0) return;
+
+    // Filter out bloom filter with fpp > max_filter_error_rate_from_full_scan that
+    // comes from full table scan without filter predicate or incoming runtime filter.
+    // Only do this optimization for filter level 1 for simplicity.
+    boolean hasEliminatedFilter = true;
+    final double maxAllowedFpp =
+        Math.max(BackendConfig.INSTANCE.getMaxFilterErrorRateFromFullScan(),
+            filterSizeLimits_.targetFpp);
+    List<RuntimeFilter> highFppFilters =
+        allFilters.stream()
+            .filter(f
+                -> f.getType() == TRuntimeFilterType.BLOOM
+                    && f.level_ <= 1
+                    // Build hand node has hard cardinality estimates
+                    // (no reduction before runtime filter addition).
+                    && f.src_.getChild(1).hasHardEstimates_
+                    // At least assumed to have fk/pk relationship.
+                    && f.src_.fkPkEqJoinConjuncts_ != null
+                    // TODO: FK/PK alone may not be enough to justify eliminating the
+                    // filter because it could end up eliminating a filter that is
+                    // effective because it filters a lot of NULLs. Therefore, fpp
+                    // threshold check is added here to only consider the high fpp one.
+                    // This last check can be removed once IMPALA-2743 is implemented
+                    // to cover the NULL filtering.
+                    && f.est_fpp_ > maxAllowedFpp)
+            .collect(Collectors.toList());
+
+    // As we eliminate a runtime filter in one iteration, there might be another runtime
+    // filter that becomes eligible for removal. Loop this routine until it does eliminate
+    // any other runtime filter.
+    while (!highFppFilters.isEmpty() && hasEliminatedFilter) {
+      hasEliminatedFilter = false;
+      List<RuntimeFilter> skippedFilters = new ArrayList<>();
+      BuildVisitor buildVisitor = new BuildVisitor();
+      for (RuntimeFilter filter : highFppFilters) {
+        // Check if the build side of filter.src_ has a filtering scan or not.
+        buildVisitor.reset();
+        filter.src_.getChild(1).accept(buildVisitor);
+        if (buildVisitor.isEligibleForPrunning()) {
+          // Build side of filter.src_ is all full scan.
+          // Skip this filter and remove it from all of its target TupleIds in
+          // runtimeFiltersByTid_ map.
+          hasEliminatedFilter = true;
+          skippedFilters.add(filter);
+          for (RuntimeFilter.RuntimeFilterTarget target : filter.getTargets()) {
+            TupleId targetTid = target.node.getTupleIds().get(0);
+            runtimeFiltersByTid_.get(targetTid).remove(filter);
+          }
+        }
+      }
+
+      // Remove all skippedFilters from highFppFilters and allFilters for next iteration.
+      for (RuntimeFilter filter : skippedFilters) {
+        highFppFilters.remove(filter);
+        allFilters.remove(filter);
+      }
+    }
+  }
+
   /**
    * Returns a list of all the registered runtime filters, ordered by filter ID.
    */
@@ -828,9 +936,21 @@ public final class RuntimeFilterGenerator {
         }
       }
     );
+    reduceFilters(resultList);
     return resultList;
   }
 
+  private void generateFilters(PlannerContext ctx, PlanNode root) {
+    root.getFragment().postAccept(f -> {
+      int height = 0;
+      for (PlanFragment child : f.getChildren()) {
+        height = Math.max(height, fragmentHeight_.get(child.getId()));
+      }
+      fragmentHeight_.put(((PlanFragment) f).getId(), height + 1);
+    });
+    generateFiltersRecursive(ctx, root);
+  }
+
   /**
    * Generates the runtime filters for a query by recursively traversing the distributed
    * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
@@ -838,9 +958,18 @@ public final class RuntimeFilterGenerator {
    * In the bottom-up traversal of the plan tree, the filters are assigned to destination
    * (scan) nodes. Filters that cannot be assigned to a scan node are discarded.
    */
-  private void generateFilters(PlannerContext ctx, PlanNode root) {
+  private void generateFiltersRecursive(PlannerContext ctx, PlanNode root) {
     if (root instanceof HashJoinNode || root instanceof NestedLoopJoinNode) {
       JoinNode joinNode = (JoinNode) root;
+
+      // Determine level of filter produced by root.
+      PlanNode buildHandNode = root.getChild(1);
+      while (buildHandNode.getFragment() == root.getFragment()
+          && buildHandNode.hasChild(0)) {
+        buildHandNode = buildHandNode.getChild(0);
+      }
+      int level = fragmentHeight_.get(buildHandNode.getFragment().getId());
+
       List<Expr> joinConjuncts = new ArrayList<>();
       if (!joinNode.getJoinOp().isLeftOuterJoin()
           && !joinNode.getJoinOp().isFullOuterJoin()
@@ -858,10 +987,9 @@ public final class RuntimeFilterGenerator {
       for (TRuntimeFilterType filterType : TRuntimeFilterType.values()) {
         if (!enabledRuntimeFilterTypes.contains(filterType)) continue;
         for (Expr conjunct : joinConjuncts) {
-          RuntimeFilter filter =
-              RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
-                  joinNode, filterType, filterSizeLimits_,
-                  /* isTimestampTruncation */ false);
+          RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
+              ctx.getRootAnalyzer(), conjunct, joinNode, filterType, filterSizeLimits_,
+              /* isTimestampTruncation */ false, level);
           if (filter != null) {
             registerRuntimeFilter(filter);
             filters.add(filter);
@@ -872,27 +1000,26 @@ public final class RuntimeFilterGenerator {
               && Predicate.isEquivalencePredicate(conjunct)
               && conjunct.getChild(0).getType().isTimestamp()
               && conjunct.getChild(1).getType().isTimestamp()) {
-            RuntimeFilter filter2 =
-                RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
-                    joinNode, filterType, filterSizeLimits_,
-                    /* isTimestampTruncation */ true);
+            RuntimeFilter filter2 = RuntimeFilter.create(filterIdGenerator,
+                ctx.getRootAnalyzer(), conjunct, joinNode, filterType, filterSizeLimits_,
+                /* isTimestampTruncation */ true, level);
             if (filter2 == null) continue;
             registerRuntimeFilter(filter2);
             filters.add(filter2);
           }
         }
       }
-      generateFilters(ctx, root.getChild(0));
+      generateFiltersRecursive(ctx, root.getChild(0));
       // Finalize every runtime filter of that join. This is to ensure that we don't
       // assign a filter to a scan node from the right subtree of joinNode or ancestor
       // join nodes in case we don't find a destination node in the left subtree.
       for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
-      generateFilters(ctx, root.getChild(1));
+      generateFiltersRecursive(ctx, root.getChild(1));
     } else if (root instanceof ScanNode) {
       assignRuntimeFilters(ctx, (ScanNode) root);
     } else {
       for (PlanNode childNode: root.getChildren()) {
-        generateFilters(ctx, childNode);
+        generateFiltersRecursive(ctx, childNode);
       }
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 7ab07bbe9..a2bcda486 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -89,6 +89,12 @@ abstract public class ScanNode extends PlanNode {
     desc_ = desc;
   }
 
+  @Override
+  public void computeStats(Analyzer analyzer) {
+    super.computeStats(analyzer);
+    hasHardEstimates_ = !hasScanConjuncts() && !isAccessingCollectionType();
+  }
+
   public TupleDescriptor getTupleDesc() { return desc_; }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index a83a4ecdc..30636ad2e 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -116,6 +116,7 @@ public class UnionNode extends PlanNode {
     super.computeStats(analyzer);
     long totalChildCardinality = 0;
     boolean haveChildWithCardinality = false;
+    hasHardEstimates_ = true;
     for (PlanNode child: children_) {
       // ignore missing child cardinality info in the hope it won't matter enough
       // to change the planning outcome
@@ -129,6 +130,7 @@ public class UnionNode extends PlanNode {
       // subsets of each other, i.e. not just partly overlapping.
       numNodes_ = Math.max(child.getNumNodes(), numNodes_);
       numInstances_ = Math.max(child.getNumInstances(), numInstances_);
+      hasHardEstimates_ &= child.hasHardEstimates_;
     }
     // Consider estimate valid if we have at least one child with known cardinality, or
     // only constant values.
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 93025fee3..980e7b016 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -432,4 +432,8 @@ public class BackendConfig {
   public boolean isJsonScannerEnabled() {
     return backendCfg_.enable_json_scanner;
   }
+
+  public double getMaxFilterErrorRateFromFullScan() {
+    return backendCfg_.max_filter_error_rate_from_full_scan;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 7e9d08cea..e8ea8430a 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -132,6 +132,8 @@ public class FeSupport {
       byte[] queryOptions);
 
   public native static int MinLogSpaceForBloomFilter(long ndv, double fpp);
+  public native static double FalsePositiveProbForBloomFilter(
+      long ndv, int logBufferpoolSpace);
 
   // Parses date string, verifies if it is valid and returns the resulting
   // TParseDateStringResult object. Different date string variations are accepted.
@@ -436,6 +438,19 @@ public class FeSupport {
     return MinLogSpaceForBloomFilter(ndv, fpp);
   }
 
+  /**
+   * Returns the expected false positive rate for the given ndv and logBufferpoolSpace.
+   */
+  public static double GetFalsePositiveProbForBloomFilter(
+      long ndv, int logBufferpoolSpace) {
+    try {
+      return FalsePositiveProbForBloomFilter(ndv, logBufferpoolSpace);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return FalsePositiveProbForBloomFilter(ndv, logBufferpoolSpace);
+  }
+
   public static byte[] GetPartialCatalogObject(byte[] thriftReq)
       throws InternalException {
     try {
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
index c141b52cc..a8f648bcd 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -540,3 +540,1205 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=16B cardinality=12.88K
    in pipelines: 00(GETNEXT)
 ====
+# IMPALA-12357: filter size 512KB can achieve fpp lower than 0.9.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (select o_orderkey from tpch_parquet.orders) l1 on a.l_orderkey = l1.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l2 on l1.o_orderkey = l2.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l3 on l2.o_orderkey = l3.o_orderkey;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=512KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=379.83MB mem-reservation=119.50MB thread-reservation=5 runtime-filters-memory=1.50MB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  runtime filters: RF000[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=1
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--03:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  runtime filters: RF002[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=1
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     runtime filters: RF000[bloom] -> o_orderkey
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=1
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     runtime filters: RF000[bloom] -> tpch_parquet.orders.o_orderkey, RF002[bloom] -> o_orderkey
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> tpch_parquet.lineitem.l_orderkey, RF002[bloom] -> tpch_parquet.lineitem.l_orderkey, RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+12:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 12(GETNEXT), 07(OPEN)
+|
+11:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=289.90MB mem-reservation=107.50MB thread-reservation=2 runtime-filters-memory=1.50MB
+07:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  runtime filters: RF000[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--10:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=6 row-size=8B cardinality=1.50M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.05MB mem-reservation=4.00MB thread-reservation=2
+|  03:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  runtime filters: RF002[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--09:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=1.50M
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.55MB mem-reservation=4.50MB thread-reservation=2 runtime-filters-memory=512.00KB
+|  02:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     runtime filters: RF000[bloom] -> o_orderkey
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=41.05MB mem-reservation=5.00MB thread-reservation=2 runtime-filters-memory=1.00MB
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     runtime filters: RF000[bloom] -> tpch_parquet.orders.o_orderkey, RF002[bloom] -> o_orderkey
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> tpch_parquet.lineitem.l_orderkey, RF002[bloom] -> tpch_parquet.lineitem.l_orderkey, RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: filter size < 512KB can not achieve fpp lower than 0.9 and will be skipped.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (select o_orderkey from tpch_parquet.orders) l1 on a.l_orderkey = l1.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l2 on l1.o_orderkey = l2.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l3 on l2.o_orderkey = l3.o_orderkey;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=378.33MB mem-reservation=118.00MB thread-reservation=5
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=1
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--03:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=1
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=1
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+12:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 12(GETNEXT), 07(OPEN)
+|
+11:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=288.40MB mem-reservation=106.00MB thread-reservation=2
+07:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--10:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=6 row-size=8B cardinality=1.50M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.05MB mem-reservation=4.00MB thread-reservation=2
+|  03:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--09:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=1.50M
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.05MB mem-reservation=4.00MB thread-reservation=2
+|  02:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.05MB mem-reservation=4.00MB thread-reservation=2
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: no filter elimination because no fk/pk conjuncts detected.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem a LIMIT 1) a
+     join (select l_orderkey from tpch_parquet.lineitem) l1 on a.l_orderkey = l1.l_orderkey
+     join (select l_orderkey from tpch_parquet.lineitem) l2 on l1.l_orderkey = l2.l_orderkey
+     join (select l_orderkey from tpch_parquet.lineitem) l3 on l2.l_orderkey = l3.l_orderkey;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=1.21GB mem-reservation=118.75MB thread-reservation=5 runtime-filters-memory=768.00KB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF000[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=58
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--03:SCAN HDFS [tpch_parquet.lineitem]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=6.00M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF002[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=15
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [tpch_parquet.lineitem]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     runtime filters: RF000[bloom] -> l_orderkey
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=6.00M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF004[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=4
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpch_parquet.lineitem]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     runtime filters: RF000[bloom] -> tpch_parquet.lineitem.l_orderkey, RF002[bloom] -> l_orderkey
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=6.00M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem a]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   limit: 1
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=1
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=947.37MB mem-reservation=102.75MB thread-reservation=1 runtime-filters-memory=768.00KB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF000[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=32B cardinality=58
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--11:EXCHANGE [UNPARTITIONED]
+|  |  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=6 row-size=8B cardinality=6.00M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F04:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=80.05MB mem-reservation=4.00MB thread-reservation=2
+|  03:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=6.00M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF002[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=24B cardinality=15
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--10:EXCHANGE [UNPARTITIONED]
+|  |  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=6.00M
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=80.30MB mem-reservation=4.25MB thread-reservation=2 runtime-filters-memory=256.00KB
+|  02:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     runtime filters: RF000[bloom] -> l_orderkey
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=6.00M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = l_orderkey
+|  fk/pk conjuncts: none
+|  runtime filters: RF004[bloom] <- l_orderkey
+|  mem-estimate=305.50MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=16B cardinality=4
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--09:EXCHANGE [UNPARTITIONED]
+|  |  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=6.00M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=80.55MB mem-reservation=4.50MB thread-reservation=2 runtime-filters-memory=512.00KB
+|  01:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+|     HDFS partitions=1/1 files=3 size=194.00MB
+|     runtime filters: RF000[bloom] -> tpch_parquet.lineitem.l_orderkey, RF002[bloom] -> l_orderkey
+|     stored statistics:
+|       table: rows=6.00M size=194.00MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=2.14M
+|     mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=6.00M
+|     in pipelines: 01(GETNEXT)
+|
+08:EXCHANGE [UNPARTITIONED]
+|  limit: 1
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=1
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.30MB mem-reservation=4.25MB thread-reservation=2 runtime-filters-memory=256.00KB
+00:SCAN HDFS [tpch_parquet.lineitem a, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   limit: 1
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=1
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: RF004 still created because 01:SCAN has filter predicate.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (select o_orderkey, o_custkey from tpch_parquet.orders) l1 on a.l_orderkey = l1.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l2 on l1.o_orderkey = l2.o_orderkey
+     join (select o_orderkey from tpch_parquet.orders) l3 on l2.o_orderkey = l3.o_orderkey
+where l1.o_custkey < 1000;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=367.64MB mem-reservation=96.75MB thread-reservation=5 runtime-filters-memory=256.00KB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=40B cardinality=1
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--03:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=32B cardinality=1
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB thread-reservation=0
+|  tuple-ids=0,2 row-size=24B cardinality=1
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     parquet statistics predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     parquet dictionary predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     mem-estimate=80.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=2 row-size=16B cardinality=150.00K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+13:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 13(GETNEXT), 07(OPEN)
+|
+12:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT)
+|
+F03:PLAN FRAGMENT [HASH(o_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=83.98MB mem-reservation=68.00MB thread-reservation=1
+07:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=8 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 00(OPEN)
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4,6 row-size=40B cardinality=575.77K
+|  in pipelines: 00(GETNEXT), 03(OPEN)
+|
+|--11:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=5.75MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=6 row-size=8B cardinality=1.50M
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F04:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.14MB mem-reservation=4.00MB thread-reservation=2
+|  03:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=6 row-size=8B cardinality=1.50M
+|     in pipelines: 03(GETNEXT)
+|
+05:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: o_orderkey = o_orderkey
+|  fk/pk conjuncts: o_orderkey = o_orderkey
+|  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2,4 row-size=32B cardinality=575.77K
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--10:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=5.75MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=1.50M
+|  |  in pipelines: 02(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.14MB mem-reservation=4.00MB thread-reservation=2
+|  02:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=4 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+09:EXCHANGE [HASH(o_orderkey)]
+|  mem-estimate=4.49MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0,2 row-size=24B cardinality=575.77K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=91.08MB mem-reservation=12.75MB thread-reservation=2 runtime-filters-memory=256.00KB
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF004[bloom] <- o_orderkey
+|  mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB thread-reservation=0
+|  tuple-ids=0,2 row-size=24B cardinality=575.77K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+|  |  mem-estimate=2.33MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=16B cardinality=150.00K
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=80.08MB mem-reservation=8.00MB thread-reservation=2
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     parquet statistics predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     parquet dictionary predicates: tpch_parquet.orders.o_custkey < CAST(1000 AS BIGINT)
+|     mem-estimate=80.00MB mem-reservation=8.00MB thread-reservation=1
+|     tuple-ids=2 row-size=16B cardinality=150.00K
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF004[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: build fragment with union still eligible for filter elimination.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (
+      select o_orderkey from tpch_parquet.orders
+      union all
+      select o_orderkey from tpch_parquet.orders
+     ) l1 on a.l_orderkey = l1.o_orderkey;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=262.03MB mem-reservation=42.25MB thread-reservation=3
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT), 00(OPEN)
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: assumed fk/pk
+|  mem-estimate=141.78MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,4 row-size=16B cardinality=2
+|  in pipelines: 00(GETNEXT), 02(OPEN), 03(OPEN)
+|
+|--01:UNION
+|  |  pass-through-operands: all
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=3.00M
+|  |  in pipelines: 02(GETNEXT), 03(GETNEXT)
+|  |
+|  |--03:SCAN HDFS [tpch_parquet.orders]
+|  |     HDFS partitions=1/1 files=2 size=54.21MB
+|  |     stored statistics:
+|  |       table: rows=1.50M size=54.21MB
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|  |     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|  |     tuple-ids=3 row-size=8B cardinality=1.50M
+|  |     in pipelines: 03(GETNEXT)
+|  |
+|  02:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 09(GETNEXT), 05(OPEN)
+|
+08:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT)
+|
+F04:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=67.32MB mem-reservation=34.00MB thread-reservation=1
+05:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=6 row-size=8B cardinality=1
+|  in pipelines: 05(GETNEXT), 00(OPEN)
+|
+04:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: assumed fk/pk
+|  mem-estimate=47.26MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,4 row-size=16B cardinality=11.52M
+|  in pipelines: 00(GETNEXT), 02(OPEN), 03(OPEN)
+|
+|--07:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=10.02MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=3.00M
+|  |  in pipelines: 02(GETNEXT), 03(GETNEXT)
+|  |
+|  F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=40.14MB mem-reservation=4.00MB thread-reservation=2
+|  01:UNION
+|  |  pass-through-operands: all
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=8B cardinality=3.00M
+|  |  in pipelines: 02(GETNEXT), 03(GETNEXT)
+|  |
+|  |--03:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|  |     HDFS partitions=1/1 files=2 size=54.21MB
+|  |     stored statistics:
+|  |       table: rows=1.50M size=54.21MB
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|  |     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|  |     tuple-ids=3 row-size=8B cardinality=1.50M
+|  |     in pipelines: 03(GETNEXT)
+|  |
+|  02:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 02(GETNEXT)
+|
+06:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.14MB mem-reservation=4.00MB thread-reservation=2
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: build fragment with plan node other than scan or union is not eligible
+# for filter elimination.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (select distinct o_orderkey from tpch_parquet.orders) l1 on a.l_orderkey = l1.o_orderkey;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=173.69MB mem-reservation=72.25MB thread-reservation=3 runtime-filters-memory=256.00KB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=5 row-size=8B cardinality=1
+|  in pipelines: 04(GETNEXT), 00(OPEN)
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF000[bloom] <- o_orderkey
+|  mem-estimate=59.44MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,3 row-size=16B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  group by: o_orderkey
+|  |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  |  tuple-ids=3 row-size=8B cardinality=1.50M
+|  |  in pipelines: 02(GETNEXT), 01(OPEN)
+|  |
+|  01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=5 row-size=8B cardinality=1
+|  in pipelines: 09(GETNEXT), 04(OPEN)
+|
+08:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=5 row-size=8B cardinality=1
+|  in pipelines: 04(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(o_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=61.29MB mem-reservation=51.25MB thread-reservation=1 runtime-filters-memory=256.00KB
+04:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=5 row-size=8B cardinality=1
+|  in pipelines: 04(GETNEXT), 00(OPEN)
+|
+03:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = o_orderkey
+|  fk/pk conjuncts: l_orderkey = o_orderkey
+|  runtime filters: RF000[bloom] <- o_orderkey
+|  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,3 row-size=16B cardinality=5.76M
+|  in pipelines: 00(GETNEXT), 06(OPEN)
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  group by: o_orderkey
+|  |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB thread-reservation=0
+|  |  tuple-ids=3 row-size=8B cardinality=1.50M
+|  |  in pipelines: 06(GETNEXT), 01(OPEN)
+|  |
+|  05:EXCHANGE [HASH(o_orderkey)]
+|  |  mem-estimate=5.75MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=3 row-size=8B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+|  Per-Host Resources: mem-estimate=57.14MB mem-reservation=21.00MB thread-reservation=2
+|  02:AGGREGATE [STREAMING]
+|  |  group by: o_orderkey
+|  |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB thread-reservation=0
+|  |  tuple-ids=3 row-size=8B cardinality=1.50M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  01:SCAN HDFS [tpch_parquet.orders, RANDOM]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     stored statistics:
+|       table: rows=1.50M size=54.21MB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=1.18M
+|     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
+|     tuple-ids=2 row-size=8B cardinality=1.50M
+|     in pipelines: 01(GETNEXT)
+|
+07:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.39MB mem-reservation=4.25MB thread-reservation=2 runtime-filters-memory=256.00KB
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====
+# IMPALA-12357: build fragment scanning collection type is not eligible
+# for filter elimination.
+select STRAIGHT_JOIN count(*) from
+     (select l_orderkey from tpch_parquet.lineitem) a
+     join (select item from functional_parquet.complextypestbl_medium.int_array_array.item) l1 on a.l_orderkey = l1.item;
+---- QUERYOPTIONS
+RUNTIME_FILTER_MAX_SIZE=256KB
+EXPLAIN_LEVEL=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=980.77MB mem-reservation=38.75MB thread-reservation=3 runtime-filters-memory=256.00KB
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = item
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- item
+|  mem-estimate=884.52MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=12B cardinality=6.00M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_parquet.complextypestbl_medium.int_array_array.item]
+|     HDFS partitions=1/1 files=1 size=480.14KB
+|     stored statistics:
+|       table: rows=unavailable size=480.14KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=1
+|     tuple-ids=2 row-size=4B cardinality=30.54M
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1
+|  in pipelines: 07(GETNEXT), 03(OPEN)
+|
+06:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT)
+|
+F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=315.13MB mem-reservation=34.25MB thread-reservation=1 runtime-filters-memory=256.00KB
+03:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=4 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 00(OPEN)
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: l_orderkey = item
+|  fk/pk conjuncts: assumed fk/pk
+|  runtime filters: RF000[bloom] <- item
+|  mem-estimate=294.84MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=0,2 row-size=12B cardinality=6.00M
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--05:EXCHANGE [HASH(item)]
+|  |  mem-estimate=10.01MB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=4B cardinality=30.54M
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.09MB mem-reservation=512.00KB thread-reservation=2
+|  01:SCAN HDFS [functional_parquet.complextypestbl_medium.int_array_array.item, RANDOM]
+|     HDFS partitions=1/1 files=1 size=480.14KB
+|     stored statistics:
+|       table: rows=unavailable size=480.14KB
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=1
+|     tuple-ids=2 row-size=4B cardinality=30.54M
+|     in pipelines: 01(GETNEXT)
+|
+04:EXCHANGE [HASH(l_orderkey)]
+|  mem-estimate=10.04MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=6.00M
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=80.39MB mem-reservation=4.25MB thread-reservation=2 runtime-filters-memory=256.00KB
+00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
+   HDFS partitions=1/1 files=3 size=194.00MB
+   runtime filters: RF000[bloom] -> l_orderkey
+   stored statistics:
+     table: rows=6.00M size=194.00MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=2.14M
+   mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+====


[impala] 01/03: IMPALA-12383: Fix SingleNodePlanner aggregation limits

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 704ff7788d015dcbe66a319fb017d0a3f8a76399
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Fri Aug 18 11:38:11 2023 -0700

    IMPALA-12383: Fix SingleNodePlanner aggregation limits
    
    IMPALA-2581 added enforcement of the limit when adding entries to the
    grouping aggregation. It would stop adding new entries if the number of
    entries in the grouping aggregation was >= the limit. If the grouping
    aggregation never contains more entries than the limit, then it would
    not output more entries.
    
    However, this limit was not enforced exactly when adding. It would add a
    whole batch before checking the limit, so it can go past the limit. In
    practice the exchange in a distributed aggregation would enforce limits,
    so this would only show up when num_nodes=1. As a result, the following
    query incorrectly returns 16 rows, not 10:
    
      set num_nodes=1;
      select distinct l_orderkey from tpch.lineitem limit 10;
    
    One option is to be exact when adding items to the group aggregation,
    which would require testing the limit on each row (we don't know which
    are duplicates). This is awkward. Removing the limit on the output of
    the aggregation also is not really needed for the original change
    (stopping the children early once the limit is reached). Instead, we
    restore the limit on the output of the grouping agg (which is already
    known to work).
    
    Testing:
    - added a test case where we assert number of rows returned by an
      aggregation node (rather than an exchange or top-n).
    - restores definition of ALL_CLUSTER_SIZES and makes it simpler to
      enable for individual test suites. Filed IMPALA-12394 to generally
      re-enable testing with ALL_CLUSTER_SIZES. Enables ALL_CLUSTER_SIZES
      for aggregation tests.
    
    Change-Id: Ic5eec1190e8e182152aa954897b79cc3f219c816
    Reviewed-on: http://gerrit.cloudera.org:8080/20379
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
---
 be/src/exec/aggregation-node-base.cc |  3 +--
 be/src/exec/grouping-aggregator.cc   |  6 +-----
 be/src/exec/grouping-aggregator.h    |  5 +----
 tests/common/impala_test_suite.py    | 12 ++++++------
 tests/common/test_dimensions.py      |  9 +++++----
 tests/query_test/test_aggregation.py | 18 +++++++++++++++++-
 6 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 33e285a78..2e748c09c 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -82,8 +82,7 @@ AggregationNodeBase::AggregationNodeBase(
           static_cast<const GroupingAggregatorConfig*>(agg);
       DCHECK(grouping_config != nullptr);
       node.reset(new GroupingAggregator(this, pool_, *grouping_config,
-          pnode.tnode_->agg_node.estimated_input_cardinality,
-          pnode.tnode_->agg_node.fast_limit_check));
+          pnode.tnode_->agg_node.estimated_input_cardinality));
     }
     aggs_.push_back(std::move(node));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index cd6162c83..7ff9038aa 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -133,8 +133,7 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-    bool needUnsetLimit)
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
   : Aggregator(exec_node, pool, config,
       Substitute("$0$1", RuntimeProfile::PREFIX_GROUPING_AGGREGATOR, config.agg_idx_)),
     hash_table_config_(*config.hash_table_config_),
@@ -152,9 +151,6 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
-  if (needUnsetLimit) {
-    UnsetLimit();
-  }
 }
 
 Status GroupingAggregator::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index b71ba9885..e07a9fdea 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -192,8 +192,7 @@ class GroupingAggregatorConfig : public AggregatorConfig {
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
-      bool needUnsetLimit);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -215,8 +214,6 @@ class GroupingAggregator : public Aggregator {
 
   virtual int64_t GetNumKeys() const override;
 
-  void UnsetLimit() { limit_ = -1; }
-
  private:
   struct Partition;
 
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index bc83ca609..7b23ae53e 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -48,7 +48,6 @@ from tests.common.impala_connection import create_connection
 from tests.common.impala_service import ImpaladService
 from tests.common.test_dimensions import (
     ALL_BATCH_SIZES,
-    ALL_CLUSTER_SIZES,
     ALL_DISABLE_CODEGEN_OPTIONS,
     ALL_NODES_ONLY,
     TableFormatInfo,
@@ -166,7 +165,7 @@ GROUP_NAME = grp.getgrgid(pwd.getpwnam(getuser()).pw_gid).gr_name
 # Base class for Impala tests. All impala test cases should inherit from this class
 class ImpalaTestSuite(BaseTestSuite):
   @classmethod
-  def add_test_dimensions(cls):
+  def add_test_dimensions(cls, cluster_sizes=None):
     """
     A hook for adding additional dimensions.
 
@@ -176,7 +175,10 @@ class ImpalaTestSuite(BaseTestSuite):
     super(ImpalaTestSuite, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
         cls.create_table_info_dimension(cls.exploration_strategy()))
-    cls.ImpalaTestMatrix.add_dimension(cls.__create_exec_option_dimension())
+    if not cluster_sizes:
+      # TODO IMPALA-12394: switch to ALL_CLUSTER_SIZES for exhaustive runs
+      cluster_sizes = ALL_NODES_ONLY
+    cls.ImpalaTestMatrix.add_dimension(cls.__create_exec_option_dimension(cluster_sizes))
     # Execute tests through Beeswax by default. Individual tests that have been converted
     # to work with the HS2 client can add HS2 in addition to or instead of beeswax.
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
@@ -1081,14 +1083,12 @@ class ImpalaTestSuite(BaseTestSuite):
     return tf_dimensions
 
   @classmethod
-  def __create_exec_option_dimension(cls):
-    cluster_sizes = ALL_CLUSTER_SIZES
+  def __create_exec_option_dimension(cls, cluster_sizes):
     disable_codegen_options = ALL_DISABLE_CODEGEN_OPTIONS
     batch_sizes = ALL_BATCH_SIZES
     exec_single_node_option = [0]
     if cls.exploration_strategy() == 'core':
       disable_codegen_options = [False]
-      cluster_sizes = ALL_NODES_ONLY
     return create_exec_option_dimension(cluster_sizes, disable_codegen_options,
                                         batch_sizes,
                                         exec_single_node_option=exec_single_node_option,
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 0c5c2fef3..9b957d3bb 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -203,9 +203,8 @@ def orc_schema_resolution_constraint(v):
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 
-# Don't run with NUM_NODES=1 due to IMPALA-561
-# ALL_CLUSTER_SIZES = [0, 1]
-ALL_CLUSTER_SIZES = [0]
+# Test SingleNode and Distributed Planners
+ALL_CLUSTER_SIZES = [0, 1]
 
 SINGLE_NODE_ONLY = [1]
 ALL_NODES_ONLY = [0]
@@ -219,7 +218,9 @@ def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_thresh
       disable_codegen_rows_threshold_options=[disable_codegen_rows_threshold],
       batch_sizes=[0])
 
-def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES,
+
+# TODO IMPALA-12394: switch to ALL_CLUSTER_SIZES
+def create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
                                  disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS,
                                  batch_sizes=ALL_BATCH_SIZES,
                                  sync_ddl=None, exec_single_node_option=[0],
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 0df1767a7..2e20abe7e 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -24,6 +24,7 @@ import pytest
 from testdata.common import widetable
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import (
+    ALL_CLUSTER_SIZES,
     create_exec_option_dimension,
     create_exec_option_dimension_from_dict,
     create_uncompressed_text_dimension)
@@ -91,7 +92,7 @@ class TestAggregation(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestAggregation, cls).add_test_dimensions()
+    super(TestAggregation, cls).add_test_dimensions(ALL_CLUSTER_SIZES)
 
     # Add two more dimensions
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('agg_func', *AGG_FUNCTIONS))
@@ -391,6 +392,21 @@ class TestAggregationQueries(ImpalaTestSuite):
       pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
     self.run_test_case('QueryTest/grouping-sets', vector)
 
+  def test_aggregation_limit(self, vector):
+    """Test that limits are honoured when enforced by aggregation node."""
+    # 1-phase
+    result = self.execute_query(
+        "select distinct l_orderkey from tpch.lineitem limit 10",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
+    # 2-phase with transpose
+    result = self.execute_query(
+        "select count(distinct l_discount), group_concat(distinct l_linestatus), "
+        "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 10;",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
 
 class TestDistinctAggregation(ImpalaTestSuite):
   """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs


[impala] 03/03: IMPALA-12432: Make LdapKerberosImpalaShellTest* work with Guava 28

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1642886b41a83302f12dd95cbee763060f5bef5d
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Fri Sep 8 19:18:24 2023 -0700

    IMPALA-12432: Make LdapKerberosImpalaShellTest* work with Guava 28
    
    In the change for IMPALA-11726, we added some usages of
    Guava's ImmutableMap.of() with >5 key/value pairs. This
    won't compile in older versions of Guava (like Guava 28)
    that only support up to 5 key/value pairs in ImmutableMap.of().
    
    Being compatible with older versions of Guava makes it
    easier for Impala to match the Guava version of Hadoop,
    Hive, etc when compiling against older versions.
    
    This switches to using ImmutableMap.builder() to build
    the larger maps.
    
    Testing:
     - Built with Guava 28 and ran the tests
    
    Change-Id: Ic6d9c69fff749bcdb0887a6676574db9526f68e8
    Reviewed-on: http://gerrit.cloudera.org:8080/20471
    Reviewed-by: Michael Smith <mi...@cloudera.com>
    Tested-by: Michael Smith <mi...@cloudera.com>
---
 .../customcluster/LdapKerberosImpalaShellTest.java | 44 +++++++++++-----------
 .../LdapKerberosImpalaShellTestBase.java           | 20 +++++-----
 2 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
index 2f50098ac..9b90f84bb 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
@@ -155,17 +155,17 @@ public class LdapKerberosImpalaShellTest extends LdapKerberosImpalaShellTestBase
             // define custom LDAP user filter corresponding to the values
             // in fe/src/test/resources/users.ldif,
             // and allow using custom filters with Kerberos authentication
-            ImmutableMap.of(
-                    "enable_ldap_auth", "true",
-                    "ldap_uri", ldapUri,
-                    "ldap_passwords_in_clear_ok", "true",
-                    "ldap_user_search_basedn", defaultUserSearchBaseDn,
-                    "ldap_user_filter", "(cn={0})",
-                    "ldap_search_bind_authentication", "true",
-                    "ldap_bind_dn", TEST_USER_DN_1,
-                    "ldap_bind_password_cmd", passwordCommand,
-                    "allow_custom_ldap_filters_with_kerberos_auth", "true"
-            ),
+            ImmutableMap.<String, String>builder()
+                .put("enable_ldap_auth", "true")
+                .put("ldap_uri", ldapUri)
+                .put("ldap_passwords_in_clear_ok", "true")
+                .put("ldap_user_search_basedn", defaultUserSearchBaseDn)
+                .put("ldap_user_filter", "(cn={0})")
+                .put("ldap_search_bind_authentication", "true")
+                .put("ldap_bind_dn", TEST_USER_DN_1)
+                .put("ldap_bind_password_cmd", passwordCommand)
+                .put("allow_custom_ldap_filters_with_kerberos_auth", "true")
+                .build(),
 
             // set proxy user: allow TEST_USER_1 to act as a proxy for delegateUser_
             ImmutableMap.of(
@@ -614,17 +614,17 @@ public class LdapKerberosImpalaShellTest extends LdapKerberosImpalaShellTestBase
     String ldapUri = String.format("ldap://localhost:%s",
             serverRule.getLdapServer().getPort());
     String passwordCommand = String.format("'echo -n %s'", TEST_PASSWORD_1);
-    return ImmutableMap.of(
-            "enable_ldap_auth", "true",
-            "ldap_uri", ldapUri,
-            "ldap_passwords_in_clear_ok", "true",
-            "ldap_bind_pattern","'cn=#UID,ou=Users,dc=myorg,dc=com'",
-            "ldap_group_dn_pattern", GROUP_DN_PATTERN,
-            "ldap_group_membership_key", "uniqueMember",
-            "ldap_group_class_key", "groupOfUniqueNames",
-            "ldap_bind_dn", TEST_USER_DN_1,
-            "ldap_bind_password_cmd", passwordCommand
-    );
+    return ImmutableMap.<String, String>builder()
+        .put("enable_ldap_auth", "true")
+        .put("ldap_uri", ldapUri)
+        .put("ldap_passwords_in_clear_ok", "true")
+        .put("ldap_bind_pattern","'cn=#UID,ou=Users,dc=myorg,dc=com'")
+        .put("ldap_group_dn_pattern", GROUP_DN_PATTERN)
+        .put("ldap_group_membership_key", "uniqueMember")
+        .put("ldap_group_class_key", "groupOfUniqueNames")
+        .put("ldap_bind_dn", TEST_USER_DN_1)
+        .put("ldap_bind_password_cmd", passwordCommand)
+        .build();
   }
 
   private Map<String, String> getCustomLdapSimpleBindSearchFilterFlags() {
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTestBase.java b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTestBase.java
index 5a857df39..ff9fe533d 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTestBase.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTestBase.java
@@ -52,16 +52,16 @@ public class LdapKerberosImpalaShellTestBase extends LdapImpalaShellTest {
     String ldapUri = String.format("ldap://localhost:%s",
             serverRule.getLdapServer().getPort());
     String passwordCommand = String.format("'echo -n %s'", TEST_PASSWORD_1);
-    return ImmutableMap.of(
-            "enable_ldap_auth", "true",
-            "ldap_uri", ldapUri,
-            "ldap_passwords_in_clear_ok", "true",
-            "ldap_user_search_basedn", userSearchBaseDn,
-            "ldap_group_search_basedn", groupSearchBaseDn,
-            "ldap_search_bind_authentication", "true",
-            "ldap_bind_dn", TEST_USER_DN_1,
-            "ldap_bind_password_cmd", passwordCommand
-    );
+    return ImmutableMap.<String, String>builder()
+        .put("enable_ldap_auth", "true")
+        .put("ldap_uri", ldapUri)
+        .put("ldap_passwords_in_clear_ok", "true")
+        .put("ldap_user_search_basedn", userSearchBaseDn)
+        .put("ldap_group_search_basedn", groupSearchBaseDn)
+        .put("ldap_search_bind_authentication", "true")
+        .put("ldap_bind_dn", TEST_USER_DN_1)
+        .put("ldap_bind_password_cmd", passwordCommand)
+        .build();
   }
 
   protected Map<String, String> getCustomLdapFilterFlags() {