You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/07 21:02:53 UTC

[impala] branch master updated: IMPALA-9478: Profiles should indicate if custom UDFs are being used

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a005778  IMPALA-9478: Profiles should indicate if custom UDFs are being used
a005778 is described below

commit a0057788c5c2300f58b6615a27116b8331171e06
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jul 13 11:37:16 2020 -0700

    IMPALA-9478: Profiles should indicate if custom UDFs are being used
    
    Adds a marker to runtime profiles and explain plans indicating if custom
    (i.e. non-built-in) user-defined functions are being used. For explain
    plans, a SQL-style comment is added after any function call. For runtime
    profiles, a new Frontend entry called "User Defined Functions (UDFs)"
    lists out all UDFs analyzed during planning.
    
    Take the following example:
    
      create function hive_lower(string) returns string location
      '/test-warehouse/hive-exec.jar'
      symbol='org.apache.hadoop.hive.ql.udf.UDFLower';
      set explain_level=3;
      explain select * from functional.alltypes order by hive_lower(string_col);
      ...
      01:SORT
        order by: default.hive_lower(string_col) /* JAVA UDF */ ASC
        materialized: default.hive_lower(string_col) /* JAVA UDF */
      ...
    
    The same UDF annotation also appears in the plan text that is embedded
    in the runtime profile.
    
    When the above query is actually run, the runtime profile includes the
    following entry:
    
      Frontend
        User Defined Functions (UDFs): default.hive_lower
    
    Error messages will also include SQL-style comments about any UDFs used.
    For example:
    
      select aggfn(int_col) over (partition by int_col) from
      functional.alltypesagg
    
    Throws:
    
      Aggregate function 'default.aggfn(int_col) /* NATIVE UDF */' not
      supported with OVER clause.
    
    Testing:
    * Added tests to test_udfs.py
    * Ran core tests
    
    Change-Id: I79122e6cc74fd5a62c76962289a1615fbac2f345
    Reviewed-on: http://gerrit.cloudera.org:8080/16188
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/FunctionCallExpr.java   | 14 +++++++++
 .../org/apache/impala/service/FrontendProfile.java | 27 +++++++++++++++++
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  5 ++--
 .../impala/analysis/AnalyzeSubqueriesTest.java     |  2 +-
 .../org/apache/impala/common/FrontendTestBase.java | 35 +++++++++++++---------
 .../PlannerTest/sort-expr-materialization.test     |  8 ++---
 tests/query_test/test_udfs.py                      | 18 +++++++++++
 7 files changed, 88 insertions(+), 21 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 86a8876..b3dcd55 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.TAggregateExpr;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TExprNode;
@@ -228,6 +229,11 @@ public class FunctionCallExpr extends Expr {
     sb.append(Joiner.on(", ").join(childrenToSql(options)));
     if (params_.isIgnoreNulls()) sb.append(" IGNORE NULLS");
     sb.append(")");
+    if (fn_ != null && !fnName_.isBuiltin()) {
+      sb.append(" /* ");
+      sb.append(fn_.getBinaryType());
+      sb.append(" UDF */");
+    }
     return sb.toString();
   }
 
