You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/06/15 05:31:25 UTC
[impala] branch master updated: IMPALA-11337: Flush row output before writing "Fetched X row(s)"
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 7eb200abf IMPALA-11337: Flush row output before writing "Fetched X row(s)"
7eb200abf is described below
commit 7eb200abf18a79f0ab127324d1e7108732c60f25
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Mon Jun 13 15:08:42 2022 -0700
IMPALA-11337: Flush row output before writing "Fetched X row(s)"
When stdout and stderr are redirected to the same file, the
existing code can sometimes write the "Fetched X row(s)"
line before the row output has finished. For example:
impala-shell -B -q "select 1" >> outfile.txt 2>> outfile.txt
The row output goes to stdout, while control messages
like "Fetched X row(s)" go to stderr. Because stdout buffers
its output, the rows can reach the file after the stderr
message has already been written. This change flushes
stdout before writing the "Fetched X row(s)" message.
Testing:
- Added a shell test that redirects stdout and stderr to
a file and verifies the contents. This consistently
fails without the flush.
- Other shell tests pass
Change-Id: I83f89c110fd90d2d54331c7121e407d9de99146c
Reviewed-on: http://gerrit.cloudera.org:8080/18625
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
shell/impala_shell.py | 4 ++++
shell/shell_output.py | 6 ++++++
tests/shell/test_shell_commandline.py | 21 +++++++++++++++++++++
tests/shell/util.py | 33 +++++++++++++++++++++++----------
4 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 735f2354a..503ef222c 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1332,6 +1332,10 @@ class ImpalaShell(cmd.Cmd, object):
# retrieve the error log
warning_log = self.imp_client.get_warning_log(self.last_query_handle)
+ # Flush the row output. This is important so that the row output will not
+ # come after the "Fetch X row(s)" message.
+ self.output_stream.flush()
+
end_time = time.time()
if warning_log:
diff --git a/shell/shell_output.py b/shell/shell_output.py
index 371cb911d..b5ccbe167 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -159,6 +159,12 @@ class OutputStream(object):
# If filename is None, then just print to stdout
print(formatted_data)
+ def flush(self):
+ # When outputing to a file, the file is currently closed with each write,
+ # so the flush doesn't need to do anything.
+ if self.filename is None:
+ sys.stdout.flush()
+
class OverwritingStdErrOutputStream(object):
"""This class is used to write output to stderr and overwrite the previous text as
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index fbcbdbced..597c98b20 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -1272,3 +1272,24 @@ class TestImpalaShell(ImpalaTestSuite):
output = run_impala_shell_cmd(vector, ['-q', query, '-B'])
assert "Fetched 1 row(s)" in output.stderr
assert "Trailing Whitespace \n" in output.stdout
+
+ def test_shell_flush(self, vector, tmp_file):
+ """Verify that the rows are flushed before the Fetch X row(s) message"""
+
+ # Run a simple "select 1" with stdout and stderr redirected to the same file.
+ with open(tmp_file, "w") as f:
+ output = run_impala_shell_cmd(vector, ['-q', DEFAULT_QUERY, '-B'], stdout_file=f,
+ stderr_file=f)
+ # Stdout and stderr should be empty
+ assert output.stderr is None
+ assert output.stdout is None
+
+ # Verify the file contents
+ # The output should be in this order:
+ # 1\n
+ # Fetched 1 row(s) in ...\n
+ with open(tmp_file, "r") as f:
+ lines = f.readlines()
+ assert len(lines) >= 2
+ assert "1\n" in lines[len(lines) - 2]
+ assert "Fetched 1 row(s)" in lines[len(lines) - 1]
diff --git a/tests/shell/util.py b/tests/shell/util.py
index f42488c48..0f1a76881 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -115,14 +115,17 @@ def assert_pattern(pattern, result, text, message):
def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
- stdin_input=None, wait_until_connected=True):
+ stdin_input=None, wait_until_connected=True,
+ stdout_file=None, stderr_file=None):
"""Runs the Impala shell on the commandline.
'shell_args' is a string which represents the commandline options.
Returns a ImpalaShellResult.
"""
result = run_impala_shell_cmd_no_expect(vector, shell_args, env, stdin_input,
- expect_success and wait_until_connected)
+ expect_success and wait_until_connected,
+ stdout_file=stdout_file,
+ stderr_file=stderr_file)
if expect_success:
assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
result.stderr)
@@ -132,7 +135,8 @@ def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=None,
- wait_until_connected=True):
+ wait_until_connected=True, stdout_file=None,
+ stderr_file=None):
"""Runs the Impala shell on the commandline.
'shell_args' is a string which represents the commandline options.
@@ -140,7 +144,8 @@ def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=Non
Does not assert based on success or failure of command.
"""
- p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected)
+ p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected,
+ stdout_file=stdout_file, stderr_file=stderr_file)
result = p.get_result(stdin_input)
return result
@@ -213,11 +218,16 @@ class ImpalaShell(object):
get_result() to retrieve the process output. This constructor will wait until
Impala shell is connected for the specified timeout unless wait_until_connected is
set to False or --quiet is passed into the args."""
- def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60):
- self.shell_process = self._start_new_shell_process(vector, args, env=env)
+ def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60,
+ stdout_file=None, stderr_file=None):
+ self.shell_process = self._start_new_shell_process(vector, args, env=env,
+ stdout_file=stdout_file,
+ stderr_file=stderr_file)
# When --quiet option is passed to Impala shell, we should not wait until we see
- # "Connected to" because it will never be printed to stderr.
- if wait_until_connected and (args is None or "--quiet" not in args):
+ # "Connected to" because it will never be printed to stderr. The same is true
+ # if stderr is redirected.
+ if wait_until_connected and (args is None or "--quiet" not in args) and \
+ stderr_file is None:
start_time = time.time()
connected = False
while time.time() - start_time < timeout and not connected:
@@ -262,11 +272,14 @@ class ImpalaShell(object):
result.rc = self.shell_process.returncode
return result
- def _start_new_shell_process(self, vector, args=None, env=None):
+ def _start_new_shell_process(self, vector, args=None, env=None, stdout_file=None,
+ stderr_file=None):
"""Starts a shell process and returns the process handle"""
cmd = get_shell_cmd(vector)
if args is not None: cmd += args
- return Popen(cmd, shell=False, stdout=PIPE, stdin=PIPE, stderr=PIPE,
+ stdout_arg = stdout_file if stdout_file is not None else PIPE
+ stderr_arg = stderr_file if stderr_file is not None else PIPE
+ return Popen(cmd, shell=False, stdout=stdout_arg, stdin=PIPE, stderr=stderr_arg,
env=build_shell_env(env))