You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/08/08 23:37:11 UTC
[impala] 14/27: IMPALA-11337: Flush row output before writing "Fetched X row(s)"
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 328eedf96725c1eafd29675eea89b72990a06d15
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Mon Jun 13 15:08:42 2022 -0700
IMPALA-11337: Flush row output before writing "Fetched X row(s)"
When redirecting stdout and stderr to the same file, the
existing code can sometimes output the "Fetched X row(s)"
line before the row output has been fully written, e.g.:
impala-shell -B -q "select 1" >> outfile.txt 2>> outfile.txt
The row output goes to stdout while control messages
like "Fetched X row(s)" go to stderr. Since stdout can buffer
output, the rows can reach the file after the stderr message.
This adds a flush for stdout before writing the "Fetched X row(s)" message.
Testing:
- Added a shell test that redirects stdout and stderr to
a file and verifies the contents. This consistently
fails without the flush.
- Other shell tests pass
Change-Id: I83f89c110fd90d2d54331c7121e407d9de99146c
Reviewed-on: http://gerrit.cloudera.org:8080/18625
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
shell/impala_shell.py | 4 ++++
shell/shell_output.py | 6 ++++++
tests/shell/test_shell_commandline.py | 21 +++++++++++++++++++++
tests/shell/util.py | 33 +++++++++++++++++++++++----------
4 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 7bf891d56..acaa02b88 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1328,6 +1328,10 @@ class ImpalaShell(cmd.Cmd, object):
# retrieve the error log
warning_log = self.imp_client.get_warning_log(self.last_query_handle)
+ # Flush the row output. This is important so that the row output will not
+ # come after the "Fetched X row(s)" message.
+ self.output_stream.flush()
+
end_time = time.time()
if warning_log:
diff --git a/shell/shell_output.py b/shell/shell_output.py
index 608a8ca92..cac21455d 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -137,6 +137,12 @@ class OutputStream(object):
# If filename is None, then just print to stdout
print(formatted_data)
+ def flush(self):
+ # When outputting to a file, the file is currently closed with each write,
+ # so the flush doesn't need to do anything.
+ if self.filename is None:
+ sys.stdout.flush()
+
class OverwritingStdErrOutputStream(object):
"""This class is used to write output to stderr and overwrite the previous text as
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 1cfd90746..ef6536e26 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -1250,3 +1250,24 @@ class TestImpalaShell(ImpalaTestSuite):
output = run_impala_shell_cmd(vector, ['-q', query, '-B'])
assert "Fetched 1 row(s)" in output.stderr
assert "Trailing Whitespace \n" in output.stdout
+
+ def test_shell_flush(self, vector, tmp_file):
+ """Verify that the rows are flushed before the "Fetched X row(s)" message"""
+
+ # Run a simple "select 1" with stdout and stderr redirected to the same file.
+ with open(tmp_file, "w") as f:
+ output = run_impala_shell_cmd(vector, ['-q', DEFAULT_QUERY, '-B'], stdout_file=f,
+ stderr_file=f)
+ # Stdout and stderr should be empty
+ assert output.stderr is None
+ assert output.stdout is None
+
+ # Verify the file contents
+ # The output should be in this order:
+ # 1\n
+ # Fetched 1 row(s) in ...\n
+ with open(tmp_file, "r") as f:
+ lines = f.readlines()
+ assert len(lines) >= 2
+ assert "1\n" in lines[len(lines) - 2]
+ assert "Fetched 1 row(s)" in lines[len(lines) - 1]
diff --git a/tests/shell/util.py b/tests/shell/util.py
index fc0365f3b..1aa49b2cb 100755
--- a/tests/shell/util.py
+++ b/tests/shell/util.py
@@ -165,14 +165,17 @@ def assert_pattern(pattern, result, text, message):
def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
- stdin_input=None, wait_until_connected=True):
+ stdin_input=None, wait_until_connected=True,
+ stdout_file=None, stderr_file=None):
"""Runs the Impala shell on the commandline.
'shell_args' is a string which represents the commandline options.
Returns a ImpalaShellResult.
"""
result = run_impala_shell_cmd_no_expect(vector, shell_args, env, stdin_input,
- expect_success and wait_until_connected)
+ expect_success and wait_until_connected,
+ stdout_file=stdout_file,
+ stderr_file=stderr_file)
if expect_success:
assert result.rc == 0, "Cmd %s was expected to succeed: %s" % (shell_args,
result.stderr)
@@ -182,7 +185,8 @@ def run_impala_shell_cmd(vector, shell_args, env=None, expect_success=True,
def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=None,
- wait_until_connected=True):
+ wait_until_connected=True, stdout_file=None,
+ stderr_file=None):
"""Runs the Impala shell on the commandline.
'shell_args' is a string which represents the commandline options.
@@ -190,7 +194,8 @@ def run_impala_shell_cmd_no_expect(vector, shell_args, env=None, stdin_input=Non
Does not assert based on success or failure of command.
"""
- p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected)
+ p = ImpalaShell(vector, shell_args, env=env, wait_until_connected=wait_until_connected,
+ stdout_file=stdout_file, stderr_file=stderr_file)
result = p.get_result(stdin_input)
return result
@@ -262,11 +267,16 @@ class ImpalaShell(object):
get_result() to retrieve the process output. This constructor will wait until
Impala shell is connected for the specified timeout unless wait_until_connected is
set to False or --quiet is passed into the args."""
- def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60):
- self.shell_process = self._start_new_shell_process(vector, args, env=env)
+ def __init__(self, vector, args=None, env=None, wait_until_connected=True, timeout=60,
+ stdout_file=None, stderr_file=None):
+ self.shell_process = self._start_new_shell_process(vector, args, env=env,
+ stdout_file=stdout_file,
+ stderr_file=stderr_file)
# When --quiet option is passed to Impala shell, we should not wait until we see
- # "Connected to" because it will never be printed to stderr.
- if wait_until_connected and (args is None or "--quiet" not in args):
+ # "Connected to" because it will never be printed to stderr. The same is true
+ # if stderr is redirected.
+ if wait_until_connected and (args is None or "--quiet" not in args) and \
+ stderr_file is None:
start_time = time.time()
connected = False
while time.time() - start_time < timeout and not connected:
@@ -311,11 +321,14 @@ class ImpalaShell(object):
result.rc = self.shell_process.returncode
return result
- def _start_new_shell_process(self, vector, args=None, env=None):
+ def _start_new_shell_process(self, vector, args=None, env=None, stdout_file=None,
+ stderr_file=None):
"""Starts a shell process and returns the process handle"""
cmd = get_shell_cmd(vector)
if args is not None: cmd += args
- return Popen(cmd, shell=False, stdout=PIPE, stdin=PIPE, stderr=PIPE,
+ stdout_arg = stdout_file if stdout_file is not None else PIPE
+ stderr_arg = stderr_file if stderr_file is not None else PIPE
+ return Popen(cmd, shell=False, stdout=stdout_arg, stdin=PIPE, stderr=stderr_arg,
env=build_shell_env(env))