@@ -500,6 +506,14 @@ public class FunctionCallExpr extends Expr {
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     fnName_.analyze(analyzer);
+    if (!fnName_.isBuiltin()) {
+      FrontendProfile profile = FrontendProfile.getCurrent();
+      String udfInfoStringKey = "User Defined Functions (UDFs)";
+      String functionName = fnName_.toString();
+      if (!profile.getInfoString(udfInfoStringKey).contains(functionName)) {
+        profile.appendInfoString(udfInfoStringKey, functionName);
+      }
+    }
 
     if (isMergeAggFn()) {
       // This is the function call expr after splitting up to a merge aggregation.
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
index 3344bf6..8a471cc 100644
--- a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -127,6 +127,33 @@ public class FrontendProfile {
   }
 
   /**
+   * Appends an informational key/value string pair to the profile. These are written out
+   * as is to the user. Values are appended to a comma separated list of values.
+   */
+  public synchronized void appendInfoString(String key, String val) {
+    Preconditions.checkState(profile_ != null, "already emitted profile");
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(val);
+    Map<String, String> info_strings = profile_.getInfo_strings();
+    if (info_strings.containsKey(key)) {
+      info_strings.put(key, info_strings.get(key) + ", " + val);
+    } else {
+      info_strings.put(key, val);
+      profile_.getInfo_strings_display_order().add(key);
+    }
+  }
+
+  /**
+   * Returns the info string associated with the given key. Returns an empty String if
+   * the key does not exist.
+   */
+  public synchronized String getInfoString(String key) {
+    Preconditions.checkState(profile_ != null, "already emitted profile");
+    Preconditions.checkNotNull(key);
+    return profile_.getInfo_strings().getOrDefault(key, "");
+  }
+
+  /**
    * Add 'delta' to the counter with the given name and unit. Counters are created
    * on-demand.
    */
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 73dae60..98c5576 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -2170,7 +2170,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "aggregation without a FROM clause is not allowed");
     AnalysisError(
         "select aggfn(int_col) over (partition by int_col) from functional.alltypesagg",
-        "Aggregate function 'default.aggfn(int_col)' not supported with OVER clause.");
+        "Aggregate function 'default.aggfn(int_col) /* NATIVE UDF */' not supported " +
+        "with OVER clause.");
     AnalysisError("select aggfn(distinct int_col) from functional.alltypesagg",
         "User defined aggregates do not support DISTINCT.");
     AnalyzesOk("select default.aggfn(int_col) from functional.alltypes");
