Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/01 17:24:26 UTC
[1/6] impala git commit: IMPALA-7195 IMPALA-7222: [DOCS] Impala delegation with groups
Repository: impala
Updated Branches:
refs/heads/master 316b17ac5 -> 55083f38c
IMPALA-7195 IMPALA-7222: [DOCS] Impala delegation with groups
Added clarifications about delegation.
Change-Id: I6948ab2ef9b82b123f7a1f4fdc83cfb06e9f912f
Reviewed-on: http://gerrit.cloudera.org:8080/11068
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b9511109
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b9511109
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b9511109
Branch: refs/heads/master
Commit: b95111094b70a060e145e61013559ba7f2e1799a
Parents: 316b17a
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Jul 26 16:45:43 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Tue Jul 31 18:06:22 2018 +0000
----------------------------------------------------------------------
docs/topics/impala_delegation.xml | 265 +++++++++++++++++++++++++++------
1 file changed, 221 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/b9511109/docs/topics/impala_delegation.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_delegation.xml b/docs/topics/impala_delegation.xml
index c524bf5..bd29426 100644
--- a/docs/topics/impala_delegation.xml
+++ b/docs/topics/impala_delegation.xml
@@ -36,58 +36,235 @@ under the License.
</prolog>
<conbody>
+
<p>
- <!--
- When users connect to Impala directly through the <cmdname>impala-shell</cmdname> interpreter, the Sentry
- authorization framework determines what actions they can take and what data they can see.
--->
- When users submit Impala queries through a separate application, such as
- Hue or a business intelligence tool, typically all requests are treated as
- coming from the same user. In Impala 1.2 and higher,Impala supports
- applications to pass along credentials for the users that connect to them,
- known as <q>delegation</q>, and to issue Impala queries with the
- privileges for those users. Currently, the delegation feature is available
- only for Impala queries submitted through application interfaces such as
- Hue and BI tools. For example, Impala cannot issue queries using the
- privileges of the HDFS user. </p>
- <note type="attention">Impala requires Apache Sentry on the cluster to
- enable delegation. Without Apache Sentry installed, the delegation feature
- will fail with the following error: User <i>user1</i> is not authorized to
- delegate to <i>user2</i> User delegation is disabled.</note>
- <p> The delegation feature is enabled by a startup option for
- <cmdname>impalad</cmdname>:
- <codeph>--authorized_proxy_user_config</codeph>. When you specify this
- option, users whose names you specify (such as <codeph>hue</codeph>) can
- delegate the execution of a query to another user. The query runs with the
- privileges of the delegated user, not the original user such as
- <codeph>hue</codeph>. The name of the delegated user is passed using the
- HiveServer2 configuration property <codeph>impala.doas.user</codeph>. </p>
- <p> You can specify a list of users that the application user can delegate
- to, or <codeph>*</codeph> to allow a superuser to delegate to any other
- user. For example: </p>
- <codeblock>impalad --authorized_proxy_user_config 'hue=user1,user2;admin=*' ...</codeblock>
- <note> Make sure to use single quotes or escape characters to ensure that
- any <codeph>*</codeph> characters do not undergo wildcard expansion when
- specified in command-line arguments. </note>
- <p> See <xref href="impala_config_options.xml#config_options"/> for details
- about adding or changing <cmdname>impalad</cmdname> startup options. See
- <xref
+ When users submit Impala queries through a separate application, such as Hue or a business
+ intelligence tool, typically all requests are treated as coming from the same user. In
+ Impala 1.2 and higher, Impala supports <q>delegation</q> where users whose names you
+ specify can delegate the execution of a query to another user. The query runs with the
+ privileges of the delegated user, not the original authenticated user.
+ </p>
+
+ <p>
+ Starting in <keyword keyref="impala31_full">Impala 3.1</keyword> and higher, you can
+ delegate using groups. Instead of listing a large number of delegated users, you can
+ create a group of those users and specify the delegated group name in the Impalad startup
+ option. The client sends the delegated user name, and Impala performs an authorization check to
+ see if the delegated user belongs to a delegated group.
+ </p>
+
+ <p>
+ The name of the delegated user is passed using the HiveServer2 protocol configuration
+ property <codeph>impala.doas.user</codeph> when the client connects to Impala.
+ </p>
+
+ <p>
+ Currently, the delegation feature is available only for Impala queries submitted through
+ application interfaces such as Hue and BI tools. For example, Impala cannot issue queries
+ using the privileges of the HDFS user.
+ </p>
+
+ <note type="attention">
+ <ul>
+ <li>
When delegation is enabled in Impala, Impala clients should take extra
care to prevent unauthorized access for the users that can be delegated to.
+ </li>
+
+ <li>
+ Impala requires Apache Sentry on the cluster to enable delegation. Without Apache
+ Sentry installed, the delegation feature will fail with the following error: User
+ <i>user1</i> is not authorized to delegate to <i>user2</i>. User/group delegation is
+ disabled.
+ </li>
+ </ul>
+ </note>
+
+ <p>
+ The delegation feature is enabled by the startup options for <cmdname>impalad</cmdname>:
+ <codeph>--authorized_proxy_user_config</codeph> and
+ <codeph>--authorized_proxy_group_config</codeph>.
+ </p>
+
+ <p>
+ The syntax for the options is:
+ </p>
+
+<codeblock>--authorized_proxy_user_config=<varname>authenticated_user1</varname>=<varname>delegated_user1</varname>,<varname>delegated_user2</varname>,...;<varname>authenticated_user2</varname>=...</codeblock>
+
+<codeblock>--authorized_proxy_group_config=<varname>authenticated_user1</varname>=<varname>delegated_group1</varname>,<varname>delegated_group2</varname>,...;<varname>authenticated_user2</varname>=...</codeblock>
+
+ <ul>
+ <li>
The list of authorized users/groups is delimited with <codeph>;</codeph>.
+ </li>
+
+ <li>
The list of delegated users/groups is delimited with <codeph>,</codeph> by default.
+ </li>
+
+ <li>
+ Use the <codeph>--authorized_proxy_user_config_delimiter</codeph> startup option to
change the default user delimiter (the comma character) to another character.
+ </li>
+
+ <li>
+ Use the <codeph>--authorized_proxy_group_config_delimiter</codeph> startup option to
change the default group delimiter (the comma character) to another character.
+ </li>
+
+ <li>
The wildcard (<codeph>*</codeph>) is supported to delegate to any user or any group, e.g.
+ <codeph>--authorized_proxy_group_config=hue=*</codeph>. Make sure to use single quotes
+ or escape characters to ensure that any <codeph>*</codeph> characters do not undergo
+ wildcard expansion when specified in command-line arguments.
+ </li>
+ </ul>
+
+ <p>
+ When you start Impala with the
+ <codeph>--authorized_proxy_user_config=<varname>authenticated_user</varname>=<varname>delegated_user</varname></codeph>
+ or
+ <codeph>--authorized_proxy_group_config=<varname>authenticated_user</varname>=<varname>delegated_group</varname></codeph>
+ option:
+ </p>
+
+ <ul>
+ <li>
Authentication is based on the user on the left-hand side
+ (<varname>authenticated_user</varname>).
+ </li>
+
+ <li>
Authorization is based on the right-hand side user(s) or group(s)
+ (<varname>delegated_user</varname>, <varname>delegated_group</varname>).
+ </li>
+
+ <li>
+ When opening a client connection, the client must provide a delegated username via the
HiveServer2 protocol property, <codeph>impala.doas.user</codeph> or
+ <codeph>DelegationUID</codeph>.
+ </li>
+
+ <li>
It is not necessary for <varname>authenticated_user</varname> to have permission to
access or edit files.
+ </li>
+
+ <li>
+ It is not necessary for the delegated users to have access to the service via Kerberos.
+ </li>
+
+ <li>
+ <varname>delegated_user</varname> and <varname>delegated_group</varname> must exist in
+ the OS.
+ </li>
+
+ <li>
For group delegation, use the JNI-based Hadoop group mapping providers, such as
+ JniBasedUnixGroupsMappingWithFallback and JniBasedUnixGroupsNetgroupMappingWithFallback.
+ </li>
+
+ <li>
The ShellBasedUnixGroupsNetgroupMapping and ShellBasedUnixGroupsMapping Hadoop group mapping
+ providers are not supported in Impala group delegation.
+ </li>
+
+ <li>
+ In Impala, <codeph>user()</codeph> returns <varname>authenticated_user</varname> and
+ <codeph>effective_user()</codeph> returns the delegated user that the client specified.
+ </li>
+ </ul>
+
+ <p>
+ The user or group delegation process works as follows:
+ <ol>
+ <li>
+ The Impalad daemon starts with one of the following options:
+ <ul>
+ <li>
+ <codeph>--authorized_proxy_user_config=<varname>authenticated_user</varname>=<varname>delegated_user</varname></codeph>
+ </li>
+
+ <li>
+ <codeph>--authorized_proxy_group_config=<varname>authenticated_user</varname>=<varname>delegated_group</varname></codeph>
+ </li>
+ </ul>
+ </li>
+
+ <li>
+ A client connects to Impala via the HiveServer2 protocol with the
<codeph>impala.doas.user</codeph> configuration property. For example, the connected user is
+ <varname>authenticated_user</varname> with
+ <codeph>impala.doas.user=<varname>delegated_user</varname></codeph>.
+ </li>
+
+ <li>
+ The client user <varname>authenticated_user</varname> sends a request to Impala as the
+ delegated user <varname>delegated_user</varname>.
+ </li>
+
+ <li>
+ Impala checks authorization:
+ <ul>
+ <li>
+ In user delegation, Impala checks if <varname>delegated_user</varname> is in the
+ list of authorized delegate users for the user
+ <varname>authenticated_user</varname>.
+ </li>
+
+ <li>
In group delegation, Impala checks if <varname>delegated_user</varname> belongs to
one of the delegated groups for the user <varname>authenticated_user</varname>
(<varname>delegated_group</varname> in this example).
+ </li>
+ </ul>
+ </li>
+
+ <li>
+ If the user is an authorized delegated user for <varname>authenticated_user</varname>,
the request is executed as the delegated user <varname>delegated_user</varname>.
+ </li>
+ </ol>
+ </p>
+
+ <p>
+ See <xref href="impala_config_options.xml#config_options"/> for details about adding or
+ changing <cmdname>impalad</cmdname> startup options.
+ </p>
+
+ <p>
+ See
+ <xref
keyref="how-hiveserver2-brings-security-and-concurrency-to-apache-hive"
- >this blog post</xref> for background information about the delegation
- capability in HiveServer2. </p>
- <p> To set up authentication for the delegated users: </p>
+ >this
+ blog post</xref> for background information about the delegation capability in
+ HiveServer2.
+ </p>
+
+ <p>
+ To set up authentication for the delegated users:
+ </p>
+
<ul>
<li>
- <p> On the server side, configure either user/password authentication
- through LDAP, or Kerberos authentication, for all the delegated users.
- See <xref href="impala_ldap.xml#ldap"/> or <xref
- href="impala_kerberos.xml#kerberos"/> for details. </p>
+ <p>
+ On the server side, configure either user/password authentication through LDAP, or
+ Kerberos authentication, for all the delegated users. See
+ <xref href="impala_ldap.xml#ldap"/> or
+ <xref
+ href="impala_kerberos.xml#kerberos"/> for details.
+ </p>
</li>
+
<li>
- <p> On the client side, to learn how to enable delegation, consult the
- documentation for the ODBC driver you are using. </p>
+ <p>
+ On the client side, to learn how to enable delegation, consult the documentation for
+ the ODBC driver you are using.
+ </p>
</li>
</ul>
+
</conbody>
</concept>
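
The authorization flow that the documentation above describes can be sketched in a few lines of code. The following Python sketch is purely illustrative and is not Impala's implementation (the actual check happens inside the Impala daemon); the flag parsing, the function names, and the OS group lookup via the standard pwd/grp modules are assumptions made for this example.

# Minimal sketch of the delegation check described in the documentation above.
# NOT Impala's implementation; it only mirrors the documented behaviour for
#   --authorized_proxy_user_config='hue=user1,user2;admin=*'
#   --authorized_proxy_group_config='hue=analysts'
import grp
import pwd


def parse_proxy_config(config, delimiter=","):
    """Parse 'auth_user1=a,b;auth_user2=c' into {'auth_user1': {'a', 'b'}, ...}."""
    result = {}
    for entry in config.split(";"):
        if not entry:
            continue
        auth_user, _, delegated = entry.partition("=")
        result[auth_user] = set(delegated.split(delimiter))
    return result


def os_groups(user):
    """Groups the OS knows for 'user' (delegated users and groups must exist in the OS)."""
    try:
        primary = grp.getgrgid(pwd.getpwnam(user).pw_gid).gr_name
    except KeyError:
        return set()  # user unknown to the OS
    secondary = [g.gr_name for g in grp.getgrall() if user in g.gr_mem]
    return set([primary] + secondary)


def is_delegation_authorized(auth_user, doas_user, user_config, group_config):
    """Return True if 'auth_user' may run a query as 'doas_user'."""
    users = user_config.get(auth_user, set())
    if "*" in users or doas_user in users:
        return True
    groups = group_config.get(auth_user, set())
    if "*" in groups:
        return True
    return bool(groups & os_groups(doas_user))


user_cfg = parse_proxy_config("hue=user1,user2;admin=*")
group_cfg = parse_proxy_config("hue=analysts")
print(is_delegation_authorized("hue", "user1", user_cfg, group_cfg))     # True (user list)
print(is_delegation_authorized("admin", "anybody", user_cfg, group_cfg))  # True (wildcard)

With those example flags, 'hue' may run queries as 'user1', 'user2', or any OS user in the 'analysts' group, while 'admin' may delegate to anyone.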
[6/6] impala git commit: IMPALA-7317: comment on long lines and whitespace
Posted by ta...@apache.org.
IMPALA-7317: comment on long lines and whitespace
Add some basic formatting checks that can be implemented
easily by parsing the diff line-by-line. These are applied
to Java, Python, C++, shell and thrift source files with
some exclusions.
Adds a --dryrun option that does not try to post back the
comments to gerrit.
Remove the option to specify a git revision since flake8-diff doesn't
work correctly when run on source that isn't checked out.
Testing:
Added some violations to this code change that will be
fixed before merging. The output is:
{
"comments": {
"bin/jenkins/critique-gerrit-review.py": [
{
"message": "flake8: E501 line too long (107 > 90 characters)",
"range": {
"start_character": 90,
"start_line": 124,
"end_line": 124,
"end_character": 91
}
},
{
"message": "tab used for whitespace",
"line": 125
}
]
}
}
Change-Id: I36bb99560ab09d7753ff93227d1c4972582770f2
Reviewed-on: http://gerrit.cloudera.org:8080/11085
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/55083f38
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/55083f38
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/55083f38
Branch: refs/heads/master
Commit: 55083f38cce74347a3a528dd59c402f5cfd4441a
Parents: abf6f8f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 30 16:20:11 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Aug 1 17:23:10 2018 +0000
----------------------------------------------------------------------
bin/jenkins/critique-gerrit-review.py | 96 ++++++++++++++++++++++++++++--
1 file changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/55083f38/bin/jenkins/critique-gerrit-review.py
----------------------------------------------------------------------
diff --git a/bin/jenkins/critique-gerrit-review.py b/bin/jenkins/critique-gerrit-review.py
index 3311793..409fef4 100755
--- a/bin/jenkins/critique-gerrit-review.py
+++ b/bin/jenkins/critique-gerrit-review.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-# Usage: critique-gerrit-review.py <git commit>
+# Usage: critique-gerrit-review.py [--dryrun]
#
# This script is meant to run on an jenkins.impala.io build slave and post back comments
# to a code review. It does not need to run on all supported platforms so we use system
@@ -34,16 +34,16 @@
# ssh, pip, virtualenv
#
# TODO: generalise to other warnings
-# * Lines too long and trailing whitespace
# * clang-tidy
+from argparse import ArgumentParser
from collections import defaultdict
import json
import os
from os import environ
import os.path
import re
-from subprocess import check_call, Popen, PIPE
+from subprocess import check_call, check_output, Popen, PIPE
import sys
import virtualenv
@@ -55,6 +55,19 @@ VENV_BIN = os.path.join(VENV_PATH, "bin")
PIP_PATH = os.path.join(VENV_BIN, "pip")
FLAKE8_DIFF_PATH = os.path.join(VENV_BIN, "flake8-diff")
+# Limit on length of lines in source files.
+LINE_LIMIT = 90
+
+# Source file extensions that we should apply our line limit and whitespace rules to.
+SOURCE_EXTENSIONS = set([".cc", ".h", ".java", ".py", ".sh", ".thrift"])
+
+# Source file patterns that we exclude from our line limit and whitespace rules.
+EXCLUDE_FILE_PATTERNS = [
+ re.compile(r".*be/src/kudu.*"), # Kudu source code may have different rules.
+ re.compile(r".*-benchmark.cc"), # Benchmark files tend to have long lines.
+ re.compile(r".*/function-registry/impala_functions.py") # Many long strings.
+]
+
def setup_virtualenv():
"""Set up virtualenv with flake8-diff."""
@@ -108,6 +121,63 @@ def get_flake8_comments(revision):
return comments
+def get_misc_comments(revision):
+ """Get miscellaneous warnings for code changes made in the git commit 'revision', e.g.
+ long lines and trailing whitespace. These warnings are produced by directly parsing the
+ diff output."""
+ comments = defaultdict(lambda: [])
+ # Matches range information like:
+ # @@ -128 +133,2 @@ if __name__ == "__main__":
+ RANGE_RE = re.compile(r"^@@ -[0-9,]* \+([0-9]*).*$")
+
+ diff = check_output(["git", "diff", "-U0", "{0}^..{0}".format(revision)])
+ curr_file = None
+ check_source_file = False
+ curr_line_num = 0
+ for diff_line in diff.splitlines():
+ if diff_line.startswith("+++ "):
+ # Start of diff for a file. Strip off "+++ b/" to get the file path.
+ curr_file = diff_line[6:]
+ check_source_file = os.path.splitext(curr_file)[1] in SOURCE_EXTENSIONS
+ if check_source_file:
+ for pattern in EXCLUDE_FILE_PATTERNS:
+ if pattern.match(curr_file):
+ check_source_file = False
+ break
+ elif diff_line.startswith("@@ "):
+ # Figure out the starting line of the hunk. Format of unified diff is:
+ # @@ -128 +133,2 @@ if __name__ == "__main__":
+ # We want to extract the start line for the added lines
+ match = RANGE_RE.match(diff_line)
+ if not match:
+ raise Exception("Pattern did not match diff line:\n{0}".format(diff_line))
+ curr_line_num = int(match.group(1))
+ elif diff_line.startswith("+") and check_source_file:
+ # An added or modified line - check it to see if we should generate warnings.
+ add_misc_comments_for_line(comments, diff_line[1:], curr_file, curr_line_num)
+ curr_line_num += 1
+ return comments
+
+
+def add_misc_comments_for_line(comments, line, curr_file, curr_line_num):
+ """Helper for get_misc_comments to generate comments for 'line' at 'curr_line_num' in
+ 'curr_file' and append them to 'comments'."""
+ # Check for trailing whitespace.
+ if line.rstrip() != line:
+ comments[curr_file].append(
+ {"message": "line has trailing whitespace", "line": curr_line_num})
+
+ # Check for long lines. Skip .py files since flake8 already flags long lines.
+ if len(line) > 90 and os.path.splitext(curr_file)[1] != ".py":
+ msg = "line too long ({0} > {1})".format(len(line), LINE_LIMIT)
+ comments[curr_file].append(
+ {"message": msg, "line": curr_line_num})
+
+ if '\t' in line:
+ comments[curr_file].append(
+ {"message": "tab used for whitespace", "line": curr_line_num})
+
+
def post_review_to_gerrit(review_input):
"""Post a review to the gerrit patchset. 'review_input' is a ReviewInput JSON object
containing the review comments. The gerrit change and patchset are picked up from
@@ -123,8 +193,24 @@ def post_review_to_gerrit(review_input):
raise Exception("Error posting review to gerrit.")
+def merge_comments(a, b):
+ for k, v in b.iteritems():
+ a[k].extend(v)
+
+
if __name__ == "__main__":
+ parser = ArgumentParser(description="Generate and post gerrit comments")
+ parser.add_argument("--dryrun", action='store_true',
+ help="Don't post comments back to gerrit")
+ args = parser.parse_args()
+
setup_virtualenv()
- review_input = {"comments": get_flake8_comments(sys.argv[1])}
+ # flake8-diff only actually works correctly on HEAD, so this is the only revision
+ # we can correctly handle.
+ revision = 'HEAD'
+ comments = get_flake8_comments(revision)
+ merge_comments(comments, get_misc_comments(revision))
+ review_input = {"comments": comments}
print json.dumps(review_input, indent=True)
- post_review_to_gerrit(review_input)
+ if not args.dryrun:
+ post_review_to_gerrit(review_input)
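
As a quick reference for how the new parsing behaves, the hunk-header regex and the per-line checks from the change above can be exercised on their own. This is a toy illustration; the sample hunk header and lines below are made up, and the sketch leaves out the rule that skips the long-line check for .py files (flake8 already flags those).

import re

# Same pattern as RANGE_RE in critique-gerrit-review.py.
RANGE_RE = re.compile(r"^@@ -[0-9,]* \+([0-9]*).*$")
LINE_LIMIT = 90

header = '@@ -128 +133,2 @@ if __name__ == "__main__":'
print(RANGE_RE.match(header).group(1))  # '133' -> first added line of the hunk


def misc_warnings(line):
    """Per-line checks mirroring add_misc_comments_for_line() (minus the .py exception)."""
    warnings = []
    if line.rstrip() != line:
        warnings.append("line has trailing whitespace")
    if len(line) > LINE_LIMIT:
        warnings.append("line too long ({0} > {1})".format(len(line), LINE_LIMIT))
    if "\t" in line:
        warnings.append("tab used for whitespace")
    return warnings


print(misc_warnings("x = compute()   "))     # trailing whitespace
print(misc_warnings("\tindented_with_tab"))  # tab used for whitespace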
[5/6] impala git commit: Fix TestKuduOperations tests in test-with-docker by using consistent hostname.
Posted by ta...@apache.org.
Fix TestKuduOperations tests in test-with-docker by using consistent hostname.
TestKuduOperations, when run using test-with-docker, failed with errors
like:
Remote error: Service unavailable: Timed out: could not wait for desired
snapshot timestamp to be consistent: Tablet is lagging too much to be able to
serve snapshot scan. Lagging by: 1985348 ms, (max is 30000 ms):
The underlying issue, as discovered by Thomas Tauber-Marshall, is that Kudu
serializes the hostnames of Kudu tablet servers, and different containers in
test-with-docker use different hostnames. This was exposed after "IMPALA-6812:
Fix flaky Kudu scan tests" switched to using READ_AT_SNAPSHOT for Kudu reads.
Using the same hostname for all the containers is easy and harmless;
this change does just that.
Change-Id: Iea8c5096b515a79601be2e919d32585fb2796b3d
Reviewed-on: http://gerrit.cloudera.org:8080/11082
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/abf6f8f4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/abf6f8f4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/abf6f8f4
Branch: refs/heads/master
Commit: abf6f8f465e6b99fdd0a97e12bbde143ef98f3db
Parents: 72db58a
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Jul 30 13:15:54 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 1 01:25:38 2018 +0000
----------------------------------------------------------------------
docker/test-with-docker.py | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/abf6f8f4/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 1922c57..3da700f 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -505,7 +505,14 @@ class TestWithDocker(object):
# requirement may be lifted in newer Docker versions.
"--privileged",
"--name", name,
- "--hostname", name,
+ # Whereas the container names vary across containers, we use the same
+ # hostname repeatedly, so that the build container and the test
+ # containers have the same hostnames. Kudu errors out with "Remote
+ # error: Service unavailable: Timed out: could not wait for desired
+ # snapshot timestamp to be consistent: Tablet is lagging too much to be
+ # able to serve snapshot scan." if reading with READ_AT_SNAPSHOT
+ # if the hostnames change underneath it.
+ "--hostname", self.name,
# Label with the git root directory for easier cleanup
"--label=pwd=" + self.git_root,
# Consistent locales
[2/6] impala git commit: IMPALA-1624: Allow toggling and unsetting some command-line options inside impala-shell
Posted by ta...@apache.org.
IMPALA-1624: Allow toggling and unsetting some command-line options inside impala-shell
This change provides a way to modify command-line options like -B,
--output_file and --delimiter inside impala-shell without quitting
and restarting the shell.
Also fixed IMPALA-7286: command "unset" does not work for shell options
Testing:
Added tests for all new options in test_shell_interactive.py
Tested on Python 2.6 and Python 2.7
Change-Id: Id8d4487c24f24806223bfd5c54336914e3afd763
Reviewed-on: http://gerrit.cloudera.org:8080/10900
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/de4bdb0b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/de4bdb0b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/de4bdb0b
Branch: refs/heads/master
Commit: de4bdb0bbfd9a8ef17e020cc4904e3a66d2ac298
Parents: b951110
Author: nghia le <mi...@gmail.com>
Authored: Tue Jul 10 16:19:19 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 20:35:27 2018 +0000
----------------------------------------------------------------------
shell/impala_shell.py | 25 +++++++++++---
tests/shell/test_shell_interactive.py | 54 ++++++++++++++++++++++++++++++
2 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/de4bdb0b/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 95ff8ab..aab478d 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -140,10 +140,16 @@ class ImpalaShell(object, cmd.Cmd):
DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
# Seperator for queries in the history file.
HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_'
+ # Strings that are interpreted as True for some shell options.
+ TRUE_STRINGS = ("true", "TRUE", "True", "1")
VALID_SHELL_OPTIONS = {
- 'LIVE_PROGRESS' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_progress"),
- 'LIVE_SUMMARY' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_summary")
+ 'LIVE_PROGRESS' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "print_progress"),
+ 'LIVE_SUMMARY' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "print_summary"),
+ 'WRITE_DELIMITED' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "write_delimited"),
+ 'VERBOSE' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
+ 'DELIMITER' : (lambda x: " " if x == '\\s' else x, "output_delimiter"),
+ 'OUTPUT_FILE' : (lambda x: None if x == '' else x, "output_file"),
}
# Minimum time in seconds between two calls to get the exec summary.
@@ -180,7 +186,8 @@ class ImpalaShell(object, cmd.Cmd):
# Output formatting flags/options
self.output_file = options.output_file
- self.output_delimiter = options.output_delimiter
+ self.output_delimiter = " " if options.output_delimiter == "\\s" \
+ else options.output_delimiter
self.write_delimited = options.write_delimited
self.print_header = options.print_header
@@ -655,6 +662,14 @@ class ImpalaShell(object, cmd.Cmd):
except KeyError:
return False
+ def _handle_unset_shell_options(self, token):
+ try:
+ handle = self.VALID_SHELL_OPTIONS[token]
+ self.__dict__[handle[1]] = impala_shell_defaults[handle[1]]
+ return True
+ except KeyError:
+ return False
+
def _get_var_name(self, name):
"""Look for a namespace:var_name pattern in an option name.
Return the variable name if it's a match or None otherwise.
@@ -737,6 +752,8 @@ class ImpalaShell(object, cmd.Cmd):
elif self.set_query_options.get(option):
print 'Unsetting option %s' % option
del self.set_query_options[option]
+ elif self._handle_unset_shell_options(option):
+ print 'Unsetting shell option %s' % option
else:
print "No option called %s is set" % option
@@ -1447,7 +1464,7 @@ LIVE_PROGRESS=1;'.",
to remove formatting from results you want to save for later, or to benchmark Impala.",
"You can run a single query from the command line using the '-q' option.",
"When pretty-printing is disabled, you can use the '--output_delimiter' flag to set \
-the delimiter for fields in the same row. The default is ','.",
+the delimiter for fields in the same row. The default is '\\t'.",
"Run the PROFILE command after a query has finished to see a comprehensive summary of \
all the performance and diagnostic information that Impala gathered for that query. Be \
warned, it can be very long!",
http://git-wip-us.apache.org/repos/asf/impala/blob/de4bdb0b/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index bf4923d..1d81663 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -77,6 +77,60 @@ class TestImpalaShellInteractive(object):
self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: False"))
self._expect_with_cmd(proc, "set live_summary=1")
self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: True"))
+ self._expect_with_cmd(proc, "set", ("WRITE_DELIMITED: False", "VERBOSE: True"))
+ self._expect_with_cmd(proc, "set", ("DELIMITER: \\t", "OUTPUT_FILE: None"))
+ self._expect_with_cmd(proc, "set write_delimited=true")
+ self._expect_with_cmd(proc, "set", ("WRITE_DELIMITED: True", "VERBOSE: True"))
+ self._expect_with_cmd(proc, "set DELIMITER=,")
+ self._expect_with_cmd(proc, "set", ("DELIMITER: ,", "OUTPUT_FILE: None"))
+ self._expect_with_cmd(proc, "set output_file=/tmp/clmn.txt")
+ self._expect_with_cmd(proc, "set", ("DELIMITER: ,", "OUTPUT_FILE: /tmp/clmn.txt"))
+
+ @pytest.mark.execute_serially
+ def test_write_delimited(self):
+ """Test output rows in delimited mode"""
+ p = ImpalaShell()
+ p.send_cmd("use tpch")
+ p.send_cmd("set write_delimited=true")
+ p.send_cmd("select * from nation")
+ result = p.get_result()
+ assert "+----------------+" not in result.stdout
+ assert "21\tVIETNAM\t2" in result.stdout
+
+ @pytest.mark.execute_serially
+ def test_change_delimiter(self):
+ """Test change output delimiter if delimited mode is enabled"""
+ p = ImpalaShell()
+ p.send_cmd("use tpch")
+ p.send_cmd("set write_delimited=true")
+ p.send_cmd("set delimiter=,")
+ p.send_cmd("select * from nation")
+ result = p.get_result()
+ assert "21,VIETNAM,2" in result.stdout
+
+ @pytest.mark.execute_serially
+ def test_print_to_file(self):
+ """Test print to output file and unset"""
+ # test print to file
+ p1 = ImpalaShell()
+ p1.send_cmd("use tpch")
+ local_file = tempfile.NamedTemporaryFile(delete=True)
+ p1.send_cmd("set output_file=%s" % local_file.name)
+ p1.send_cmd("select * from nation")
+ result = p1.get_result()
+ assert "VIETNAM" not in result.stdout
+ with open(local_file.name, "r") as fi:
+ # check if the results were written to the file successfully
+ result = fi.read()
+ assert "VIETNAM" in result
+ # test unset to print back to stdout
+ p2 = ImpalaShell()
+ p2.send_cmd("use tpch")
+ p2.send_cmd("set output_file=%s" % local_file.name)
+ p2.send_cmd("unset output_file")
+ p2.send_cmd("select * from nation")
+ result = p2.get_result()
+ assert "VIETNAM" in result.stdout
@pytest.mark.execute_serially
def test_compute_stats_with_live_progress_options(self):
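
The set/unset handling added above boils down to a small pattern: each shell option name maps to a (converter, attribute) pair, and "unset" restores the attribute from the shell's defaults. The stripped-down Python sketch below only illustrates that pattern; the ShellOptions class and the defaults dict are invented for the example, and the real logic lives in shell/impala_shell.py.

TRUE_STRINGS = ("true", "TRUE", "True", "1")

# Illustrative defaults; impala-shell keeps these in impala_shell_defaults.
defaults = {"write_delimited": False, "output_delimiter": "\t", "output_file": None}

# Option name -> (converter applied to the user-supplied value, attribute to set),
# mirroring the VALID_SHELL_OPTIONS entries added in this change.
VALID_SHELL_OPTIONS = {
    "WRITE_DELIMITED": (lambda x: x in TRUE_STRINGS, "write_delimited"),
    "DELIMITER": (lambda x: " " if x == "\\s" else x, "output_delimiter"),
    "OUTPUT_FILE": (lambda x: None if x == "" else x, "output_file"),
}


class ShellOptions(object):
    """Invented stand-in for the shell object that owns the option attributes."""

    def __init__(self):
        self.__dict__.update(defaults)

    def set_option(self, name, value):
        converter, attr = VALID_SHELL_OPTIONS[name]
        setattr(self, attr, converter(value))

    def unset_option(self, name):
        _, attr = VALID_SHELL_OPTIONS[name]
        setattr(self, attr, defaults[attr])


opts = ShellOptions()
opts.set_option("WRITE_DELIMITED", "true")  # -> True
opts.set_option("DELIMITER", ",")           # -> ","
opts.unset_option("DELIMITER")              # back to the default "\t"
print("%s %r" % (opts.write_delimited, opts.output_delimiter))  # True '\t'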
[3/6] impala git commit: IMPALA-7234: Improve memory estimates produced by the Planner
Posted by ta...@apache.org.
IMPALA-7234: Improve memory estimates produced by the Planner
Previously, the planner used getMajorityFormat() to estimate
the memory requirements of its partitions. Additionally, before
IMPALA-6625 was merged, the majority format for a multi-format
table with no numerical majority was calculated using a HashMap,
thus producing non-deterministic results. This change ensures that
the memory estimate is deterministic and always based on the partition
that has the maximum memory requirement.
Testing: Ran all PlannerTests. Also, modified plans of scans with
multiple partitions to ensure that the memory estimate produced
corresponds to the partition with the maximum requirement.
Change-Id: I0666ae3d45fbd8615d3fa9a8626ebd29cf94fb4b
Reviewed-on: http://gerrit.cloudera.org:8080/11001
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/672a271f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/672a271f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/672a271f
Branch: refs/heads/master
Commit: 672a271fd0966bd77f38eda9b6f1e768415bac04
Parents: de4bdb0
Author: poojanilangekar <po...@cloudera.com>
Authored: Thu Jul 19 13:24:41 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:01:57 2018 +0000
----------------------------------------------------------------------
.../apache/impala/catalog/FeCatalogUtils.java | 31 +++--------
.../org/apache/impala/catalog/FeFsTable.java | 9 ++--
.../org/apache/impala/catalog/HdfsTable.java | 6 +--
.../impala/catalog/local/LocalFsTable.java | 12 ++---
.../org/apache/impala/planner/HdfsScanNode.java | 30 ++++++-----
.../apache/impala/planner/HdfsTableSink.java | 56 +++++++++++---------
.../PlannerTest/parquet-filtering-disabled.test | 2 +-
.../queries/PlannerTest/parquet-filtering.test | 2 +-
8 files changed, 70 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 4f1d68d..2072228 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -45,6 +46,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
* Static utility functions shared between FeCatalog implementations.
@@ -294,32 +296,15 @@ public abstract class FeCatalogUtils {
}
/**
- * Return the most commonly-used file format within a set of partitions.
+ * Return the set of all file formats used in the collection of partitions.
*/
- public static HdfsFileFormat getMajorityFormat(
+ public static Set<HdfsFileFormat> getFileFormats(
Iterable<? extends FeFsPartition> partitions) {
- Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newTreeMap();
- for (FeFsPartition partition: partitions) {
- HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
- Integer numPartitions = numPartitionsByFormat.get(format);
- if (numPartitions == null) {
- numPartitions = Integer.valueOf(1);
- } else {
- numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
- }
- numPartitionsByFormat.put(format, numPartitions);
- }
-
- int maxNumPartitions = Integer.MIN_VALUE;
- HdfsFileFormat majorityFormat = null;
- for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
- if (entry.getValue().intValue() > maxNumPartitions) {
- majorityFormat = entry.getKey();
- maxNumPartitions = entry.getValue().intValue();
- }
+ Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
+ for (FeFsPartition partition : partitions) {
+ fileFormats.add(partition.getFileFormat());
}
- Preconditions.checkNotNull(majorityFormat);
- return majorityFormat;
+ return fileFormats;
}
public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 86b41f0..891bf62 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -95,12 +95,11 @@ public interface FeFsTable extends FeTable {
boolean isAvroTable();
/**
- * @return the format that the majority of partitions in this table use
- *
- * TODO(todd): this API needs to be removed, since it depends on loading all
- * partitions even when scanning few.
+ * @return the set of file formats that the partitions in this table use.
+ * This API is only used by the TableSink to write out partitions. It
+ * should not be used for scanning.
*/
- public HdfsFileFormat getMajorityFormat();
+ public Set<HdfsFileFormat> getFileFormats();
/**
* Return true if the table may be written to.
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0f30407..d66ddd2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1808,14 +1808,14 @@ public class HdfsTable extends Table implements FeFsTable {
public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
/**
- * Returns the file format that the majority of partitions are stored in.
+ * Returns the set of file formats that the partitions are stored in.
*/
- public HdfsFileFormat getMajorityFormat() {
+ public Set<HdfsFileFormat> getFileFormats() {
// In the case that we have no partitions added to the table yet, it's
// important to add the "prototype" partition as a fallback.
Iterable<HdfsPartition> partitionsToConsider = Iterables.concat(
partitionMap_.values(), Collections.singleton(prototypePartition_));
- return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+ return FeCatalogUtils.getFileFormats(partitionsToConsider);
}
/**
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 82af240..7753faa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -178,15 +178,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
}
@Override
- public HdfsFileFormat getMajorityFormat() {
- // TODO(todd): can we avoid loading all partitions here? this is called
- // for any INSERT query, even if the partition is specified.
- Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
- // In the case that we have no partitions added to the table yet, it's
- // important to add the "prototype" partition as a fallback.
- Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
- parts, Collections.singleton(createPrototypePartition()));
- return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+ public Set<HdfsFileFormat> getFileFormats() {
+ // Needed by HdfsTableSink.
+ throw new UnsupportedOperationException("TODO: implement me");
}
@Override
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 55bf301..151cbe0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1366,18 +1366,24 @@ public class HdfsScanNode extends ScanNode {
columnReservations = computeMinColumnMemReservations();
}
- int perHostScanRanges;
- HdfsFileFormat majorityFormat = FeCatalogUtils.getMajorityFormat(partitions_);
- if (majorityFormat == HdfsFileFormat.PARQUET
- || majorityFormat == HdfsFileFormat.ORC) {
- Preconditions.checkNotNull(columnReservations);
- // For the purpose of this estimation, the number of per-host scan ranges for
- // Parquet/ORC files are equal to the number of columns read from the file. I.e.
- // excluding partition columns and columns that are populated from file metadata.
- perHostScanRanges = columnReservations.size();
- } else {
- perHostScanRanges = (int) Math.ceil(
- ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+ int perHostScanRanges = 0;
+ for (HdfsFileFormat format : fileFormats_) {
+ int partitionScanRange = 0;
+ if ((format == HdfsFileFormat.PARQUET) || (format == HdfsFileFormat.ORC)) {
+ Preconditions.checkNotNull(columnReservations);
+ // For the purpose of this estimation, the number of per-host scan ranges for
+ // Parquet/ORC files are equal to the number of columns read from the file. I.e.
+ // excluding partition columns and columns that are populated from file metadata.
+ partitionScanRange = columnReservations.size();
+ } else {
+ partitionScanRange = (int) Math.ceil(
+ ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+ }
+ // From the resource management purview, we want to conservatively estimate memory
+ // consumption based on the partition with the highest memory requirements.
+ if (partitionScanRange > perHostScanRanges) {
+ perHostScanRanges = partitionScanRange;
+ }
}
// The non-MT scan node requires at least one scanner thread.
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 7426641..48e7c62 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -18,12 +18,14 @@
package org.apache.impala.planner;
import java.util.List;
+import java.util.Set;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.thrift.TDataSink;
import org.apache.impala.thrift.TDataSinkType;
import org.apache.impala.thrift.TExplainLevel;
@@ -31,8 +33,12 @@ import org.apache.impala.thrift.THdfsTableSink;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TTableSink;
import org.apache.impala.thrift.TTableSinkType;
+
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* Sink for inserting into filesystem-backed tables.
@@ -53,6 +59,10 @@ public class HdfsTableSink extends TableSink {
// be opened, written, and closed one by one.
protected final boolean inputIsClustered_;
+ private static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
+ HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT, HdfsFileFormat.LZO_TEXT,
+ HdfsFileFormat.RC_FILE, HdfsFileFormat.SEQUENCE_FILE, HdfsFileFormat.AVRO);
+
// Stores the indices into the list of non-clustering columns of the target table that
// are stored in the 'sort.columns' table property. This is sent to the backend to
// populate the RowGroup::sorting_columns list in parquet files.
@@ -70,9 +80,6 @@ public class HdfsTableSink extends TableSink {
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
- FeFsTable table = (FeFsTable) targetTable_;
- // TODO: Estimate the memory requirements more accurately by partition type.
- HdfsFileFormat format = table.getMajorityFormat();
PlanNode inputNode = fragment_.getPlanRoot();
int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
// Compute the per-instance number of partitions, taking the number of nodes
@@ -82,7 +89,11 @@ public class HdfsTableSink extends TableSink {
if (numPartitionsPerInstance == -1) {
numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
}
- long perPartitionMemReq = getPerPartitionMemReq(format);
+
+ HdfsTable table = (HdfsTable) targetTable_;
+ // TODO: Estimate the memory requirements more accurately by partition type.
+ Set<HdfsFileFormat> formats = table.getFileFormats();
+ long perPartitionMemReq = getPerPartitionMemReq(formats);
long perInstanceMemEstimate;
// The estimate is based purely on the per-partition mem req if the input cardinality_
@@ -105,28 +116,25 @@ public class HdfsTableSink extends TableSink {
/**
* Returns the per-partition memory requirement for inserting into the given
- * file format.
+ * set of file formats.
*/
- private long getPerPartitionMemReq(HdfsFileFormat format) {
- switch (format) {
- case PARQUET:
- // Writing to a Parquet table requires up to 1GB of buffer per partition.
- // TODO: The per-partition memory requirement is configurable in the QueryOptions.
- return 1024L * 1024L * 1024L;
- case TEXT:
- case LZO_TEXT:
- // Very approximate estimate of amount of data buffered.
- return 100L * 1024L;
- case RC_FILE:
- case SEQUENCE_FILE:
- case AVRO:
- // Very approximate estimate of amount of data buffered.
- return 100L * 1024L;
- default:
- Preconditions.checkState(false, "Unsupported TableSink format " +
- format.toString());
+ private long getPerPartitionMemReq(Set<HdfsFileFormat> formats) {
+ Set<HdfsFileFormat> unsupportedFormats =
+ Sets.difference(formats, SUPPORTED_FILE_FORMATS);
+ if (!unsupportedFormats.isEmpty()) {
+ Preconditions.checkState(false,
+ "Unsupported TableSink format(s): " + Joiner.on(',').join(unsupportedFormats));
}
- return 0;
+ if (formats.contains(HdfsFileFormat.PARQUET)) {
+ // Writing to a Parquet partition requires up to 1GB of buffer. From a resource
+ // management purview, even if there are non-Parquet partitions, we want to be
+ // conservative and make a high memory estimate.
+ return 1024L * 1024L * 1024L;
+ }
+
+ // For all other supported formats (TEXT, LZO_TEXT, RC_FILE, SEQUENCE_FILE & AVRO)
+ // 100KB is a very approximate estimate of the amount of data buffered.
+ return 100L * 1024L;
}
@Override
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index 4cccd06..afdb8f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -306,6 +306,6 @@ PLAN-ROOT SINK
partitions: 0/4 rows=unavailable
columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
extrapolated-rows=disabled max-scan-range-rows=unavailable
- mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+ mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
tuple-ids=0 row-size=80B cardinality=unavailable
====
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 8df8716..64bf5f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -528,7 +528,7 @@ PLAN-ROOT SINK
extrapolated-rows=disabled max-scan-range-rows=unavailable
parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01'
parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
- mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+ mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
tuple-ids=0 row-size=80B cardinality=unavailable
====
# Test a variety of predicates on a mixed format table.
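
To make the sink-side estimate easier to follow, here is the same decision expressed as a short Python sketch. The constants match the ones shown in the HdfsTableSink.java diff above; the function itself is only an illustration, and the Java code is authoritative.

# Illustrative sketch of the per-partition memory estimate computed over the
# set of file formats used by the target table's partitions.
SUPPORTED_FILE_FORMATS = {
    "PARQUET", "TEXT", "LZO_TEXT", "RC_FILE", "SEQUENCE_FILE", "AVRO"}


def per_partition_mem_req(formats):
    unsupported = set(formats) - SUPPORTED_FILE_FORMATS
    if unsupported:
        raise ValueError(
            "Unsupported TableSink format(s): " + ",".join(sorted(unsupported)))
    if "PARQUET" in formats:
        # Writing a Parquet partition can buffer up to 1GB, so estimate
        # conservatively even if non-Parquet partitions are also present.
        return 1024 * 1024 * 1024
    # For the other supported formats roughly 100KB is buffered per partition.
    return 100 * 1024


print(per_partition_mem_req({"PARQUET", "TEXT"}))  # 1073741824
print(per_partition_mem_req({"AVRO"}))             # 102400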
[4/6] impala git commit: IMPALA-6490: Reconnect shell when remote restarts
Posted by ta...@apache.org.
IMPALA-6490: Reconnect shell when remote restarts
If the remote impalad died while the shell was waiting for a
command to complete, the shell disconnected. Previously,
after restarting the remote impalad, we needed to run
"connect;" to reconnect; now the shell automatically
reconnects.
Testing:
Added test_auto_connect_after_impalad_died in
test_shell_interactive_reconnect.py
Change-Id: Ia13365a9696886f01294e98054cf4e7cd66ab712
Reviewed-on: http://gerrit.cloudera.org:8080/10992
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/72db58ac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/72db58ac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/72db58ac
Branch: refs/heads/master
Commit: 72db58acd054785fa6f57ef775e13e07ea0920a6
Parents: 672a271
Author: Nghia Le <mi...@gmail.com>
Authored: Wed Jul 18 18:22:08 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:50:33 2018 +0000
----------------------------------------------------------------------
shell/impala_client.py | 14 +++++---
shell/impala_shell.py | 22 ++++++------
.../test_shell_interactive_reconnect.py | 37 ++++++++++++++++++--
3 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 69c7699..2f2a5e9 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -30,7 +30,7 @@ from thrift.protocol import TBinaryProtocol
from thrift_sasl import TSaslClientTransport
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.Thrift import TApplicationException
+from thrift.Thrift import TApplicationException, TException
class RpcStatus:
"""Convenience enum to describe Rpc return statuses"""
@@ -229,11 +229,15 @@ class ImpalaClient(object):
output += first_child_output
return idx
- def test_connection(self):
- """Checks to see if the current Impala connection is still alive. If not, an exception
- will be raised."""
+ def is_connected(self):
+ """Returns True if the current Impala connection is alive and False otherwise."""
if self.connected:
- self.imp_service.PingImpalaService()
+ try:
+ return self.imp_service.PingImpalaService()
+ except TException:
+ return False
+ else:
+ return False
def connect(self):
"""Creates a connection to an Impalad instance
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index aab478d..4348a4e 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -43,7 +43,6 @@ from option_parser import get_option_parser, get_config_from_file
from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
from shell_output import OverwritingStdErrOutputStream
from subprocess import call
-from thrift.Thrift import TException
VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
VERSION_STRING = "build version not available"
@@ -231,6 +230,8 @@ class ImpalaShell(object, cmd.Cmd):
if options.impalad is not None:
self.do_connect(options.impalad)
+ # Check if the database in shell option exists
+ self._validate_database(immediately=True)
# We handle Ctrl-C ourselves, using an Event object to signal cancellation
# requests between the handler and the main shell thread.
@@ -568,6 +569,10 @@ class ImpalaShell(object, cmd.Cmd):
else:
return query
+ def set_prompt(self, db):
+ self.prompt = ImpalaShell.PROMPT_FORMAT.format(
+ host=self.impalad[0], port=self.impalad[1], db=db)
+
def precmd(self, args):
args = self.sanitise_input(args)
if not args: return args
@@ -581,9 +586,7 @@ class ImpalaShell(object, cmd.Cmd):
# If cmdqueue is populated, then commands are executed from the cmdqueue, and user
# input is ignored. Send an empty string as the user input just to be safe.
return str()
- try:
- self.imp_client.test_connection()
- except TException:
+ if not self.imp_client.is_connected():
print_to_stderr("Connection lost, reconnecting...")
self._connect()
self._validate_database(immediately=True)
@@ -812,8 +815,7 @@ class ImpalaShell(object, cmd.Cmd):
if self.imp_client.connected:
self._print_if_verbose('Connected to %s:%s' % self.impalad)
self._print_if_verbose('Server version: %s' % self.server_version)
- self.prompt = ImpalaShell.PROMPT_FORMAT.format(
- host=self.impalad[0], port=self.impalad[1], db=ImpalaShell.DEFAULT_DB)
+ self.set_prompt(ImpalaShell.DEFAULT_DB)
self._validate_database()
try:
self.imp_client.build_default_query_options_dict()
@@ -883,10 +885,12 @@ class ImpalaShell(object, cmd.Cmd):
If immediately is False, it appends the USE command to self.cmdqueue.
If immediately is True, it executes the USE command right away.
"""
+ if not self.imp_client.connected:
+ return
+ # Should only check if successfully connected.
if self.current_db:
self.current_db = self.current_db.strip('`')
use_current_db = ('use `%s`' % self.current_db)
-
if immediately:
self.onecmd(use_current_db)
else:
@@ -1185,9 +1189,7 @@ class ImpalaShell(object, cmd.Cmd):
query = self._create_beeswax_query(args)
if self._execute_stmt(query) is CmdStatus.SUCCESS:
self.current_db = args.strip('`').strip()
- self.prompt = ImpalaShell.PROMPT_FORMAT.format(host=self.impalad[0],
- port=self.impalad[1],
- db=self.current_db)
+ self.set_prompt(self.current_db)
elif args.strip('`') == self.current_db:
# args == current_db means -d option was passed but the "use [db]" operation failed.
# We need to set the current_db to None so that it does not show a database, which
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/tests/custom_cluster/test_shell_interactive_reconnect.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 1f82468..c747139 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -18,11 +18,18 @@
import pytest
import tempfile
import socket
+import pexpect
+import os
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_service import ImpaladService
from tests.common.skip import SkipIfBuildType
from tests.shell.util import ImpalaShell, move_shell_history, restore_shell_history
+# Follow tests/shell/test_shell_interactive.py naming.
+from shell.impala_shell import ImpalaShell as ImpalaShellClass
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+NUM_QUERIES = 'impala-server.num-queries'
class TestShellInteractiveReconnect(CustomClusterTestSuite):
""" Check if interactive shell is using the current DB after reconnecting """
@@ -54,8 +61,6 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
@pytest.mark.execute_serially
def test_auto_reconnect(self):
- NUM_QUERIES = 'impala-server.num-queries'
-
impalad = ImpaladService(socket.getfqdn())
start_num_queries = impalad.get_metric_value(NUM_QUERIES)
@@ -72,3 +77,31 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
result = p.get_result()
assert "alltypesaggmultifilesnopart" in result.stdout
+ @pytest.mark.execute_serially
+ def test_auto_reconnect_after_impalad_died(self):
+ """Test reconnect after restarting the remote impalad without using connect;"""
+ # Use pexpect instead of ImpalaShell() since after using get_result() in ImpalaShell()
+ # to check Disconnect, send_cmd() will no longer have any effect so we can not check
+ # reconnect.
+ impalad = ImpaladService(socket.getfqdn())
+ start_num_queries = impalad.get_metric_value(NUM_QUERIES)
+
+ proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+ proc.expect("21000] default>")
+ proc.sendline("use tpch;")
+
+ # wait for the USE command to finish
+ impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
+ impalad.wait_for_num_in_flight_queries(0)
+
+ # Disconnect
+ self.cluster.impalads[0].kill()
+ proc.sendline("show tables;")
+ # Search from [1:] since the square brackets "[]" are special characters in regex
+ proc.expect(ImpalaShellClass.DISCONNECTED_PROMPT[1:])
+ # Restarting Impalad
+ self.cluster.impalads[0].start()
+ # Check reconnect
+ proc.sendline("show tables;")
+ proc.expect("nation")
+ proc.expect("21000] tpch>")