You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/17 20:16:34 UTC

[impala] 02/02: IMPALA-9053: DDLs should generate lineage graphs.

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b64efa76f7f0f46462400811ed3dcc878fac768d
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Tue Oct 15 16:46:22 2019 -0700

    IMPALA-9053: DDLs should generate lineage graphs.
    
    DDLs like 'create table' should generate minimal lineage graphs so
    that consumers like Atlas can use information like 'queryText' to
    establish lineages.
    
    This change adds a call to the computeLineageGraph() method during
    the analysis phase of createTable, which populates the graph with
    basic information like queryText. If the statement is a CTAS, this
    graph is later enhanced with dependencies in the "insert" phase.
    
    Testing:
    Add an end-to-end (EE) test to verify the lineage information and
    to check that the lineage graph is flushed to disk properly.
    
    Change-Id: Ia6c7ed9fe3265fd777fe93590cf4eb2d9ba0dd1e
    Reviewed-on: http://gerrit.cloudera.org:8080/14458
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ColumnLineageGraph.java |  7 +++++--
 .../apache/impala/analysis/CreateTableStmt.java    | 17 +++++++++++++++
 .../queries/QueryTest/lineage.test                 | 15 ++++++++++++++
 tests/custom_cluster/test_lineage.py               | 24 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 6285af6..ec401b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -536,8 +536,11 @@ public class ColumnLineageGraph {
    */
   public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
     init(rootAnalyzer);
-    computeProjectionDependencies(resultExprs, rootAnalyzer);
-    computeResultPredicateDependencies(rootAnalyzer);
+    // Compute the dependencies only if result expressions are available.
+    if (resultExprs != null && !resultExprs.isEmpty()) {
+      computeProjectionDependencies(resultExprs, rootAnalyzer);
+      computeResultPredicateDependencies(rootAnalyzer);
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 0a1ea77..d768cfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,8 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TSortingOrder;
@@ -214,6 +216,21 @@ public class CreateTableStmt extends StatementBase {
       }
       AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
+
+    // If lineage logging is enabled, compute minimal lineage graph.
+    if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+       computeLineageGraph(analyzer);
+    }
+  }
+
+  /**
+   * Computes a minimal column lineage graph for create statement. This will just
+   * populate a few fields of the graph including query text. If this is a CTAS,
+   * the graph is enhanced during the "insert" phase of CTAS.
+   */
+  protected void computeLineageGraph(Analyzer analyzer) {
+    ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
+    graph.computeLineageGraph(new ArrayList(), analyzer);
   }
 
   /**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/lineage.test b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
index 2335d5a..8f24ad2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/lineage.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
@@ -17,6 +17,21 @@ create table lineage_test_db.alltypessmall like functional.alltypessmall
 ---- QUERY
 create table lineage_test_db.alltypesinsert like functional.alltypesinsert
 ====
+---- LINEAGE
+{
+  "hash":"b0d53e4deafb2467c4108c17667653b5",
+  "timestamp":1571178583,
+  "vertices":[],
+  "edges":[],
+  "queryId":"524cc93f26a86671:e8455a9500000000",
+  "user":"anurag",
+  "queryText":"create table lineage_test_db.foo (id int)",
+  "endTime":1571178584
+}
+---- QUERY
+# Test lineage is created with queryText populated for DDLs.
+create table lineage_test_db.foo (id int)
+====
 ---- QUERY
 create view lineage_test_db.alltypes_view as select * from lineage_test_db.alltypes
 ====
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index 33b3e99..71e61e4 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -34,6 +34,7 @@ LOG = logging.getLogger(__name__)
 class TestLineage(CustomClusterTestSuite):
   START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
   CREATE_TABLE_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="create_table_time")
+  DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
   LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
 
   @classmethod
@@ -107,6 +108,29 @@ class TestLineage(CustomClusterTestSuite):
               assert "{0}.lineage_test_tbl".format(unique_database) == table_name
               assert table_create_time != -1
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
+                                    .format(DDL_LINEAGE_LOG_DIR))
+  def test_ddl_lineage(self, unique_database):
+    """ Test that DDLs like 'create table' have query text populated in the lineage
+    graph."""
+    query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query)
+    profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
+
+    # Wait to flush the lineage log files.
+    time.sleep(3)
+
+    for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
+      log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+      # Only the coordinator's log file will be populated.
+      if os.path.getsize(log_path) > 0:
+        with open(log_path) as log_file:
+          lineage_json = json.load(log_file)
+          assert lineage_json["queryId"] == profile_query_id
+          assert lineage_json["queryText"] is not None
+          assert lineage_json["queryText"] == query
+
   @SkipIfS3.hbase
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"