You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/06/15 05:31:25 UTC

[impala] branch master updated: IMPALA-11337: Flush row output before writing "Fetched X row(s)"

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eb200abf IMPALA-11337: Flush row output before writing "Fetched X row(s)"
7eb200abf is described below

commit 7eb200abf18a79f0ab127324d1e7108732c60f25
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Mon Jun 13 15:08:42 2022 -0700

    IMPALA-11337: Flush row output before writing "Fetched X row(s)"
    
    When redirecting stdout and stderr to a file, the
    existing code can sometimes output the "Fetched X row(s)"
    line before finishing the row output. e.g.
    impala-shell -B -q "select 1" >> outfile.txt 2>> outfile.txt
    
    The row output goes to stdout while the control messages
    like "Fetched X row(s)" go to stderr. Since stdout can buffer
    output, that can delay the output. This adds a flush for
    stdout before writing the "Fetched X row(s)" message.
    
    Testing:
     - Added a shell test that redirects stdout and stderr to
       a file and verifies the contents. This consistently
       fails without the flush.
     - Other shell tests pass
    
    Change-Id: I83f89c110fd90d2d54331c7121e407d9de99146c
    Reviewed-on: http://gerrit.cloudera.org:8080/18625
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 shell/impala_shell.py                 |  4 ++++
 shell/shell_output.py                 |  6 ++++++
 tests/shell/test_shell_commandline.py | 21 +++++++++++++++++++++
 tests/shell/util.py                   | 33 +++++++++++++++++++++++----------
 4 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 735f2354a..503ef222c 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1332,6 +1332,10 @@ class ImpalaShell(cmd.Cmd, object):
         # retrieve the error log
         warning_log = self.imp_client.get_warning_log(self.last_query_handle)
 
+        # Flush the row output. This is important so that the row output will not
+        # come after the "Fetched X row(s)" message.
+        self.output_stream.flush()
+
       end_time = time.time()
 
       if warning_log:
diff --git a/shell/shell_output.py b/shell/shell_output.py
index 371cb911d..b5ccbe167 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -159,6 +159,12 @@ class OutputStream(object):
       # If filename is None, then just print to stdout
       print(formatted_data)
 
+  def flush(self):
+    # When outputting to a file, the file is currently closed with each write,
+    # so the flush doesn't need to do anything.
+    if self.filename is None:
+      sys.stdout.flush()
+
 
 class OverwritingStdErrOutputStream(object):
   """This class is used to write output to stderr and overwrite the previous text as
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index fbcbdbced..597c98b20 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -1272,3 +1272,24 @@ class TestImpalaShell(ImpalaTestSuite):
     output = run_impala_shell_cmd(vector, ['-q', query, '-B'])
     assert "Fetched 1 row(s)" in output.stderr
     assert "Trailing Whitespace          \n" in output.stdout
+
+  def test_shell_flush(self, vector, tmp_file):
+    """Verify that the rows are flushed before the Fetched X row(s) message"""
+
+    # Run a simple "select 1" with stdout and stderr redirected to the same file.
+    with open(tmp_file, "w") as f:
+      output = run_impala_shell_cmd(vector, ['-q', DEFAULT_QUERY, '-B'], stdout_file=f,
+                                    stderr_file=f)
+      # Stdout and stderr should be empty
+      assert output.stderr is None
+      assert output.stdout is None
+
+    # Verify the file contents
+    # The output should be in this order:
+    # 1\n
+    # Fetched 1 row(s) in ...\n
+    with open(tmp_file, "r") as f:
+      lines = f.readlines()
+      assert len(lines) >= 2
+      assert "1\n" in lines[len(lines) - 2]
+      assert "Fetched 1 row(s)" in lines[len(lines) - 1]
diff --git a/tests/shell/util.py b/tests/shell/util.py
index f42488c48..0f1a76881 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -115,14 +115,17 @@ def assert_pattern(pattern, result, text, message):
 
 
 def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
-                         stdin_input=None, wait_until_connected=True):
+                         stdin_input=None, wait_until_connected=True,
+                         stdout_file=None, stderr_file=None):
   """Runs the Impala shell on the commandline.
 
   'shell_args' is a string which represents the commandline options.
   Returns a ImpalaShellResult.
   """
   result = run_impala_shell_cmd_no_expect(vector, shell_args, env, stdin_input,
-                                          expect_success and wait_until_connected)
+                                          expect_success and wait_until_connected,
+                                          stdout_file=stdout_file,
+                                          stderr_file=stderr_file)
   if expect_success:
     assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
                                                                    result.stderr)
@@ -132,7 +135,8 @@ def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
 
 
 def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=None,
-                                   wait_until_connected=True):
+                                   wait_until_connected=True, stdout_file=None,
+                                   stderr_file=None):
   """Runs the Impala shell on the commandline.
 
   'shell_args' is a string which represents the commandline options.
@@ -140,7 +144,8 @@ def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=Non
 
   Does not assert based on success or failure of command.
   """
-  p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected)
+  p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected,
+                  stdout_file=stdout_file, stderr_file=stderr_file)
   result = p.get_result(stdin_input)
   return result
 
@@ -213,11 +218,16 @@ class ImpalaShell(object):
      get_result() to retrieve the process output. This constructor will wait until
      Impala shell is connected for the specified timeout unless wait_until_connected is
      set to False or --quiet is passed into the args."""
-  def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60):
-    self.shell_process = self._start_new_shell_process(vector, args, env=env)
+  def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60,
+               stdout_file=None, stderr_file=None):
+    self.shell_process = self._start_new_shell_process(vector, args, env=env,
+                                                       stdout_file=stdout_file,
+                                                       stderr_file=stderr_file)
     # When --quiet option is passed to Impala shell, we should not wait until we see
-    # "Connected to" because it will never be printed to stderr.
-    if wait_until_connected and (args is None or "--quiet" not in args):
+    # "Connected to" because it will never be printed to stderr. The same is true
+    # if stderr is redirected.
+    if wait_until_connected and (args is None or "--quiet" not in args) and \
+       stderr_file is None:
       start_time = time.time()
       connected = False
       while time.time() - start_time < timeout and not connected:
@@ -262,11 +272,14 @@ class ImpalaShell(object):
     result.rc = self.shell_process.returncode
     return result
 
-  def _start_new_shell_process(self, vector, args=None, env=None):
+  def _start_new_shell_process(self, vector, args=None, env=None, stdout_file=None,
+                               stderr_file=None):
     """Starts a shell process and returns the process handle"""
     cmd = get_shell_cmd(vector)
     if args is not None: cmd += args
-    return Popen(cmd, shell=False, stdout=PIPE, stdin=PIPE, stderr=PIPE,
+    stdout_arg = stdout_file if stdout_file is not None else PIPE
+    stderr_arg = stderr_file if stderr_file is not None else PIPE
+    return Popen(cmd, shell=False, stdout=stdout_arg, stdin=PIPE, stderr=stderr_arg,
                  env=build_shell_env(env))