Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/17 20:16:32 UTC

[impala] branch master updated (b0c6740 -> b64efa7)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from b0c6740  IMPALA-8998: admission control accounting for mt_dop
     new dec917d  Fix webserver's use of sq_printf/sq_write
     new b64efa7  IMPALA-9053: DDLs should generate lineage graphs.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/util/webserver.cc                           | 25 ++++++++++++++--------
 .../apache/impala/analysis/ColumnLineageGraph.java |  7 ++++--
 .../apache/impala/analysis/CreateTableStmt.java    | 17 +++++++++++++++
 .../queries/QueryTest/lineage.test                 | 15 +++++++++++++
 tests/custom_cluster/test_lineage.py               | 24 +++++++++++++++++++++
 5 files changed, 77 insertions(+), 11 deletions(-)


[impala] 01/02: Fix webserver's use of sq_printf/sq_write

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dec917d72a0adaa8909139f0827b95cc1081b74e
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed Oct 16 15:50:15 2019 -0700

    Fix webserver's use of sq_printf/sq_write
    
    Previously, the webserver used multiple calls to sq_printf and
    sq_write when sending most responses.
    
    This can lead to a bad interaction between Nagle's algorithm and TCP
    delayed ACKs: the sender withholds the next small segment until the
    previous one is acknowledged, while the receiver delays that ACK, so
    each response can stall for a full delayed-ACK timeout and incur
    significant extra latency.
    
    This patch modifies the SendResponse() function to buffer the entire
    response and send it in a single sq_write call.
    
    Testing:
    - Ran all existing webserver tests.
    
    Change-Id: I343ae83d6324bc710e4cf96d66975a7c9694706f
    Reviewed-on: http://gerrit.cloudera.org:8080/14471
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/webserver.cc | 25 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 9952a30..a468659 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -133,6 +133,8 @@ static const char* COMMON_JSON_KEY = "__common__";
 // handler.
 static const char* ERROR_KEY = "__error_msg__";
 
+static const char* CRLF = "\r\n";
+
 // Returns $IMPALA_HOME if set, otherwise /tmp/impala_www
 const char* GetDefaultDocumentRoot() {
   stringstream ss;
@@ -178,19 +180,24 @@ string HttpStatusCodeToString(HttpStatusCode code) {
 }
 
 void SendResponse(struct sq_connection* connection, const string& response_code_line,
-    const string& context_type, const string& content,
+    const string& content_type, const string& content,
     const vector<string>& header_lines) {
-  sq_printf(connection, "HTTP/1.1 %s\r\n", response_code_line.c_str());
+  // Buffer the output and send it in a single call to sq_write in order to avoid
+  // triggering an interaction between Nagle's algorithm and TCP delayed acks.
+  std::ostringstream oss;
+  oss << "HTTP/1.1 " << response_code_line << CRLF;
   for (const auto& h : header_lines) {
-    sq_printf(connection, "%s\r\n", h.c_str());
+    oss << h << CRLF;
   }
-  sq_printf(connection,
-      "X-Frame-Options: %s\r\n"
-      "Content-Type: %s\r\n"
-      "Content-Length: %zd\r\n\r\n",
-      FLAGS_webserver_x_frame_options.c_str(), context_type.c_str(), content.size());
+  oss << "X-Frame-Options: " << FLAGS_webserver_x_frame_options << CRLF;
+  oss << "Content-Type: " << content_type << CRLF;
+  oss << "Content-Length: " << content.size() << CRLF;
+  oss << CRLF;
+  oss << content;
+
   // Make sure to use sq_write for printing the body; sq_printf truncates at 8kb
-  sq_write(connection, content.c_str(), content.length());
+  string output = oss.str();
+  sq_write(connection, output.c_str(), output.length());
 }
 
 // Return the address of the remote user from the squeasel request info.
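
For readers who want the pattern in isolation, the short Python sketch below shows the same idea the patch applies to SendResponse(): build the status line, headers and body into a single buffer and hand it to the kernel in one write, so Nagle's algorithm never sits waiting on a delayed ACK between small segments. The socket setup, port and helper name here are hypothetical and purely illustrative; the actual Impala change is the sq_write diff above.

    import socket

    CRLF = "\r\n"

    def send_response(conn, status_line, content_type, body, header_lines=()):
        # Build the entire HTTP response in memory so it leaves in one write,
        # instead of one small write per header line plus one for the body.
        lines = ["HTTP/1.1 " + status_line]
        lines.extend(header_lines)
        lines.append("Content-Type: " + content_type)
        lines.append("Content-Length: " + str(len(body)))
        response = CRLF.join(lines) + CRLF + CRLF
        conn.sendall(response.encode("ascii") + body)

    # Usage sketch: answer a single request on localhost:8080 with one write.
    with socket.socket() as srv:
        srv.bind(("127.0.0.1", 8080))
        srv.listen(1)
        conn, _ = srv.accept()
        with conn:
            conn.recv(4096)  # read and ignore the request
            send_response(conn, "200 OK", "text/plain", b"hello\n")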


[impala] 02/02: IMPALA-9053: DDLs should generate lineage graphs.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b64efa76f7f0f46462400811ed3dcc878fac768d
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Tue Oct 15 16:46:22 2019 -0700

    IMPALA-9053: DDLs should generate lineage graphs.
    
    DDLs like 'create table' should generate minimal lineage graphs so
    that consumers like Atlas can use information like 'queryText' to
    establish lineages.
    
    This change adds a call to the computeLineageGraph() method during the
    analysis phase of CREATE TABLE, which populates the graph with basic
    information such as the query text. If the statement is a CTAS, the
    graph is enhanced with dependencies in the "insert" phase.
    
    Testing:
    Added an EE test that verifies the lineage information and checks that
    it is flushed to disk properly.
    
    Change-Id: Ia6c7ed9fe3265fd777fe93590cf4eb2d9ba0dd1e
    Reviewed-on: http://gerrit.cloudera.org:8080/14458
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ColumnLineageGraph.java |  7 +++++--
 .../apache/impala/analysis/CreateTableStmt.java    | 17 +++++++++++++++
 .../queries/QueryTest/lineage.test                 | 15 ++++++++++++++
 tests/custom_cluster/test_lineage.py               | 24 ++++++++++++++++++++++
 4 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
index 6285af6..ec401b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java
@@ -536,8 +536,11 @@ public class ColumnLineageGraph {
    */
   public void computeLineageGraph(List<Expr> resultExprs, Analyzer rootAnalyzer) {
     init(rootAnalyzer);
-    computeProjectionDependencies(resultExprs, rootAnalyzer);
-    computeResultPredicateDependencies(rootAnalyzer);
+    // Compute the dependencies only if result expressions are available.
+    if (resultExprs != null && !resultExprs.isEmpty()) {
+      computeProjectionDependencies(resultExprs, rootAnalyzer);
+      computeResultPredicateDependencies(rootAnalyzer);
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 0a1ea77..d768cfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -28,6 +28,8 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TSortingOrder;
@@ -214,6 +216,21 @@ public class CreateTableStmt extends StatementBase {
       }
       AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
+
+    // If lineage logging is enabled, compute minimal lineage graph.
+    if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
+       computeLineageGraph(analyzer);
+    }
+  }
+
+  /**
+   * Computes a minimal column lineage graph for create statement. This will just
+   * populate a few fields of the graph including query text. If this is a CTAS,
+   * the graph is enhanced during the "insert" phase of CTAS.
+   */
+  protected void computeLineageGraph(Analyzer analyzer) {
+    ColumnLineageGraph graph = analyzer.getColumnLineageGraph();
+    graph.computeLineageGraph(new ArrayList(), analyzer);
   }
 
   /**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/lineage.test b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
index 2335d5a..8f24ad2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/lineage.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/lineage.test
@@ -17,6 +17,21 @@ create table lineage_test_db.alltypessmall like functional.alltypessmall
 ---- QUERY
 create table lineage_test_db.alltypesinsert like functional.alltypesinsert
 ====
+---- LINEAGE
+{
+  "hash":"b0d53e4deafb2467c4108c17667653b5",
+  "timestamp":1571178583,
+  "vertices":[],
+  "edges":[],
+  "queryId":"524cc93f26a86671:e8455a9500000000",
+  "user":"anurag",
+  "queryText":"create table lineage_test_db.foo (id int)",
+  "endTime":1571178584
+}
+---- QUERY
+# Test lineage is created with queryText populated for DDLs.
+create table lineage_test_db.foo (id int)
+====
 ---- QUERY
 create view lineage_test_db.alltypes_view as select * from lineage_test_db.alltypes
 ====
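
The LINEAGE block above is the minimal record a plain CREATE TABLE now emits: vertices and edges are empty, while queryId, queryText, user and the timestamps are populated. As a rough, hedged illustration of how a downstream consumer (an Atlas-style hook, for example) might pick such a record up, the Python sketch below reads one lineage log file and pulls out those fields. The path and helper name are hypothetical; the authoritative end-to-end check is the test_ddl_lineage test added in the next file.

    import json

    def read_ddl_lineage(log_path):
        # Assumes the file holds a single JSON record, as in the test below.
        with open(log_path) as log_file:
            record = json.load(log_file)
        # A plain DDL produces a minimal graph (no vertices or edges), but the
        # queryId, queryText and user fields are enough to register the table.
        return record["queryId"], record["queryText"], record["user"]

    # Usage sketch (path is hypothetical):
    # qid, qtext, user = read_ddl_lineage("/tmp/ddl_lineage/lineage_log_file")
    # print(qtext)  # e.g. create table lineage_test_db.foo (id int)
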
diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py
index 33b3e99..71e61e4 100644
--- a/tests/custom_cluster/test_lineage.py
+++ b/tests/custom_cluster/test_lineage.py
@@ -34,6 +34,7 @@ LOG = logging.getLogger(__name__)
 class TestLineage(CustomClusterTestSuite):
   START_END_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="start_end_time")
   CREATE_TABLE_TIME_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="create_table_time")
+  DDL_LINEAGE_LOG_DIR = tempfile.mkdtemp(prefix="ddl_lineage")
   LINEAGE_TESTS_DIR = tempfile.mkdtemp(prefix="test_lineage")
 
   @classmethod
@@ -107,6 +108,29 @@ class TestLineage(CustomClusterTestSuite):
               assert "{0}.lineage_test_tbl".format(unique_database) == table_name
               assert table_create_time != -1
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
+                                    .format(DDL_LINEAGE_LOG_DIR))
+  def test_ddl_lineage(self, unique_database):
+    """ Test that DDLs like 'create table' have query text populated in the lineage
+    graph."""
+    query = "create external table {0}.ddl_lineage_tbl (id int)".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query)
+    profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
+
+    # Wait to flush the lineage log files.
+    time.sleep(3)
+
+    for log_filename in os.listdir(self.DDL_LINEAGE_LOG_DIR):
+      log_path = os.path.join(self.DDL_LINEAGE_LOG_DIR, log_filename)
+      # Only the coordinator's log file will be populated.
+      if os.path.getsize(log_path) > 0:
+        with open(log_path) as log_file:
+          lineage_json = json.load(log_file)
+          assert lineage_json["queryId"] == profile_query_id
+          assert lineage_json["queryText"] is not None
+          assert lineage_json["queryText"] == query
+
   @SkipIfS3.hbase
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"