You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2022/06/28 19:01:30 UTC

[impala] branch master updated (df0a014e3 -> a625a95db)

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from df0a014e3 IMPALA-10927: Deflaky TestFetchAndSpooling.test_rows_sent_counters
     new f38c53235 IMPALA-11279: Optimize plain count(*) queries for Iceberg tables
     new a625a95db IMPALA-9615: re2's max_mem opt configurable via an Impala startup flag

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   9 +
 be/src/common/init.cc                              |  13 ++
 be/src/exprs/like-predicate.cc                     |   5 +
 be/src/exprs/string-functions-ir.cc                |  19 +-
 be/src/exprs/string-functions.h                    |   4 +
 .../java/org/apache/impala/analysis/Analyzer.java  |  21 +-
 .../apache/impala/analysis/FunctionCallExpr.java   |  19 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  71 ++++++-
 .../java/org/apache/impala/catalog/FeFsTable.java  |  68 ++++++
 .../CountStarToConstRule.java}                     |  29 +--
 .../iceberg-compound-predicate-push-down.test      |   4 +-
 .../QueryTest/iceberg-in-predicate-push-down.test  |   4 +-
 .../iceberg-is-null-predicate-push-down.test       |   2 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   2 +-
 .../iceberg-plain-count-star-optimization.test     | 235 +++++++++++++++++++++
 .../iceberg-upper-lower-bound-metrics.test         |   6 +-
 tests/custom_cluster/test_re2_max_mem.py           |  69 ++++++
 tests/query_test/test_iceberg.py                   |  37 ++++
 18 files changed, 589 insertions(+), 28 deletions(-)
 copy fe/src/main/java/org/apache/impala/{analysis/AlterViewSetOwnerStmt.java => rewrite/CountStarToConstRule.java} (52%)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
 create mode 100755 tests/custom_cluster/test_re2_max_mem.py


[impala] 01/02: IMPALA-11279: Optimize plain count(*) queries for Iceberg tables

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f38c53235f1797f91ff9a65bb734d3f38f1aadc9
Author: LPL <li...@sensorsdata.cn>
AuthorDate: Sun May 29 12:49:43 2022 +0800

    IMPALA-11279: Optimize plain count(*) queries for Iceberg tables
    
    This commit optimizes plain count(*) queries on Iceberg tables.
    When `org.apache.iceberg.SnapshotSummary#TOTAL_RECORDS_PROP` can be
    retrieved from the `org.apache.iceberg.BaseSnapshot#summary` of the
    queried (current or time-travel) snapshot, count(*) is replaced by that
    constant and the query becomes very fast. Otherwise the query falls back
    to aggregating the `num_rows` of the Parquet `file_metadata_` as usual.
    
    Queries that can be optimized need to meet the following requirements:
     - SelectStmt does not have WHERE clause
     - SelectStmt does not have GROUP BY clause
     - SelectStmt does not have HAVING clause
     - The FROM clause contains exactly one TableRef, which is a BaseTableRef
     - The table is an Iceberg table
     - SelectList must contain 'count(*)' or 'count(constant)'
     - SelectList can contain other agg functions, e.g. min, sum, etc
     - SelectList can contain constant
    
    Testing:
     - Added end-to-end test
     - Existing tests
     - Test it in a real cluster
    
    Change-Id: I8e9c48bbba7ab2320fa80915e7001ce54f1ef6d9
    Reviewed-on: http://gerrit.cloudera.org:8080/18574
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |  21 +-
 .../apache/impala/analysis/FunctionCallExpr.java   |  19 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  71 ++++++-
 .../java/org/apache/impala/catalog/FeFsTable.java  |  68 ++++++
 .../impala/rewrite/CountStarToConstRule.java       |  42 ++++
 .../iceberg-compound-predicate-push-down.test      |   4 +-
 .../QueryTest/iceberg-in-predicate-push-down.test  |   4 +-
 .../iceberg-is-null-predicate-push-down.test       |   2 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   2 +-
 .../iceberg-plain-count-star-optimization.test     | 235 +++++++++++++++++++++
 .../iceberg-upper-lower-bound-metrics.test         |   6 +-
 tests/query_test/test_iceberg.py                   |  37 ++++
 12 files changed, 496 insertions(+), 15 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 93d3b6edb..5b5976ab1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -33,8 +33,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.stream.Stream;
