You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/08/07 21:02:53 UTC
[impala] branch master updated: IMPALA-9478: Profiles should
indicate if custom UDFs are being used
This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a005778 IMPALA-9478: Profiles should indicate if custom UDFs are being used
a005778 is described below
commit a0057788c5c2300f58b6615a27116b8331171e06
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jul 13 11:37:16 2020 -0700
IMPALA-9478: Profiles should indicate if custom UDFs are being used
Adds a marker to runtime profiles and explain plans indicating if custom
(i.e. non-built-in) user-defined functions are being used. For explain
plans, a SQL-style comment is added after each call to such a function. For
runtime profiles, a new Frontend entry called "User Defined Functions (UDFs)"
lists out all UDFs analyzed during planning.
Take the following example:
create function hive_lower(string) returns string location
'/test-warehouse/hive-exec.jar'
symbol='org.apache.hadoop.hive.ql.udf.UDFLower';
set explain_level=3;
explain select * from functional.alltypes order by hive_lower(string_col);
...
01:SORT
order by: default.hive_lower(string_col) /* JAVA UDF */ ASC
materialized: default.hive_lower(string_col) /* JAVA UDF */
...
This shows up in the runtime profile as well.
When the above query is actually run, the runtime profile includes the
following entry:
Frontend
User Defined Functions (UDFs): default.hive_lower
Error messages will also include SQL-style comments about any UDFs used.
For example:
select aggfn(int_col) over (partition by int_col) from
functional.alltypesagg
Throws:
Aggregate function 'default.aggfn(int_col) /* NATIVE UDF */' not
supported with OVER clause.
Testing:
* Added tests to test_udfs.py
* Ran core tests
Change-Id: I79122e6cc74fd5a62c76962289a1615fbac2f345
Reviewed-on: http://gerrit.cloudera.org:8080/16188
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../apache/impala/analysis/FunctionCallExpr.java | 14 +++++++++
.../org/apache/impala/service/FrontendProfile.java | 27 +++++++++++++++++
.../apache/impala/analysis/AnalyzeStmtsTest.java | 5 ++--
.../impala/analysis/AnalyzeSubqueriesTest.java | 2 +-
.../org/apache/impala/common/FrontendTestBase.java | 35 +++++++++++++---------
.../PlannerTest/sort-expr-materialization.test | 8 ++---
tests/query_test/test_udfs.py | 18 +++++++++++
7 files changed, 88 insertions(+), 21 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 86a8876..b3dcd55 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.TreeNode;
+import org.apache.impala.service.FrontendProfile;
import org.apache.impala.thrift.TAggregateExpr;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TExprNode;
@@ -228,6 +229,11 @@ public class FunctionCallExpr extends Expr {
sb.append(Joiner.on(", ").join(childrenToSql(options)));
if (params_.isIgnoreNulls()) sb.append(" IGNORE NULLS");
sb.append(")");
+ if (fn_ != null && !fnName_.isBuiltin()) {
+ sb.append(" /* ");
+ sb.append(fn_.getBinaryType());
+ sb.append(" UDF */");
+ }
return sb.toString();
}
@@ -500,6 +506,14 @@ public class FunctionCallExpr extends Expr {
@Override
protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
fnName_.analyze(analyzer);
+ if (!fnName_.isBuiltin()) {
+ FrontendProfile profile = FrontendProfile.getCurrent();
+ String udfInfoStringKey = "User Defined Functions (UDFs)";
+ String functionName = fnName_.toString();
+ if (!profile.getInfoString(udfInfoStringKey).contains(functionName)) {
+ profile.appendInfoString(udfInfoStringKey, functionName);
+ }
+ }
if (isMergeAggFn()) {
// This is the function call expr after splitting up to a merge aggregation.
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
index 3344bf6..8a471cc 100644
--- a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -127,6 +127,33 @@ public class FrontendProfile {
}
/**
+ * Appends an informational key/value string pair to the profile. These are written out
+ * as is to the user. Values are appended to a comma separated list of values.
+ */
+ public synchronized void appendInfoString(String key, String val) {
+ Preconditions.checkState(profile_ != null, "already emitted profile");
+ Preconditions.checkNotNull(key);
+ Preconditions.checkNotNull(val);
+ Map<String, String> info_strings = profile_.getInfo_strings();
+ if (info_strings.containsKey(key)) {
+ info_strings.put(key, info_strings.get(key) + ", " + val);
+ } else {
+ info_strings.put(key, val);
+ profile_.getInfo_strings_display_order().add(key);
+ }
+ }
+
+ /**
+ * Returns the info string associated with the given key. Returns an empty String if
+ * the key does not exist.
+ */
+ public synchronized String getInfoString(String key) {
+ Preconditions.checkState(profile_ != null, "already emitted profile");
+ Preconditions.checkNotNull(key);
+ return profile_.getInfo_strings().getOrDefault(key, "");
+ }
+
+ /**
* Add 'delta' to the counter with the given name and unit. Counters are created
* on-demand.
*/
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 73dae60..98c5576 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -2170,7 +2170,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
"aggregation without a FROM clause is not allowed");
AnalysisError(
"select aggfn(int_col) over (partition by int_col) from functional.alltypesagg",
- "Aggregate function 'default.aggfn(int_col)' not supported with OVER clause.");
+ "Aggregate function 'default.aggfn(int_col) /* NATIVE UDF */' not supported " +
+ "with OVER clause.");
AnalysisError("select aggfn(distinct int_col) from functional.alltypesagg",
"User defined aggregates do not support DISTINCT.");
AnalyzesOk("select default.aggfn(int_col) from functional.alltypes");
@@ -2228,7 +2229,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
"aggregate function must not contain analytic parameters");
AnalysisError("select min(aggfn(int_col)) from functional.alltypes",
"aggregate function must not contain aggregate parameters: " +
- "min(default.aggfn(int_col))");
+ "min(default.aggfn(int_col) /* NATIVE UDF */)");
// wrong type
AnalysisError("select sum(timestamp_col) from functional.alltypes",
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index c490450..2613b3c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -1295,7 +1295,7 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
AnalysisError("select * from functional.alltypesagg g where " +
"(select aggfn(int_col) from functional.alltypes s where " +
"s.id = g.id) = 10", "UDAs are not supported in the select list of " +
- "correlated subqueries: (SELECT default.aggfn(int_col) FROM " +
+ "correlated subqueries: (SELECT default.aggfn(int_col) /* NATIVE UDF */ FROM " +
"functional.alltypes s WHERE s.id = g.id)");
AnalyzesOk("select * from functional.alltypesagg g where " +
"(select aggfn(int_col) from functional.alltypes s where " +
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 1087b19..980dca4 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -55,6 +55,7 @@ import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
import org.apache.impala.service.FeCatalogManager;
import org.apache.impala.service.Frontend;
+import org.apache.impala.service.FrontendProfile;
import org.apache.impala.testutil.ImpaladTestCatalog;
import org.apache.impala.thrift.TAccessEvent;
import org.apache.impala.thrift.TQueryOptions;
@@ -241,20 +242,24 @@ public class FrontendTestBase extends AbstractFrontendTest {
* If 'expectedWarning' is not null, asserts that a warning is produced.
*/
public ParseNode AnalyzesOk(String stmt, AnalysisContext ctx, String expectedWarning) {
- return feFixture_.analyzeStmt(stmt, ctx, expectedWarning);
+ try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+ return feFixture_.analyzeStmt(stmt, ctx, expectedWarning);
+ }
}
/**
* Analyzes the given statement without performing rewrites or authorization.
*/
public StatementBase AnalyzesOkNoRewrite(StatementBase stmt) throws ImpalaException {
- AnalysisContext ctx = createAnalysisCtx();
- StmtMetadataLoader mdLoader =
- new StmtMetadataLoader(frontend_, ctx.getQueryCtx().session.database, null);
- StmtTableCache loadedTables = mdLoader.loadTables(stmt);
- Analyzer analyzer = ctx.createAnalyzer(loadedTables);
- stmt.analyze(analyzer);
- return stmt;
+ try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+ AnalysisContext ctx = createAnalysisCtx();
+ StmtMetadataLoader mdLoader =
+ new StmtMetadataLoader(frontend_, ctx.getQueryCtx().session.database, null);
+ StmtTableCache loadedTables = mdLoader.loadTables(stmt);
+ Analyzer analyzer = ctx.createAnalyzer(loadedTables);
+ stmt.analyze(analyzer);
+ return stmt;
+ }
}
/**
@@ -307,12 +312,14 @@ public class FrontendTestBase extends AbstractFrontendTest {
protected AnalysisResult parseAndAnalyze(String stmt, AnalysisContext ctx, Frontend fe)
throws ImpalaException {
- ctx.getQueryCtx().getClient_request().setStmt(stmt);
- StatementBase parsedStmt = Parser.parse(stmt, ctx.getQueryOptions());
- StmtMetadataLoader mdLoader =
- new StmtMetadataLoader(fe, ctx.getQueryCtx().session.database, null);
- StmtTableCache stmtTableCache = mdLoader.loadTables(parsedStmt);
- return ctx.analyzeAndAuthorize(parsedStmt, stmtTableCache, fe.getAuthzChecker());
+ try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+ ctx.getQueryCtx().getClient_request().setStmt(stmt);
+ StatementBase parsedStmt = Parser.parse(stmt, ctx.getQueryOptions());
+ StmtMetadataLoader mdLoader =
+ new StmtMetadataLoader(fe, ctx.getQueryCtx().session.database, null);
+ StmtTableCache stmtTableCache = mdLoader.loadTables(parsedStmt);
+ return ctx.analyzeAndAuthorize(parsedStmt, stmtTableCache, fe.getAuthzChecker());
+ }
}
/**
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
index 005b709..5cb6b35 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/sort-expr-materialization.test
@@ -180,8 +180,8 @@ PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
01:SORT
-| order by: default.testfn(double_col) ASC
-| materialized: default.testfn(double_col)
+| order by: default.testfn(double_col) /* NATIVE UDF */ ASC
+| materialized: default.testfn(double_col) /* NATIVE UDF */
| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
| tuple-ids=1 row-size=93B cardinality=7.30K
| in pipelines: 01(GETNEXT), 00(OPEN)
@@ -253,8 +253,8 @@ PLAN-ROOT SINK
| in pipelines: 01(GETNEXT)
|
01:SORT
-| order by: default.testfn(double_col) ASC NULLS LAST, random() ASC
-| materialized: default.testfn(double_col), random()
+| order by: default.testfn(double_col) /* NATIVE UDF */ ASC NULLS LAST, random() ASC
+| materialized: default.testfn(double_col) /* NATIVE UDF */, random()
| mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
| tuple-ids=6 row-size=24B cardinality=8
| in pipelines: 01(GETNEXT), 00(OPEN)
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 8572db8..3d73918 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -17,6 +17,7 @@
from copy import copy
import os
+import re
import pytest
import tempfile
from subprocess import call, check_call
@@ -614,3 +615,20 @@ class TestUdfTargeted(TestUdfBase):
results = self.client.fetch(query, handle, -1)
assert results.success
assert len(results.data) == 9999
+
+ def test_udf_profile(self, vector, unique_database):
+ """Test to validate that explain plans and runtime profiles contain information about
+ any custom UDFs used in an Impala query."""
+ self.client.execute(
+ "create function {0}.hive_substring(string, int) returns string location '{1}' "
+ "symbol='org.apache.hadoop.hive.ql.udf.UDFSubstr'".format(
+ unique_database, get_fs_path('/test-warehouse/hive-exec.jar')))
+ profile = self.execute_query_expect_success(self.client,
+ "select {0}.hive_substring(string_col, 1), {0}.hive_substring(string_col, 2) "
+ "from functional.alltypes limit 10".format(unique_database)).runtime_profile
+
+ assert re.search("output exprs.*hive_substring.*/\* JAVA UDF \*/", profile)
+ # Ensure that hive_substring only shows up once in the list of UDFs.
+ assert re.search(
+ "User Defined Functions \(UDFs\): {0}\.hive_substring\s*[\r\n]".format(
+ unique_database), profile)