You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/17 20:16:34 UTC
[impala] 02/02: IMPALA-9053: DDLs should generate lineage graphs.
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b64efa76f7f0f46462400811ed3dcc878fac768d
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Tue Oct 15 16:46:22 2019 -0700
IMPALA-9053: DDLs should generate lineage graphs.
DDLs like 'create table' should generate minimal lineage graphs so
that consumers like Atlas can use information like 'queryText' to
establish lineages.
This change adds a call to the computeLineageGraph() method during the
analysis phase of createTable, which populates the graph with basic
information like queryText. If the statement is a CTAS, this graph is
enhanced with dependencies in the "insert" phase.
Testing:
Add an EE test to verify the lineage information and to check that it
is flushed to disk properly.
Change-Id: Ia6c7ed9fe3265fd777fe93590cf4eb2d9ba0dd1e
Reviewed-on: http://gerrit.cloudera.org:8080/14458
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../apache/impala/analysis/ColumnLineageGraph.java | 7 +++++--
.../apache/impala/analysis/CreateTableStmt.java | 17 +++++++++++++++
.../queries/QueryTest/lineage.test | 15 ++++++++++++++
tests/custom_cluster/test_lineage.py | 24 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 6285af6..ec401b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -536,8 +536,11 @@ public class ColumnLineageGraph {
*/
public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
init(rootAnalyzer);
- computeProjectionDependencies(resultExprs, rootAnalyzer);
- computeResultPredicateDependencies(rootAnalyzer);
+ // Compute the dependencies only if result expressions are available.
+ if (resultExprs != null && !resultExprs.isEmpty()) {
+ computeProjectionDependencies(resultExprs, rootAnalyzer);
+ computeResultPredicateDependencies(rootAnalyzer);
+ }
}
/**
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 0a1ea77..d768cfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,8 @@ import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.THdfsFileFormat;
import org.apache.impala.thrift.TSortingOrder;
@@ -214,6 +216,21 @@ public class CreateTableStmt extends StatementBase {
}
AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
}
+
+ // If lineage logging is enabled, compute minimal lineage graph.
+ if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+ computeLineageGraph(analyzer);
+ }
+ }
+
+ /**
+ * Computes a minimal column lineage graph for create statement. This will just
+ * populate a few fields of the graph including query text. If this is a CTAS,
+ * the graph is enhanced during the "insert" phase of CTAS.
+ */
+ protected void computeLineageGraph(Analyzer analyzer) {
+ ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
+ graph.computeLineageGraph(new ArrayList(), analyzer);
}
/**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/lineage.test b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
index 2335d5a..8f24ad2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/lineage.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
@@ -17,6 +17,21 @@ create table lineage_test_db.alltypessmall like functional.alltypessmall
---- QUERY
create table lineage_test_db.alltypesinsert like functional.alltypesinsert
====
+---- LINEAGE
+{
+ "hash":"b0d53e4deafb2467c4108c17667653b5",
+ "timestamp":1571178583,
+ "vertices":[],
+ "edges":[],
+ "queryId":"524cc93f26a86671:e8455a9500000000",
+ "user":"anurag",
+ "queryText":"create table lineage_test_db.foo (id int)",
+ "endTime":1571178584
+}
+---- QUERY
+# Test lineage is created with queryText populated for DDLs.
+create table lineage_test_db.foo (id int)
+====
---- QUERY
create view lineage_test_db.alltypes_view as select * from lineage_test_db.alltypes
====
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index 33b3e99..71e61e4 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -34,6 +34,7 @@ LOG = logging.getLogger(__name__)
class TestLineage(CustomClusterTestSuite):
START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
CREATE_TABLE_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="create_table_time")
+ DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
@classmethod
@@ -107,6 +108,29 @@ class TestLineage(CustomClusterTestSuite):
assert "{0}.lineage_test_tbl".format(unique_database) == table_name
assert table_create_time != -1
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
+ .format(DDL_LINEAGE_LOG_DIR))
+ def test_ddl_lineage(self, unique_database):
+ """ Test that DDLs like 'create table' have query text populated in the lineage
+ graph."""
+ query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
+ result = self.execute_query_expect_success(self.client, query)
+ profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
+
+ # Wait to flush the lineage log files.
+ time.sleep(3)
+
+ for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
+ log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+ # Only the coordinator's log file will be populated.
+ if os.path.getsize(log_path) > 0:
+ with open(log_path) as log_file:
+ lineage_json = json.load(log_file)
+ assert lineage_json["queryId"] == profile_query_id
+ assert lineage_json["queryText"] is not None
+ assert lineage_json["queryText"] == query
+
@SkipIfS3.hbase
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"