-import java.util.stream.Collectors;
 
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
@@ -48,7 +46,6 @@ import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.authorization.TableMask;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.Column;
-import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.FeDataSourceTable;
@@ -77,6 +74,7 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.planner.JoinNode;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
+import org.apache.impala.rewrite.CountStarToConstRule;
 import org.apache.impala.rewrite.SimplifyCastExprRule;
 import org.apache.impala.rewrite.ConvertToCNFRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
@@ -230,6 +228,9 @@ public class Analyzer {
   // if an exception was encountered.
   private String mvAuthExceptionMsg_ = null;
 
+  // Total record count of the Iceberg table (snapshot summary); 0 if not set.
+  private long totalRecordsNum_;
+
   // Required Operation type: Read, write, any(read or write).
   public enum OperationType {
     READ,
@@ -572,6 +573,7 @@ public class Analyzer {
         rules.add(DefaultNdvScaleRule.INSTANCE);
         rules.add(SimplifyCastExprRule.INSTANCE);
       }
+      rules.add(CountStarToConstRule.INSTANCE);
       exprRewriter_ = new ExprRewriter(rules);
     }
   };
@@ -983,6 +985,19 @@ public class Analyzer {
     return mvAuthExceptionMsg_;
   }
 
+  public void setTotalRecordsNum(long totalRecordsNum) {
+    totalRecordsNum_ = totalRecordsNum;
+  }
+
+  public long getTotalRecordsNum() { return totalRecordsNum_; }
+
+  /**
+   * Returns true if a 'count(*)' FunctionCallExpr can be rewritten as a LiteralExpr,
+   * i.e. only when totalRecordsNum_ is greater than 0. A value of 0 is not rewritten:
+   * it means "not set", and an unoptimized 'count(*)' on an empty table is fast anyway.
+   */
+  public boolean canRewriteCountStarToConst() { return totalRecordsNum_ > 0; }
+
   /**
    * Register conjuncts that are outer joined by a full outer join. For a given
    * predicate, we record the last full outer join that outer-joined any of its
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index ec1c98a98..3ffe61a51 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -18,7 +18,6 @@
 package org.apache.impala.analysis;
 
 import java.util.Arrays;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.HashSet;
 import java.util.Optional;
@@ -181,6 +180,24 @@ public class FunctionCallExpr extends Expr {
     return result;
   }
 
+  /**
+   * Returns true if 'expr' is count(*) or a non-DISTINCT count(<non-NULL literal>)
+   * call, e.g. count(*), count(1). The function name is matched case-insensitively.
+   */
+  public static boolean isCountStarFunctionCallExpr(Expr expr) {
+    if (!(expr instanceof FunctionCallExpr)) return false;
+    FunctionCallExpr func = (FunctionCallExpr) expr;
+    if (!func.getFnName().getFunction().equalsIgnoreCase("count")) {
+      return false;
+    }
+    if (func.getParams().isStar()) return true;
+    if (func.getParams().isDistinct()) return false;
+    if (func.getParams().exprs().size() != 1) return false;
+    Expr child = func.getChild(0);
+    if (!Expr.IS_LITERAL.apply(child)) return false;
+    return !Expr.IS_NULL_VALUE.apply(child);
+  }
+
   /**
    * Copy c'tor used in clone().
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 3b52c8173..aad599a2a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -29,15 +29,17 @@ import java.util.Set;
 import java.util.stream.Stream;
 import java.util.stream.Collectors;
 
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.impala.analysis.Path.PathType;
-import org.apache.impala.analysis.TableRef.ZippingUnnestType;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.DatabaseNotFoundException;
+import org.apache.impala.catalog.FeFsTable.Utils;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
-import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
@@ -267,6 +269,7 @@ public class SelectStmt extends QueryStmt {
     if (isAnalyzed()) return;
     super.analyze(analyzer);
     new SelectAnalyzer(analyzer).analyze();
+    this.optimizePlainCountStarQuery();
   }
 
   /**
@@ -1304,6 +1307,70 @@ public class SelectStmt extends QueryStmt {
     }
   }
 
+
+  /**
+   * Sets totalRecordsNum_ in analyzer_ for plain count(*) queries on Iceberg tables.
+   * Queries that can be rewritten need to meet the following requirements:
+   *  - stmt does not have WHERE clause
+   *  - stmt does not have GROUP BY clause
+   *  - stmt does not have HAVING clause
+   *  - tableRefs contains only one BaseTableRef
+   *  - table is an Iceberg table
+   *  - SelectList must contain 'count(*)' or 'count(constant)'
+   *  - SelectList can contain other agg functions, e.g. min, sum, etc
+   *  - SelectList can contain constants
+   * Nothing is changed if the snapshot summary has no positive total-records value.
+   * e.g. 'SELECT count(*) FROM iceberg_tbl' would be rewritten as 'SELECT constant'.
+   */
+  public void optimizePlainCountStarQuery() throws AnalysisException {
+    if (this.hasWhereClause()) return;
+    if (this.hasGroupByClause()) return;
+    if (this.hasHavingClause()) return;
+
+    List<TableRef> tables = this.getTableRefs();
+    if (tables.size() != 1) return;
+    TableRef tableRef = tables.get(0);
+    if (!(tableRef instanceof BaseTableRef)) return;
+
+    TableName tableName = tableRef.getDesc().getTableName();
+    FeTable table;
+    try {
+      table = analyzer_.getCatalog().getTable(tableName.getDb(), tableName.getTbl());
+    } catch (DatabaseNotFoundException e) {
+      throw new AnalysisException(
+          Analyzer.DB_DOES_NOT_EXIST_ERROR_MSG + tableName.getDb(), e);
+    }
+    if (!(table instanceof FeIcebergTable)) return;
+
+    boolean hasCountStarFunc = false;
+    boolean hasAggFunc = false;
+    analyzer_.checkStmtExprLimit();
+    for (SelectListItem selectItem : this.getSelectList().getItems()) {
+      Expr expr = selectItem.getExpr();
+      if (expr == null) continue;
+      if (expr.isConstant()) continue;
+      if (FunctionCallExpr.isCountStarFunctionCallExpr(expr)) {
+        hasCountStarFunc = true;
+      } else if (expr.isAggregate()) {
+        hasAggFunc = true;
+      } else {
+        return;
+      }
+    }
+    if (!hasCountStarFunc) return;
+
+    long num = Utils.getSnapshotSummaryPropOfTypeLong(
+        ((FeIcebergTable) table).getIcebergApiTable(), tableRef.getTimeTravelSpec(),
+        SnapshotSummary.TOTAL_RECORDS_PROP);
+    if (num <= 0) return;
+    analyzer_.setTotalRecordsNum(num);
+
+    if (hasAggFunc) return;
+    // Every select item is 'count(*)' or a constant, so the table ref can be dropped:
+    // 'select count(*) from ice_tbl;' becomes 'select <const>;'.
+    fromClause_.getTableRefs().clear();
+  }
+
   @Override
   public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
     Preconditions.checkState(isAnalyzed());
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index acb83a5b5..1e9342034 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -34,9 +34,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.PartitionKeyValue;
+import org.apache.impala.analysis.TimeTravelSpec;
+import org.apache.impala.analysis.TimeTravelSpec.Kind;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
@@ -57,7 +62,10 @@ import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Frontend interface for interacting with a filesystem-backed table.
@@ -351,6 +359,9 @@ public interface FeFsTable extends FeTable {
    * these can become default methods of the interface.
    */
   abstract class Utils {
+
+    private final static Logger LOG = LoggerFactory.getLogger(Utils.class);
+
     // Table property key for skip.header.line.count
     public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
@@ -450,6 +461,63 @@ public interface FeFsTable extends FeTable {
       return result;
     }
 
+    /**
+     * Get a long-valued snapshot summary property; -1 if missing or not a number.
+     */
+    public static long getSnapshotSummaryPropOfTypeLong(Table icebergTable,
+        TimeTravelSpec travelSpec, String propName) {
+      String propValue = getSnapshotSummaryProperty(icebergTable, travelSpec, propName);
+
+      if (Strings.isNullOrEmpty(propValue)) return -1;
+
+      try {
+        return Long.parseLong(propValue);
+      } catch (NumberFormatException ex) {
+        LOG.warn("Failed to get {} from iceberg table summary. Table name: {}, "
+                + "Table location: {}, Prop value: {}", propName, icebergTable.name(),
+            icebergTable.location(), propValue, ex);
+      }
+
+      return -1;
+    }
+
+    /**
+     * Get a snapshot summary property; null if there is no snapshot or no such key.
+     */
+    private static String getSnapshotSummaryProperty(Table icebergTable,
+        TimeTravelSpec travelSpec, String propName) {
+      Snapshot snapshot = getIcebergSnapshot(icebergTable, travelSpec);
+      // A newly created table has no snapshot yet.
+      if (snapshot == null) { return null; }
+      return snapshot.summary().get(propName);
+    }
+
+    /**
+     * Get the time-travel snapshot, or the current snapshot if 'travelSpec' is null.
+     * Only the current snapshot may be null; a missing time-travel snapshot throws.
+     */
+    private static Snapshot getIcebergSnapshot(Table icebergTable,
+        TimeTravelSpec travelSpec) {
+
+      if (travelSpec == null) return icebergTable.currentSnapshot();
+
+      Snapshot snapshot;
+      if (travelSpec.getKind().equals(Kind.VERSION_AS_OF)) {
+        long snapshotId = travelSpec.getAsOfVersion();
+        snapshot = icebergTable.snapshot(snapshotId);
+        Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s",
+            snapshotId);
+      } else {
+        long timestampMillis = travelSpec.getAsOfMillis();
+        long snapshotId = SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestampMillis);
+        snapshot = icebergTable.snapshot(snapshotId);
+        Preconditions.checkArgument(snapshot != null,
+            "Cannot find snapshot with ID %s, timestampMillis %s", snapshotId,
+            timestampMillis);
+      }
+      return snapshot;
+    }
+
     /**
      * Get file info for the given fe iceberg table.
      */