@@ -2228,7 +2229,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "aggregate function must not contain analytic parameters");
     AnalysisError("select min(aggfn(int_col)) from functional.alltypes",
         "aggregate function must not contain aggregate parameters: " +
-        "min(default.aggfn(int_col))");
+        "min(default.aggfn(int_col) /* NATIVE UDF */)");
 
     // wrong type
     AnalysisError("select sum(timestamp_col) from functional.alltypes",
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index c490450..2613b3c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -1295,7 +1295,7 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
     AnalysisError("select * from functional.alltypesagg g where " +
         "(select aggfn(int_col) from functional.alltypes s where " +
         "s.id = g.id) = 10", "UDAs are not supported in the select list of " +
-        "correlated subqueries: (SELECT default.aggfn(int_col) FROM " +
+        "correlated subqueries: (SELECT default.aggfn(int_col) /* NATIVE UDF */ FROM " +
         "functional.alltypes s WHERE s.id = g.id)");
     AnalyzesOk("select * from functional.alltypesagg g where " +
         "(select aggfn(int_col) from functional.alltypes s where " +
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 1087b19..980dca4 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -55,6 +55,7 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.service.FeCatalogManager;
 import org.apache.impala.service.Frontend;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TQueryOptions;
@@ -241,20 +242,24 @@ public class FrontendTestBase extends AbstractFrontendTest {
    * If 'expectedWarning' is not null, asserts that a warning is produced.
    */
   public ParseNode AnalyzesOk(String stmt, AnalysisContext ctx, String expectedWarning) {
-    return feFixture_.analyzeStmt(stmt, ctx, expectedWarning);
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      return feFixture_.analyzeStmt(stmt, ctx, expectedWarning);
+    }
   }
 
   /**
    * Analyzes the given statement without performing rewrites or authorization.
    */
   public StatementBase AnalyzesOkNoRewrite(StatementBase stmt) throws ImpalaException {
-    AnalysisContext ctx = createAnalysisCtx();
-    StmtMetadataLoader mdLoader =
-        new StmtMetadataLoader(frontend_, ctx.getQueryCtx().session.database, null);
-    StmtTableCache loadedTables = mdLoader.loadTables(stmt);
-    Analyzer analyzer = ctx.createAnalyzer(loadedTables);
-    stmt.analyze(analyzer);
-    return stmt;
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      AnalysisContext ctx = createAnalysisCtx();
+      StmtMetadataLoader mdLoader =
+          new StmtMetadataLoader(frontend_, ctx.getQueryCtx().session.database, null);
+      StmtTableCache loadedTables = mdLoader.loadTables(stmt);
+      Analyzer analyzer = ctx.createAnalyzer(loadedTables);
+      stmt.analyze(analyzer);
+      return stmt;
+    }
   }
 
   /**
@@ -307,12 +312,14 @@ public class FrontendTestBase extends AbstractFrontendTest {
 
   protected AnalysisResult parseAndAnalyze(String stmt, AnalysisContext ctx, Frontend fe)
       throws ImpalaException {
-    ctx.getQueryCtx().getClient_request().setStmt(stmt);
-    StatementBase parsedStmt = Parser.parse(stmt, ctx.getQueryOptions());
-    StmtMetadataLoader mdLoader =
-        new StmtMetadataLoader(fe, ctx.getQueryCtx().session.database, null);
-    StmtTableCache stmtTableCache = mdLoader.loadTables(parsedStmt);
-    return ctx.analyzeAndAuthorize(parsedStmt, stmtTableCache, fe.getAuthzChecker());
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      ctx.getQueryCtx().getClient_request().setStmt(stmt);
+      StatementBase parsedStmt = Parser.parse(stmt, ctx.getQueryOptions());
+      StmtMetadataLoader mdLoader =
+          new StmtMetadataLoader(fe, ctx.getQueryCtx().session.database, null);
+      StmtTableCache stmtTableCache = mdLoader.loadTables(parsedStmt);
+      return ctx.analyzeAndAuthorize(parsedStmt, stmtTableCache, fe.getAuthzChecker());
+    }
   }
 
   /**
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
index 005b709..5cb6b35 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
@@ -180,8 +180,8 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:SORT
-|  order by: default.testfn(double_col) ASC
-|  materialized: default.testfn(double_col)
+|  order by: default.testfn(double_col) /* NATIVE UDF */ ASC
+|  materialized: default.testfn(double_col) /* NATIVE UDF */
 |  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=93B cardinality=7.30K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
@@ -253,8 +253,8 @@ PLAN-ROOT SINK
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
-|  order by: default.testfn(double_col) ASC NULLS LAST, random() ASC
-|  materialized: default.testfn(double_col), random()
+|  order by: default.testfn(double_col) /* NATIVE UDF */ ASC NULLS LAST, random() ASC
+|  materialized: default.testfn(double_col) /* NATIVE UDF */, random()
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6 row-size=24B cardinality=8
 |  in pipelines: 01(GETNEXT), 00(OPEN)
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 8572db8..3d73918 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -17,6 +17,7 @@
 
 from copy import copy
 import os
+import re
 import pytest
 import tempfile
 from subprocess import call, check_call
@@ -614,3 +615,20 @@ class TestUdfTargeted(TestUdfBase):
     results = self.client.fetch(query, handle, -1)
     assert results.success
     assert len(results.data) == 9999
+
+  def test_udf_profile(self, vector, unique_database):
+    """Test to validate that explain plans and runtime profiles contain information about
+    any custom UDFs used in an Impala query."""
+    self.client.execute(
+        "create function {0}.hive_substring(string, int) returns string location '{1}' "
+        "symbol='org.apache.hadoop.hive.ql.udf.UDFSubstr'".format(
+            unique_database, get_fs_path('/test-warehouse/hive-exec.jar')))
+    profile = self.execute_query_expect_success(self.client,
+        "select {0}.hive_substring(string_col, 1), {0}.hive_substring(string_col, 2) "
+        "from functional.alltypes limit 10".format(unique_database)).runtime_profile
+
+    assert re.search("output exprs.*hive_substring.*/\* JAVA UDF \*/", profile)
+    # Ensure that hive_substring only shows up once in the list of UDFs.
+    assert re.search(
+        "User Defined Functions \(UDFs\): {0}\.hive_substring\s*[\r\n]".format(
+            unique_database), profile)