Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/20 16:57:33 UTC

[1/2] impala git commit: KUDU-2004. Undefined behavior in TlsSocket::Writev()

Repository: impala
Updated Branches:
  refs/heads/master 62d8462e1 -> ac86e9d93


KUDU-2004. Undefined behavior in TlsSocket::Writev()

TlsSocket::Writev() was attempting to use the value of nwritten from
TlsSocket::Write(), but in the case of an error that value was never
set or initialized. This change adds a simple check that the result of
TlsSocket::Write() is not an error; if it is, we break out of the write
loop to clean up and return the error (thus skipping the line that uses
nwritten).

Dist job result from before the fix:
http://dist-test.cloudera.org/job?job_id=efan.1496860112.16151

Dist job result from after the fix:
http://dist-test.cloudera.org/job?job_id=efan.1497036430.19311

Change-Id: Ia5b5bbb3fd2ec8fcd1a48873446f3aa09546eaac
Reviewed-on: http://gerrit.cloudera.org:8080/7141
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9359
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c3a8560c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c3a8560c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c3a8560c

Branch: refs/heads/master
Commit: c3a8560c384c1cc60bd1728277536d263c2ac0eb
Parents: 62d8462
Author: Edward Fancher <ef...@cloudera.com>
Authored: Fri Jun 9 13:38:53 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 09:03:23 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/tls_socket.cc | 3 +++
 be/src/kudu/util/net/socket.h      | 7 +++++++
 2 files changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c3a8560c/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index dbe5c68..7aeca31 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -77,6 +77,9 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
     int32_t frame_size = iov[i].iov_len;
     // Don't return before unsetting TCP_CORK.
     write_status = Write(static_cast<uint8_t*>(iov[i].iov_base), frame_size, nwritten);
+    if (!write_status.ok()) break;
+
+    // nwritten should have the correct amount written.
     total_written += *nwritten;
     if (*nwritten < frame_size) break;
   }
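The loop above can be modeled in Python to show why the order of the checks matters. This is a sketch, not the Kudu code: `Status`, `writev` and the callback shape are hypothetical stand-ins, and a failed write returns `None` for the byte count to play the role of the uninitialized `*nwritten`.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class Status:
    """Hypothetical stand-in for kudu::Status."""
    ok: bool
    message: str = ""


# Hypothetical write callback: returns (status, nwritten). On error the byte
# count is None, modeling the unset *nwritten out-parameter.
WriteFn = Callable[[bytes], Tuple[Status, Optional[int]]]


def writev(frames: List[bytes], write: WriteFn) -> Tuple[Status, int]:
    """Python model of the fixed TlsSocket::Writev() loop."""
    total_written = 0
    write_status = Status(ok=True)
    for frame in frames:
        write_status, nwritten = write(frame)
        # The fix: leave the loop before nwritten is read if Write() failed.
        if not write_status.ok:
            break
        total_written += nwritten
        # A short write also ends the loop; the caller must retry the rest.
        if nwritten < len(frame):
            break
    return write_status, total_written
```

With the `if not write_status.ok` check removed, the model raises `TypeError` on `total_written += None`. That is the visible Python counterpart of reading an uninitialized `int32_t` in C++, where the result is silently undefined instead.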

http://git-wip-us.apache.org/repos/asf/impala/blob/c3a8560c/be/src/kudu/util/net/socket.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h
index ce5b7bb..7c04ff6 100644
--- a/be/src/kudu/util/net/socket.h
+++ b/be/src/kudu/util/net/socket.h
@@ -120,8 +120,15 @@ class Socket {
   // get the error status using getsockopt(2)
   Status GetSockError() const;
 
+  // Write up to 'amt' bytes from 'buf' to the socket. The number of bytes
+  // actually written will be stored in 'nwritten'. If an error is returned,
+  // the value of 'nwritten' is undefined.
   virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
 
+  // Vectorized Write.
+  // If there is an error, that error needs to be resolved before calling again.
+  // If there was no error, but not all the bytes were written, the unwritten
+  // bytes must be retried. See writev(2) for more information.
   virtual Status Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritten);
 
   // Blocking Write call, returns IOError unless full buffer is sent.
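The new Writev() comment says that after a short write with no error, the unwritten bytes must be retried. A minimal Python sketch of a caller that honors that contract, assuming a hypothetical `writev` callback that takes a list of byte frames and returns `(ok, nwritten)`; `advance` and `write_all` are illustrative names, not Kudu APIs:

```python
from typing import Callable, List, Tuple

# Hypothetical vectored write: takes the pending frames and returns
# (ok, nwritten). As the socket.h comment says, nwritten means nothing
# if ok is False.
WritevFn = Callable[[List[bytes]], Tuple[bool, int]]


def advance(frames: List[bytes], n: int) -> List[bytes]:
    """Drop the first n written bytes across frames, as a writev(2) caller must."""
    remaining = []
    for frame in frames:
        if n >= len(frame):
            n -= len(frame)  # frame fully written (or empty): drop it
            continue
        remaining.append(frame[n:])  # partially written or untouched frame
        n = 0
    return remaining


def write_all(frames: List[bytes], writev: WritevFn) -> Tuple[bool, int]:
    """Retry unwritten bytes until everything is sent or an error occurs."""
    total = 0
    pending = advance(frames, 0)  # also drops empty frames
    while pending:
        ok, nwritten = writev(pending)
        if not ok:
            # Error: ignore nwritten and report only bytes already confirmed.
            return False, total
        if nwritten <= 0:
            # Treated as failure here so the loop cannot spin forever.
            return False, total
        total += nwritten
        pending = advance(pending, nwritten)
    return True, total
```

Treating a zero-byte successful write as a failure is a choice made in this sketch to keep the loop from spinning; the socket.h comments do not specify it.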


[2/2] impala git commit: IMPALA-6497: add "Last row fetched" and AC events

IMPALA-6497: add "Last row fetched" and AC events

This makes it observable in the query profile that all rows were
returned to the client and that admission control resources were
released.

Testing:
Manually inspected some query profiles.

Added a basic observability test that checks that the expected events
appear, in order, in the profile. Ran it in a loop for a while to make
sure it wasn't flaky.

Change-Id: I32a707e4660061e75c86ad967f1fac6f6737da7e
Reviewed-on: http://gerrit.cloudera.org:8080/9271
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ac86e9d9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ac86e9d9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ac86e9d9

Branch: refs/heads/master
Commit: ac86e9d931fa6e5d273e51c26cd7c1d1ad2cc6ce
Parents: c3a8560
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Feb 9 12:00:54 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 09:32:38 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          |  4 ++-
 tests/query_test/test_observability.py | 56 ++++++++++++++++++++++++-----
 2 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ac86e9d9/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7973775..b69b848 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -863,6 +863,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
+    query_events_->MarkEvent("Last row fetched");
     // release query execution resources here, since we won't be fetching more result rows
     ReleaseExecResources();
     // wait for all backends to complete before computing the summary
@@ -1087,12 +1088,13 @@ void Coordinator::ReleaseAdmissionControlResources() {
 
 void Coordinator::ReleaseAdmissionControlResourcesLocked() {
   if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admssion control resources for query "
+  LOG(INFO) << "Release admission control resources for query "
             << PrintId(query_ctx_.query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
   released_admission_control_resources_ = true;
+  query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
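In ReleaseAdmissionControlResourcesLocked() the new MarkEvent() call sits after the early return on released_admission_control_resources_, so repeated calls release the resources and record the event only once. A toy Python model of that guard follows; `EventSequence`, `CoordinatorModel` and their members are hypothetical and only mirror the names in the diff.

```python
import time
from typing import List, Tuple


class EventSequence:
    """Toy stand-in for the coordinator's query_events_ timeline (hypothetical)."""

    def __init__(self) -> None:
        self._start = time.monotonic()
        self.events: List[Tuple[str, float]] = []

    def mark_event(self, label: str) -> None:
        # Record the label with the elapsed time since the timeline began.
        self.events.append((label, time.monotonic() - self._start))


class CoordinatorModel:
    """Models the release-once guard in ReleaseAdmissionControlResourcesLocked()."""

    def __init__(self) -> None:
        self.query_events = EventSequence()
        self.released_admission_control_resources = False
        self.release_calls = 0  # counts calls that actually reach the controller

    def release_admission_control_resources(self) -> None:
        if self.released_admission_control_resources:
            return  # already released: no second release, no second event
        self.release_calls += 1  # stands in for admission_controller->ReleaseQuery()
        self.released_admission_control_resources = True
        self.query_events.mark_event("Released admission control resources")
```

Marking the event after the guard, rather than before it, keeps duplicate "Released admission control resources" entries out of the timeline.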

http://git-wip-us.apache.org/repos/asf/impala/blob/ac86e9d9/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 8e803de..54cbf62 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -20,6 +20,7 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.impala_cluster import ImpalaCluster
 import logging
 import pytest
+import re
 import time
 
 class TestObservability(ImpalaTestSuite):
@@ -184,16 +185,53 @@ class TestObservability(ImpalaTestSuite):
       query_id, MAX_WAIT)
     assert False, dbg_str
 
-  def test_query_profile_contains_instance_events(self, unique_database):
+  def test_query_profile_contains_query_events(self):
+    """Test that the expected events show up in a query profile."""
+    event_regexes = [r'Query Timeline:',
+        r'Query submitted:',
+        r'Planning finished:',
+        r'Submit for admission:',
+        r'Completed admission:',
+        r'Ready to start on .* backends:',
+        r'All .* execution backends \(.* fragment instances\) started:',
+        r'Rows available:',
+        r'First row fetched:',
+        r'Last row fetched:',
+        r'Released admission control resources:']
+    query = "select * from functional.alltypes"
+    runtime_profile = self.execute_query(query).runtime_profile
+    self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def test_query_profile_contains_instance_events(self):
     """Test that /query_profile_encoded contains an event timeline for fragment
     instances, even when there are errors."""
-    events = ["Fragment Instance Lifecycle Event Timeline",
-              "Prepare Finished",
-              "Open Finished",
-              "First Batch Produced",
-              "First Batch Sent",
-              "ExecInternal Finished"]
+    event_regexes = [r'Fragment Instance Lifecycle Event Timeline',
+                     r'Prepare Finished',
+                     r'Open Finished',
+                     r'First Batch Produced',
+                     r'First Batch Sent',
+                     r'ExecInternal Finished']
     query = "select count(*) from functional.alltypes"
     runtime_profile = self.execute_query(query).runtime_profile
-    for event in events:
-      assert event in runtime_profile
+    self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def __verify_profile_event_sequence(self, event_regexes, runtime_profile):
+    """Check that 'event_regexes' appear in a consecutive series of lines in
+       'runtime_profile'"""
+    lines = runtime_profile.splitlines()
+    event_regex_index = 0
+
+    # Check that the strings appear in the above order with no gaps in the profile.
+    for line in runtime_profile.splitlines():
+      match = re.search(event_regexes[event_regex_index], line)
+      if match is not None:
+        event_regex_index += 1
+        if event_regex_index == len(event_regexes):
+          # Found all the lines - we're done.
+          return
+      else:
+        # Haven't found the first regex yet.
+        assert event_regex_index == 0, \
+            event_regexes[event_regex_index] + " not in " + line + "\n" + runtime_profile
+    assert event_regex_index == len(event_regexes), \
+        "Didn't find all events in profile: \n" + runtime_profile
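To see what "consecutive" means in practice, here is the helper lifted out of the test class as a plain function (the name `verify_profile_event_sequence` is hypothetical), run against a small made-up profile snippet. It mirrors the committed logic, except that it drops the `lines` variable, which the original assigns but never uses, and shortens the assertion messages.

```python
import re
from typing import List


def verify_profile_event_sequence(event_regexes: List[str], runtime_profile: str) -> None:
    """Assert that 'event_regexes' match a consecutive run of profile lines, in order."""
    event_regex_index = 0
    for line in runtime_profile.splitlines():
        if re.search(event_regexes[event_regex_index], line) is not None:
            event_regex_index += 1
            if event_regex_index == len(event_regexes):
                return  # every regex matched, in order, on adjacent lines
        else:
            # Lines before the first match are skipped; after that, any
            # non-matching line is a gap and fails the check.
            assert event_regex_index == 0, \
                event_regexes[event_regex_index] + " not in " + line
    assert event_regex_index == len(event_regexes), "Didn't find all events in profile"


# Made-up profile snippet; it only loosely imitates a real query timeline.
SAMPLE_PROFILE = """Query (id=0:0)
  Query Timeline: 1s
     - Query submitted: 0.000ns (0.000ns)
     - Planning finished: 10ms (10ms)
     - Last row fetched: 900ms (890ms)
  ImpalaServer:
"""

# Passes: the three regexes match three adjacent lines.
verify_profile_event_sequence(
    [r'Query Timeline:', r'Query submitted:', r'Planning finished:'], SAMPLE_PROFILE)
```

Lines before the first match are ignored, but once 'Query Timeline:' has matched, every following line must match the next regex. Asking for `[r'Query Timeline:', r'Last row fetched:']` against the same snippet therefore fails on the "Query submitted" line, which is how a missing event in the middle of the timeline is caught.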