diff --git a/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
new file mode 100644
index 000000000..6709ec28d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/rewrite/CountStarToConstRule.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.rewrite;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+
+/**
+ * Rewrites a plain count(*) call into a BIGINT literal of the analyzer's row count.
+ */
+public enum CountStarToConstRule implements ExprRewriteRule {
+
+    INSTANCE,
+    ;
+
+    @Override
+    public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException {
+        if (!FunctionCallExpr.isCountStarFunctionCallExpr(expr)) return expr;
+        if (!analyzer.canRewriteCountStarToConst()) return expr;
+        return LiteralExpr.createFromUnescapedStr(String.valueOf(
+            analyzer.getTotalRecordsNum()), Type.BIGINT);
+    }
+}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
index 8e7afedb2..433d51601 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
@@ -39,7 +39,7 @@ select count(1) from ice_compound_pred_pd;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 12
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_compound_pred_pd;
@@ -417,7 +417,7 @@ select count(1) from ice_compound_pred_pd1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 10
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_compound_pred_pd1;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
index 8aa4ae1b4..91144dd64 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -33,7 +33,7 @@ select count(1) from ice_pred_pd1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_pred_pd1;
@@ -491,7 +491,7 @@ select count(1) from ice_pred_pd2;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_pred_pd2;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
index e9b01cfbf..2ec3111f3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
@@ -35,7 +35,7 @@ select count(1) from ice_is_null_pred_pd;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 9
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_is_null_pred_pd;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 8ae4e66dd..41c070109 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -91,7 +91,7 @@ select count(*) from ice_bigints;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 6
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 select count(*) from ice_bigints
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
new file mode 100644
index 000000000..cd97ab8f6
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
@@ -0,0 +1,235 @@
+====
+---- QUERY
+create table ice_tbl (
+  col_i INT,
+  col_s STRING
+) partitioned by spec (col_s) stored as iceberg tblproperties ('write.format.default' = 'parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+select count(*) from ice_tbl;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+insert into
+  ice_tbl
+values
+  (1, "odd"),
+  (3, "odd"),
+  (5, "odd");
+show files in ice_tbl;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=odd/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+select count(*) from ice_tbl;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+create table ice_tbl_u1 stored as iceberg as select * from ice_tbl;
+---- RESULTS
+'Inserted 3 row(s)'
+====
+---- QUERY
+select count(*) from ice_tbl_u1;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+insert into
+  ice_tbl
+values
+  (2, "even"),
+  (4, "even"),
+  (6, "even");
+show files in ice_tbl;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=even/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=odd/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+select count(*) from ice_tbl;
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+create table ice_tbl_u2 stored as iceberg as select * from ice_tbl;
+---- RESULTS
+'Inserted 6 row(s)'
+====
+---- QUERY
+select count(*) from ice_tbl_u2;
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+insert into
+  ice_tbl
+values
+  (1, "odd"),
+  (2, "even");
+show files in ice_tbl;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=even/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=even/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=odd/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_tbl/data/col_s=odd/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+select count(*) from ice_tbl;
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+select count(*) from ice_tbl for system_time as of now();
+---- RESULTS
+8
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+set explain_level=3;
+explain select count(col_i), count(*) from ice_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Analyzed query: SELECT count(col_i), CAST(8 AS BIGINT) FROM'
+'$DATABASE.ice_tbl'
+====
+---- QUERY
+set explain_level=3;
+explain select count(distinct col_i), count(*) from ice_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Analyzed query: SELECT count(DISTINCT col_i), CAST(8 AS BIGINT) FROM'
+'$DATABASE.ice_tbl'
+====
+---- QUERY
+set explain_level=3;
+explain select min(col_i), count(*), max(col_i) from ice_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Analyzed query: SELECT min(col_i), CAST(8 AS BIGINT), max(col_i) FROM'
+'$DATABASE.ice_tbl'
+====
+---- QUERY
+set explain_level=3;
+explain select 123, count(*), 321 from ice_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Analyzed query: SELECT CAST(123 AS TINYINT), CAST(8 AS BIGINT), CAST(321 AS'
+'SMALLINT)'
+====
+---- QUERY
+select
+  count(*)
+from
+  ice_tbl
+where
+  col_s = 'odd';
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+select
+  count(*)
+from
+  ice_tbl
+having
+  avg(col_i) < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+select
+  count(*)
+from
+  ice_tbl
+group by
+  col_s;
+---- RESULTS
+4
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+select
+  count(distinct col_i)
+from
+  ice_tbl;
+---- RESULTS
+6
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+truncate ice_tbl;
+---- RESULTS
+'Table has been truncated.'
+====
+---- QUERY
+select count(*) from ice_tbl;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
+---- QUERY
+create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS PARQUET;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into parq_tbl PARTITION(x = 12340) values (0, "a");
+insert into parq_tbl PARTITION(x = 12341) values (1, "b");
+insert into parq_tbl PARTITION(x = 12342) values (2, "c");
+select count(*) from parq_tbl;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 3
+====
+---- QUERY
+select count(*) as c from ice_tbl_u1  union all (select count(*) c from ice_tbl_u2) order by c;
+---- RESULTS
+3
+6
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumFileMetadataRead): 0
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
index c35ed16d2..78a4219d1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -20,7 +20,7 @@ select count(*) from ice_types1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types1;
@@ -252,7 +252,7 @@ select count(*) from ice_types2;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types2;
@@ -370,7 +370,7 @@ select count(*) from ice_types3;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types3;
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index c5ff5a539..defcc8e36 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -224,16 +224,34 @@ class TestIcebergTable(ImpalaTestSuite):
       for r in expected_results:
         assert r in data.data
 
+    def expect_for_count_star(query, expected):
+      data = impalad_client.execute(query)
+      assert len(data.data) == 1
+      assert expected in data.data
+      assert "NumRowGroups" not in data.runtime_profile
+      assert "NumFileMetadataRead" not in data.runtime_profile
+
     def expect_results_t(ts, expected_results):
       expect_results(
           "select * from {0} for system_time as of {1}".format(tbl_name, ts),
           expected_results)
 
+    def expect_for_count_star_t(ts, expected):
+      expect_for_count_star(
+          "select count(*) from {0} for system_time as of {1}".format(tbl_name, ts),
+          expected)
+
     def expect_results_v(snapshot_id, expected_results):
       expect_results(
           "select * from {0} for system_version as of {1}".format(tbl_name, snapshot_id),
           expected_results)
 
+    def expect_for_count_star_v(snapshot_id, expected):
+      expect_for_count_star(
+          "select count(*) from {0} for system_version as of {1}".format(
+              tbl_name, snapshot_id),
+          expected)
+
     def quote(s):
       return "'{0}'".format(s)
 
@@ -279,6 +297,21 @@ class TestIcebergTable(ImpalaTestSuite):
       expect_results_v(snapshots[2], [])
       expect_results_v(snapshots[3], ['100'])
 
+      # Test the plain count(*) optimization with time-travel queries:
+      # 'NumRowGroups' and 'NumFileMetadataRead' must not appear in the profile.
+      expect_for_count_star_t("now()", '1')
+      expect_for_count_star_t(quote(ts_1), '1')
+      expect_for_count_star_t(quote(ts_2), '2')
+      expect_for_count_star_t(quote(ts_3), '0')
+      expect_for_count_star_t(cast_ts(ts_3) + " + interval 1 seconds", '0')
+      expect_for_count_star_t(quote(ts_4), '1')
+      expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
+      expect_for_count_star_t(cast_ts(ts_4) + " + interval 1 hours", '1')
+      expect_for_count_star_v(snapshots[0], '1')
+      expect_for_count_star_v(snapshots[1], '2')
+      expect_for_count_star_v(snapshots[2], '0')
+      expect_for_count_star_v(snapshots[3], '1')
+
       # SELECT diff
       expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                         MINUS
@@ -624,3 +657,7 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_compound_predicate_push_down(self, vector, unique_database):
       self.run_test_case('QueryTest/iceberg-compound-predicate-push-down', vector,
                          use_db=unique_database)
+
+  def test_plain_count_star_optimization(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-plain-count-star-optimization', vector,
+                       use_db=unique_database)


[impala] 02/02: IMPALA-9615: re2's max_mem opt configurable via an Impala startup flag

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a625a95dbd347d5a5e64566c77bcb27e991ce352
Author: Omid Shahidi <os...@cloudera.com>
AuthorDate: Thu Jun 2 16:39:40 2022 -0700

    IMPALA-9615: re2's max_mem opt configurable via an Impala startup flag
    
    Some regex patterns need more memory to compile and match than RE2 allows
    by default, whether used via the regex string functions or the LIKE predicate.
    For more memory consuming patterns this can cause the following error:
    "re2/re2.cc:667: DFA out of memory:
            size xxxxx, bytemap range xx, list count xxxxx".
    
    To avoid such errors in the impalad ERROR log, this patch adds the
    re2_mem_limit startup flag. It accepts a memory specification string
    and sets RE2's max_mem option, i.e. the maximum number of bytes RE2's
    regex engine may use to hold a compiled regexp.
    
    Testing:
     - Use a long regex pattern that runs out of RE2 memory when
       re2_mem_limit is less than or equal to RE2's default. With a larger
       re2_mem_limit value, the same regexp is processed without error.
    
    Change-Id: Idf28d2f7217b1322ab8fdfb2c02fff0608078571
    Reviewed-on: http://gerrit.cloudera.org:8080/18602
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc            |  9 +++++
 be/src/common/init.cc                    | 13 ++++++
 be/src/exprs/like-predicate.cc           |  5 +++
 be/src/exprs/string-functions-ir.cc      | 19 ++++++++-
 be/src/exprs/string-functions.h          |  4 ++
 tests/custom_cluster/test_re2_max_mem.py | 69 ++++++++++++++++++++++++++++++++
 6 files changed, 118 insertions(+), 1 deletion(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 563ed05be..c427ed595 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -228,6 +228,15 @@ DEFINE_int32(max_log_files, 10, "Maximum number of log files to retain per sever
     "level. The most recent log files are retained. If set to 0, all log files are "
     "retained.");
 
+static const string re2_mem_limit_help_msg =
+    "Maximum bytes of memory to be used by re2's regex engine "
+    "to hold the compiled form of the regexp. For more memory-consuming patterns, "
+    "this can be set to be a higher number. "
+    + Substitute(MEM_UNITS_HELP_MSG, "memory limit for RE2 max_mem opt")
+    + " Defaults to 8MB. Percentage values are not supported.";
+
+DEFINE_string(re2_mem_limit, "8MB", re2_mem_limit_help_msg.c_str());
+
 // The read size is the preferred size of the reads issued to HDFS or the local FS.
 // There is a trade off of latency and throughput, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 2befa4807..fdeb2a835 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/string-functions.h"
 #include "exprs/timezone_db.h"
 #include "gutil/atomicops.h"
 #include "gutil/strings/substitute.h"
@@ -50,6 +51,7 @@
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/os-info.h"
+#include "util/parse-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
@@ -72,6 +74,7 @@ DECLARE_int32(max_log_files);
 DECLARE_int32(max_minidumps);
 DECLARE_string(redaction_rules_file);
 DECLARE_bool(redirect_stdout_stderr);
+DECLARE_string(re2_mem_limit);
 DECLARE_string(reserved_words_version);
 DECLARE_bool(symbolize_stacktrace);
 DECLARE_string(debug_actions);
@@ -338,6 +341,16 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
     CLEAN_EXIT_WITH_ERROR(Substitute("read_size can not be lower than $0",
         READ_SIZE_MIN_VALUE));
   }
+
+  bool is_percent = false; // not used
+  int64_t re2_mem_limit = ParseUtil::ParseMemSpec(FLAGS_re2_mem_limit, &is_percent, 0);
+  if (re2_mem_limit <= 0) {
+    CLEAN_EXIT_WITH_ERROR(
+        Substitute("Invalid mem limit for re2's regex engine: $0", FLAGS_re2_mem_limit));
+  } else {
+    StringFunctions::SetRE2MemLimit(re2_mem_limit);
+  }
+
   if (FLAGS_reserved_words_version != "2.11.0" && FLAGS_reserved_words_version != "3.0.0")
   {
     CLEAN_EXIT_WITH_ERROR(Substitute("Invalid flag reserved_words_version. The value must"
diff --git a/be/src/exprs/like-predicate.cc b/be/src/exprs/like-predicate.cc
index 4eb1110a3..899251920 100644
--- a/be/src/exprs/like-predicate.cc
+++ b/be/src/exprs/like-predicate.cc
@@ -101,6 +101,7 @@ void LikePredicate::LikePrepareInternal(FunctionContext* context,
       opts.set_never_nl(false);
       opts.set_dot_nl(true);
       opts.set_case_sensitive(case_sensitive);
+      StringFunctions::SetRE2MemOpt(&opts);
       state->regex_.reset(new RE2(re_pattern, opts));
       if (!state->regex_->ok()) {
         context->SetError(Substitute("Invalid regex: $0", pattern_str).c_str());
@@ -164,6 +165,7 @@ void LikePredicate::RegexPrepareInternal(FunctionContext* context,
     } else {
       RE2::Options opts;
       opts.set_case_sensitive(case_sensitive);
+      StringFunctions::SetRE2MemOpt(&opts);
       state->regex_.reset(new RE2(pattern_str, opts));
       if (!state->regex_->ok()) {
         context->SetError(
@@ -196,6 +198,7 @@ void LikePredicate::RegexpLikePrepare(FunctionContext* context,
     }
     RE2::Options opts;
     string error_str;
+    StringFunctions::SetRE2MemOpt(&opts);
     if (!StringFunctions::SetRE2Options(*match_parameter, &error_str, &opts)) {
       context->SetError(error_str.c_str());
       return;
@@ -220,6 +223,7 @@ BooleanVal LikePredicate::RegexpLikeInternal(FunctionContext* context,
     if (match_parameter.is_null) return BooleanVal::null();
     RE2::Options opts;
     string error_str;
+    StringFunctions::SetRE2MemOpt(&opts);
     if (!StringFunctions::SetRE2Options(match_parameter, &error_str, &opts)) {
       context->SetError(error_str.c_str());
       return BooleanVal(false);
@@ -340,6 +344,7 @@ BooleanVal LikePredicate::RegexMatch(FunctionContext* context,
   } else {
     string re_pattern;
     RE2::Options opts;
+    StringFunctions::SetRE2MemOpt(&opts);
     if (is_like_pattern) {
       ConvertLikePattern(context, pattern_value, &re_pattern);
       opts.set_never_nl(false);
diff --git a/be/src/exprs/string-functions-ir.cc b/be/src/exprs/string-functions-ir.cc
index 96724ad9b..ee09f13e1 100644
--- a/be/src/exprs/string-functions-ir.cc
+++ b/be/src/exprs/string-functions-ir.cc
@@ -49,6 +49,8 @@ namespace impala {
 const char* ERROR_CHARACTER_LIMIT_EXCEEDED =
   "$0 is larger than allowed limit of $1 character data.";
 
+uint64_t StringFunctions::re2_mem_limit_ = 8 << 20;  // 8 MiB; see --re2_mem_limit.
+
 // This behaves identically to the mysql implementation, namely:
 //  - 1-indexed positions
 //  - supported negative positions (count from the end of the string)
@@ -880,6 +882,8 @@ re2::RE2* CompileRegex(const StringVal& pattern, string* error_str,
   options.set_log_errors(false);
   // Return the leftmost longest match (rather than the first match).
   options.set_longest_match(true);
+  // Set the maximum memory used by re2's regex engine for storage
+  StringFunctions::SetRE2MemOpt(&options);
   if (!match_parameter.is_null &&
       !StringFunctions::SetRE2Options(match_parameter, error_str, &options)) {
     return NULL;
@@ -926,6 +930,19 @@ bool StringFunctions::SetRE2Options(const StringVal& match_parameter,
   return true;
 }
 
+void StringFunctions::SetRE2MemLimit(int64_t re2_mem_limit) {
+  // Used by SetRE2MemOpt(). TODO: include re2 memory in the memory planner estimates
+  DCHECK(re2_mem_limit > 0);
+  StringFunctions::re2_mem_limit_ = re2_mem_limit;
+}
+
+// Sets 'opts->max_mem' to the limit configured via --re2_mem_limit (8 MiB by default),
+// bounding the memory RE2 uses for a compiled regex. A larger limit can avoid DFA
+// state cache flushes, which slow down matching.
+void StringFunctions::SetRE2MemOpt(re2::RE2::Options* opts) {
+  opts->set_max_mem(StringFunctions::re2_mem_limit_);
+}
+
 void StringFunctions::RegexpPrepare(
     FunctionContext* context, FunctionContext::FunctionStateScope scope) {
   if (scope != FunctionContext::THREAD_LOCAL) return;
@@ -1778,4 +1795,4 @@ IntVal StringFunctions::DamerauLevenshtein(
   ctx->Free(reinterpret_cast<uint8_t*>(rows));
   return IntVal(result);
 }
-}
+}
\ No newline at end of file
diff --git a/be/src/exprs/string-functions.h b/be/src/exprs/string-functions.h
index e2b7dea9a..1deb8b9fa 100644
--- a/be/src/exprs/string-functions.h
+++ b/be/src/exprs/string-functions.h
@@ -128,6 +128,7 @@ class StringFunctions {
 
   static bool SetRE2Options(const StringVal& match_parameter, std::string* error_str,
       re2::RE2::Options* opts);
+  static void SetRE2MemOpt(re2::RE2::Options* opts);
   static void RegexpPrepare(FunctionContext*, FunctionContext::FunctionStateScope);
   static void RegexpClose(FunctionContext*, FunctionContext::FunctionStateScope);
   static StringVal RegexpEscape(FunctionContext*, const StringVal& str);
@@ -155,6 +156,8 @@ class StringFunctions {
       const StringVal& key, const StringVal& part);
   static void ParseUrlClose(FunctionContext*, FunctionContext::FunctionStateScope);
 
+  static void SetRE2MemLimit(int64_t re2_mem_limit);
+
   /// Converts ASCII 'val' to corresponding character.
   static StringVal Chr(FunctionContext* context, const IntVal& val);
 
@@ -201,6 +204,7 @@ class StringFunctions {
       const DoubleVal& boost_threshold);
 
  private:
+  static uint64_t re2_mem_limit_;
   /// Templatized implementation of the actual string trimming function.
   /// The first parameter, 'D', is one of StringFunctions::TrimPosition values.
   /// The second parameter, 'IS_IMPLICIT_WHITESPACE', is true when the set of characters
diff --git a/tests/custom_cluster/test_re2_max_mem.py b/tests/custom_cluster/test_re2_max_mem.py
new file mode 100755
index 000000000..9348bcb1a
--- /dev/null
+++ b/tests/custom_cluster/test_re2_max_mem.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestRE2MaxMem(CustomClusterTestSuite):
+  """Test that RE2's max_mem option is set from impalad's --re2_mem_limit flag."""
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  def _test_re2_max_mem(self, expect_fail, dfa_out_of_mem):
+    """Run a long regexp query. If 'expect_fail', the query must fail; otherwise check
+    the impalad ERROR log for "DFA out of memory" (present iff 'dfa_out_of_mem')."""
+
+    client = self.create_impala_client()
+
+    query = (
+      "select c_comment from tpch.customer where regexp_like(c_comment,"
+      "repeat('([a-c].*[t-w]|[t].?[h]|[^xyz]|.*?(\\\\d+))\\\\w', 1000),'i');")
+
+    # RE2's storage for the compiled regex is also bounded by max_mem: for small values
+    # the pattern is rejected as invalid even though it would compile with a higher
+    # max_mem.
+    # See: https://github.com/google/re2/blob/3e9622e/re2/re2.h#L619-L648
+    if expect_fail:
+      # When compilation is expected to fail, the "DFA out of memory" log message is
+      # not checked: RE2 rejects the pattern because it lacks the memory to store
+      # the compiled regex.
+      self.execute_query_expect_failure(client, query)
+    else:
+      self.execute_query_expect_success(client, query)
+      self.assert_impalad_log_contains(
+        "ERROR", "DFA out of memory", -1 if dfa_out_of_mem else 0)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+      impalad_args="--re2_mem_limit=1KB")
+  def test_dfa_out_of_mem(self):
+    self._test_re2_max_mem(True, True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1)
+  def test_re2_max_mem_not_specified(self):
+    # The default --re2_mem_limit (8MB) equals RE2's own default max_mem of 8 MiB.
+    self._test_re2_max_mem(False, True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+      impalad_args="--re2_mem_limit=200MB")
+  def test_dfa_not_out_of_mem(self):
+    self._test_re2_max_mem(False, False)