Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/01 17:24:26 UTC

[1/6] impala git commit: IMPALA-7195 IMPALA-7222: [DOCS] Impala delegation with groups

Repository: impala
Updated Branches:
  refs/heads/master 316b17ac5 -> 55083f38c


IMPALA-7195 IMPALA-7222: [DOCS] Impala delegation with groups

Added clarifications about delegation.

Change-Id: I6948ab2ef9b82b123f7a1f4fdc83cfb06e9f912f
Reviewed-on: http://gerrit.cloudera.org:8080/11068
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b9511109
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b9511109
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b9511109

Branch: refs/heads/master
Commit: b95111094b70a060e145e61013559ba7f2e1799a
Parents: 316b17a
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Jul 26 16:45:43 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Tue Jul 31 18:06:22 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_delegation.xml | 265 +++++++++++++++++++++++++++------
 1 file changed, 221 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b9511109/docs/topics/impala_delegation.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_delegation.xml b/docs/topics/impala_delegation.xml
index c524bf5..bd29426 100644
--- a/docs/topics/impala_delegation.xml
+++ b/docs/topics/impala_delegation.xml
@@ -36,58 +36,235 @@ under the License.
   </prolog>
 
   <conbody>
+
     <p>
-      <!--
-      When users connect to Impala directly through the <cmdname>impala-shell</cmdname> interpreter, the Sentry
-      authorization framework determines what actions they can take and what data they can see.
--->
-      When users submit Impala queries through a separate application, such as
-      Hue or a business intelligence tool, typically all requests are treated as
-      coming from the same user. In Impala 1.2 and higher,Impala supports
-      applications to pass along credentials for the users that connect to them,
-      known as <q>delegation</q>, and to issue Impala queries with the
-      privileges for those users. Currently, the delegation feature is available
-      only for Impala queries submitted through application interfaces such as
-      Hue and BI tools. For example, Impala cannot issue queries using the
-      privileges of the HDFS user. </p>
-    <note type="attention">Impala requires Apache Sentry on the cluster to
-      enable delegation. Without Apache Sentry installed, the delegation feature
-      will fail with the following error: User <i>user1</i> is not authorized to
-      delegate to <i>user2</i> User delegation is disabled.</note>
-    <p> The delegation feature is enabled by a startup option for
-        <cmdname>impalad</cmdname>:
-        <codeph>--authorized_proxy_user_config</codeph>. When you specify this
-      option, users whose names you specify (such as <codeph>hue</codeph>) can
-      delegate the execution of a query to another user. The query runs with the
-      privileges of the delegated user, not the original user such as
-        <codeph>hue</codeph>. The name of the delegated user is passed using the
-      HiveServer2 configuration property <codeph>impala.doas.user</codeph>. </p>
-    <p> You can specify a list of users that the application user can delegate
-      to, or <codeph>*</codeph> to allow a superuser to delegate to any other
-      user. For example: </p>
-    <codeblock>impalad --authorized_proxy_user_config 'hue=user1,user2;admin=*' ...</codeblock>
-    <note> Make sure to use single quotes or escape characters to ensure that
-      any <codeph>*</codeph> characters do not undergo wildcard expansion when
-      specified in command-line arguments. </note>
-    <p> See <xref href="impala_config_options.xml#config_options"/> for details
-      about adding or changing <cmdname>impalad</cmdname> startup options. See
-        <xref
+      When users submit Impala queries through a separate application, such as Hue or a business
+      intelligence tool, typically all requests are treated as coming from the same user. In
+      Impala 1.2 and higher, Impala supports <q>delegation</q> where users whose names you
+      specify can delegate the execution of a query to another user. The query runs with the
+      privileges of the delegated user, not the original authenticated user.
+    </p>
+
+    <p>
+      Starting in <keyword keyref="impala31_full">Impala 3.1</keyword> and higher, you can
+      delegate using groups. Instead of listing a large number of delegated users, you can
+      create a group of those users and specify the delegated group name in the Impalad startup
+      option. The client sends the delegated user name, and Impala performs an authorization
+      check to see if the delegated user belongs to a delegated group.
+    </p>
+
+    <p>
+      The name of the delegated user is passed using the HiveServer2 protocol configuration
+      property <codeph>impala.doas.user</codeph> when the client connects to Impala.
+    </p>
+
+    <p>
+      Currently, the delegation feature is available only for Impala queries submitted through
+      application interfaces such as Hue and BI tools. For example, Impala cannot issue queries
+      using the privileges of the HDFS user.
+    </p>
+
+    <note type="attention">
+      <ul>
+        <li>
+          When delegation is enabled in Impala, Impala clients should take extra caution to
+          prevent unauthorized access for the delegated users.
+        </li>
+
+        <li>
+          Impala requires Apache Sentry on the cluster to enable delegation. Without Apache
+          Sentry installed, the delegation feature will fail with the following error: User
+          <i>user1</i> is not authorized to delegate to <i>user2</i>. User/group delegation is
+          disabled.
+        </li>
+      </ul>
+    </note>
+
+    <p>
+      The delegation feature is enabled by the startup options for <cmdname>impalad</cmdname>:
+      <codeph>--authorized_proxy_user_config</codeph> and
+      <codeph>--authorized_proxy_group_config</codeph>.
+    </p>
+
+    <p>
+      The syntax for the options is:
+    </p>
+
+<codeblock>--authorized_proxy_user_config=<varname>authenticated_user1</varname>=<varname>delegated_user1</varname>,<varname>delegated_user2</varname>,...;<varname>authenticated_user2</varname>=...</codeblock>
+
+<codeblock>--authorized_proxy_group_config=<varname>authenticated_user1</varname>=<varname>delegated_group1</varname>,<varname>delegated_group2</varname>,...;<varname>authenticated_user2</varname>=...</codeblock>
+
+    <ul>
+      <li>
+        The list of authorized users/groups is delimited with <codeph>;</codeph>.
+      </li>
+
+      <li>
+        The list of delegated users/groups is delimited with <codeph>,</codeph> by default.
+      </li>
+
+      <li>
+        Use the <codeph>--authorized_proxy_user_config_delimiter</codeph> startup option to
+        change the default user delimiter (the comma character) to another character.
+      </li>
+
+      <li>
+        Use the <codeph>--authorized_proxy_group_config_delimiter</codeph> startup option to
+        change the default group delimiter (the comma character) to another character.
+      </li>
+
+      <li>
+        The wildcard (<codeph>*</codeph>) is supported to delegate to any user or any group, e.g.
+        <codeph>--authorized_proxy_group_config=hue=*</codeph>. Make sure to use single quotes
+        or escape characters to ensure that any <codeph>*</codeph> characters do not undergo
+        wildcard expansion when specified in command-line arguments.
+      </li>
+    </ul>
+
+    <p>
+      When you start Impala with the
+      <codeph>--authorized_proxy_user_config=<varname>authenticated_user</varname>=<varname>delegated_user</varname></codeph>
+      or
+      <codeph>--authorized_proxy_group_config=<varname>authenticated_user</varname>=<varname>delegated_group</varname></codeph>
+      option:
+    </p>
+
+    <ul>
+      <li>
+        Authentication is based on the user on the left hand side
+        (<varname>authenticated_user</varname>).
+      </li>
+
+      <li>
+        Authorization is based on the right hand side user(s) or group(s)
+        (<varname>delegated_user</varname>, <varname>delegated_group</varname>).
+      </li>
+
+      <li>
+        When opening a client connection, the client must provide a delegated username via the
+        HiveServer2 protocol property, <codeph>impala.doas.user</codeph> or
+        <codeph>DelegationUID</codeph>.
+      </li>
+
+      <li>
+        It is not necessary for <varname>authenticated_user</varname> to have permission to
+        access or edit files.
+      </li>
+
+      <li>
+        It is not necessary for the delegated users to have access to the service via Kerberos.
+      </li>
+
+      <li>
+        <varname>delegated_user</varname> and <varname>delegated_group</varname> must exist in
+        the OS.
+      </li>
+
+      <li>
+        For group delegation, use the JNI-based group mapping providers, such as
+        JniBasedUnixGroupsMappingWithFallback and JniBasedUnixGroupsNetgroupMappingWithFallback.
+      </li>
+
+      <li>
+        The ShellBasedUnixGroupsNetgroupMapping and ShellBasedUnixGroupsMapping Hadoop group
+        mapping providers are not supported in Impala group delegation.
+      </li>
+
+      <li>
+        In Impala, <codeph>user()</codeph> returns <varname>authenticated_user</varname> and
+        <codeph>effective_user()</codeph> returns the delegated user that the client specified.
+      </li>
+    </ul>
+
+    <p>
+      The user or group delegation process works as follows:
+      <ol>
+        <li>
+          The Impalad daemon starts with one of the following options:
+          <ul>
+            <li>
+              <codeph>--authorized_proxy_user_config=<varname>authenticated_user</varname>=<varname>delegated_user</varname></codeph>
+            </li>
+
+            <li>
+              <codeph>--authorized_proxy_group_config=<varname>authenticated_user</varname>=<varname>delegated_group</varname></codeph>
+            </li>
+          </ul>
+        </li>
+
+        <li>
+          A client connects to Impala via the HiveServer2 protocol with the
+          <codeph>impala.doas.user</codeph> configuration property, e.g. the connected user is
+          <varname>authenticated_user</varname> with
+          <codeph>impala.doas.user=<varname>delegated_user</varname></codeph>.
+        </li>
+
+        <li>
+          The client user <varname>authenticated_user</varname> sends a request to Impala as the
+          delegated user <varname>delegated_user</varname>.
+        </li>
+
+        <li>
+          Impala checks authorization:
+          <ul>
+            <li>
+              In user delegation, Impala checks if <varname>delegated_user</varname> is in the
+              list of authorized delegated users for the user
+              <varname>authenticated_user</varname>.
+            </li>
+
+            <li>
+              In group delegation, Impala checks if <varname>delegated_user</varname> belongs to
+              one of the delegated groups for the user <varname>authenticated_user</varname>,
+              <varname>delegated_group</varname> in this example.
+            </li>
+          </ul>
+        </li>
+
+        <li>
+          If the user is an authorized delegated user for <varname>authenticated_user</varname>,
+          the request is executed as the delegated user <varname>delegated_user</varname>.
+        </li>
+      </ol>
+    </p>
+
+    <p>
+      See <xref href="impala_config_options.xml#config_options"/> for details about adding or
+      changing <cmdname>impalad</cmdname> startup options.
+    </p>
+
+    <p>
+      See
+      <xref
         keyref="how-hiveserver2-brings-security-and-concurrency-to-apache-hive"
-        >this blog post</xref> for background information about the delegation
-      capability in HiveServer2. </p>
-    <p> To set up authentication for the delegated users: </p>
+        >this
+      blog post</xref> for background information about the delegation capability in
+      HiveServer2.
+    </p>
+
+    <p>
+      To set up authentication for the delegated users:
+    </p>
+
     <ul>
       <li>
-        <p> On the server side, configure either user/password authentication
-          through LDAP, or Kerberos authentication, for all the delegated users.
-          See <xref href="impala_ldap.xml#ldap"/> or <xref
-            href="impala_kerberos.xml#kerberos"/> for details. </p>
+        <p>
+          On the server side, configure either user/password authentication through LDAP, or
+          Kerberos authentication, for all the delegated users. See
+          <xref href="impala_ldap.xml#ldap"/> or
+          <xref
+            href="impala_kerberos.xml#kerberos"/> for details.
+        </p>
       </li>
+
       <li>
-        <p> On the client side, to learn how to enable delegation, consult the
-          documentation for the ODBC driver you are using. </p>
+        <p>
+          On the client side, to learn how to enable delegation, consult the documentation for
+          the ODBC driver you are using.
+        </p>
       </li>
     </ul>
+
   </conbody>
 
 </concept>
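The option format documented in the patch above (authorized users separated by ';', their delegated users separated by ',' unless the delimiter option overrides it) can be sketched with a small parser. This is an illustrative sketch, not Impala's actual implementation; the function name is hypothetical:

```python
# Hypothetical sketch (not Impala source): parse the value of
# --authorized_proxy_user_config, where authorized-user entries are
# separated by ';' and their delegated users by ',' (the default delimiter).
def parse_proxy_config(value, delimiter=","):
    """Map each authenticated user to the set of users it may delegate to."""
    config = {}
    for entry in value.split(";"):
        if not entry:
            continue
        authenticated_user, _, delegated = entry.partition("=")
        config[authenticated_user.strip()] = set(delegated.split(delimiter))
    return config

cfg = parse_proxy_config("hue=user1,user2;admin=*")
# A "*" entry means the authenticated user may delegate to anyone,
# which is why the docs warn about shell wildcard expansion.
```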


[6/6] impala git commit: IMPALA-7317: comment on long lines and whitespace

Posted by ta...@apache.org.
IMPALA-7317: comment on long lines and whitespace

Add some basic formatting checks that can be implemented
easily by parsing the diff line-by-line. These are applied
to Java, Python, C++, shell and thrift source files with
some exclusions.

Adds a --dryrun option that does not try to post back the
comments to gerrit.

Remove the option to specify a git revision since flake8-diff doesn't
work correctly when run on source that isn't checked out.

Testing:
Added some violations to this code change that will be
fixed before merging. The output is:

  {
   "comments": {
    "bin/jenkins/critique-gerrit-review.py": [
     {
      "message": "flake8: E501 line too long (107 > 90 characters)",
      "range": {
       "start_character": 90,
       "start_line": 124,
       "end_line": 124,
       "end_character": 91
      }
     },
     {
      "message": "tab used for whitespace",
      "line": 125
     }
    ]
   }
  }

Change-Id: I36bb99560ab09d7753ff93227d1c4972582770f2
Reviewed-on: http://gerrit.cloudera.org:8080/11085
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/55083f38
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/55083f38
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/55083f38

Branch: refs/heads/master
Commit: 55083f38cce74347a3a528dd59c402f5cfd4441a
Parents: abf6f8f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 30 16:20:11 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Aug 1 17:23:10 2018 +0000

----------------------------------------------------------------------
 bin/jenkins/critique-gerrit-review.py | 96 ++++++++++++++++++++++++++++--
 1 file changed, 91 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/55083f38/bin/jenkins/critique-gerrit-review.py
----------------------------------------------------------------------
diff --git a/bin/jenkins/critique-gerrit-review.py b/bin/jenkins/critique-gerrit-review.py
index 3311793..409fef4 100755
--- a/bin/jenkins/critique-gerrit-review.py
+++ b/bin/jenkins/critique-gerrit-review.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# Usage: critique-gerrit-review.py <git commit>
+# Usage: critique-gerrit-review.py [--dryrun]
 #
 # This script is meant to run on a jenkins.impala.io build slave and post back comments
 # to a code review. It does not need to run on all supported platforms so we use system
@@ -34,16 +34,16 @@
 # ssh, pip, virtualenv
 #
 # TODO: generalise to other warnings
-# * Lines too long and trailing whitespace
 # * clang-tidy
 
+from argparse import ArgumentParser
 from collections import defaultdict
 import json
 import os
 from os import environ
 import os.path
 import re
-from subprocess import check_call, Popen, PIPE
+from subprocess import check_call, check_output, Popen, PIPE
 import sys
 import virtualenv
 
@@ -55,6 +55,19 @@ VENV_BIN = os.path.join(VENV_PATH, "bin")
 PIP_PATH = os.path.join(VENV_BIN, "pip")
 FLAKE8_DIFF_PATH = os.path.join(VENV_BIN, "flake8-diff")
 
+# Limit on length of lines in source files.
+LINE_LIMIT = 90
+
+# Source file extensions that we should apply our line limit and whitespace rules to.
+SOURCE_EXTENSIONS = set([".cc", ".h", ".java", ".py", ".sh", ".thrift"])
+
+# Source file patterns that we exclude from our line limit and whitespace rules.
+EXCLUDE_FILE_PATTERNS = [
+    re.compile(r".*be/src/kudu.*"),  # Kudu source code may have different rules.
+    re.compile(r".*-benchmark.cc"),  # Benchmark files tend to have long lines.
+    re.compile(r".*/function-registry/impala_functions.py")  # Many long strings.
+]
+
 
 def setup_virtualenv():
   """Set up virtualenv with flake8-diff."""
@@ -108,6 +121,63 @@ def get_flake8_comments(revision):
   return comments
 
 
+def get_misc_comments(revision):
+  """Get miscellaneous warnings for code changes made in the git commit 'revision', e.g.
+  long lines and trailing whitespace. These warnings are produced by directly parsing the
+  diff output."""
+  comments = defaultdict(lambda: [])
+  # Matches range information like:
+  #  @@ -128 +133,2 @@ if __name__ == "__main__":
+  RANGE_RE = re.compile(r"^@@ -[0-9,]* \+([0-9]*).*$")
+
+  diff = check_output(["git", "diff", "-U0", "{0}^..{0}".format(revision)])
+  curr_file = None
+  check_source_file = False
+  curr_line_num = 0
+  for diff_line in diff.splitlines():
+    if diff_line.startswith("+++ "):
+      # Start of diff for a file. Strip off "+++ b/" to get the file path.
+      curr_file = diff_line[6:]
+      check_source_file = os.path.splitext(curr_file)[1] in SOURCE_EXTENSIONS
+      if check_source_file:
+        for pattern in EXCLUDE_FILE_PATTERNS:
+          if pattern.match(curr_file):
+            check_source_file = False
+            break
+    elif diff_line.startswith("@@ "):
+      # Figure out the starting line of the hunk. Format of unified diff is:
+      #  @@ -128 +133,2 @@ if __name__ == "__main__":
+      # We want to extract the start line for the added lines
+      match = RANGE_RE.match(diff_line)
+      if not match:
+        raise Exception("Pattern did not match diff line:\n{0}".format(diff_line))
+      curr_line_num = int(match.group(1))
+    elif diff_line.startswith("+") and check_source_file:
+      # An added or modified line - check it to see if we should generate warnings.
+      add_misc_comments_for_line(comments, diff_line[1:], curr_file, curr_line_num)
+      curr_line_num += 1
+  return comments
+
+
+def add_misc_comments_for_line(comments, line, curr_file, curr_line_num):
+  """Helper for get_misc_comments to generate comments for 'line' at 'curr_line_num' in
+  'curr_file' and append them to 'comments'."""
+  # Check for trailing whitespace.
+  if line.rstrip() != line:
+    comments[curr_file].append(
+        {"message": "line has trailing whitespace", "line": curr_line_num})
+
+  # Check for long lines. Skip .py files since flake8 already flags long lines.
+  if len(line) > LINE_LIMIT and os.path.splitext(curr_file)[1] != ".py":
+    msg = "line too long ({0} > {1})".format(len(line), LINE_LIMIT)
+    comments[curr_file].append(
+        {"message": msg, "line": curr_line_num})
+
+  if '\t' in line:
+    comments[curr_file].append(
+        {"message": "tab used for whitespace", "line": curr_line_num})
+
+
 def post_review_to_gerrit(review_input):
   """Post a review to the gerrit patchset. 'review_input' is a ReviewInput JSON object
   containing the review comments. The gerrit change and patchset are picked up from
@@ -123,8 +193,24 @@ def post_review_to_gerrit(review_input):
     raise Exception("Error posting review to gerrit.")
 
 
+def merge_comments(a, b):
+  for k, v in b.iteritems():
+    a[k].extend(v)
+
+
 if __name__ == "__main__":
+  parser = ArgumentParser(description="Generate and post gerrit comments")
+  parser.add_argument("--dryrun", action='store_true',
+                      help="Don't post comments back to gerrit")
+  args = parser.parse_args()
+
   setup_virtualenv()
-  review_input = {"comments": get_flake8_comments(sys.argv[1])}
+  # flake8-diff only actually works correctly on HEAD, so this is the only revision
+  # we can correctly handle.
+  revision = 'HEAD'
+  comments = get_flake8_comments(revision)
+  merge_comments(comments, get_misc_comments(revision))
+  review_input = {"comments": comments}
   print json.dumps(review_input, indent=True)
-  post_review_to_gerrit(review_input)
+  if not args.dryrun:
+    post_review_to_gerrit(review_input)
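The diff parsing in get_misc_comments above hinges on pulling the start line of the added side out of each "@@" hunk header. A standalone sketch of just that step, reusing the same pattern as RANGE_RE in the patch:

```python
import re

# Same pattern as RANGE_RE in the patch: capture the start line of the
# added side of a unified-diff hunk header such as "@@ -128 +133,2 @@".
RANGE_RE = re.compile(r"^@@ -[0-9,]* \+([0-9]*).*$")

def hunk_start_line(diff_line):
    """Return the first added/modified line number described by a hunk header."""
    match = RANGE_RE.match(diff_line)
    if not match:
        raise ValueError("Pattern did not match diff line: " + diff_line)
    return int(match.group(1))

print(hunk_start_line('@@ -128 +133,2 @@ if __name__ == "__main__":'))  # 133
```

Each subsequent "+" line in the hunk then increments this counter, which is how the script attaches warnings to file line numbers.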


[5/6] impala git commit: Fix TestKuduOperations tests in test-with-docker by using consistent hostname.

Posted by ta...@apache.org.
Fix TestKuduOperations tests in test-with-docker by using consistent hostname.

TestKuduOperations, when run using test-with-docker, failed with errors
like:

  Remote error: Service unavailable: Timed out: could not wait for desired
  snapshot timestamp to be consistent: Tablet is lagging too much to be able to
  serve snapshot scan. Lagging by: 1985348 ms, (max is 30000 ms):

The underlying issue, as discovered by Thomas Tauber-Marshall, is that Kudu
serializes the hostnames of Kudu tablet servers, and, different containers in
test-with-docker use different hostnames.  This was exposed after "IMPALA-6812:
Fix flaky Kudu scan tests" switched to using READ_AT_SNAPSHOT for Kudu reads.

Using the same hostname for all the containers is easy and harmless;
this change does just that.

Change-Id: Iea8c5096b515a79601be2e919d32585fb2796b3d
Reviewed-on: http://gerrit.cloudera.org:8080/11082
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/abf6f8f4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/abf6f8f4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/abf6f8f4

Branch: refs/heads/master
Commit: abf6f8f465e6b99fdd0a97e12bbde143ef98f3db
Parents: 72db58a
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Jul 30 13:15:54 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 1 01:25:38 2018 +0000

----------------------------------------------------------------------
 docker/test-with-docker.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/abf6f8f4/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index 1922c57..3da700f 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -505,7 +505,14 @@ class TestWithDocker(object):
         # requirement may be lifted in newer Docker versions.
         "--privileged",
         "--name", name,
-        "--hostname", name,
+        # Whereas the container names vary across containers, we use the same
+        # hostname repeatedly, so that the build container and the test
+        # containers have the same hostnames. Kudu errors out with "Remote
+        # error: Service unavailable: Timed out: could not wait for desired
+        # snapshot timestamp to be consistent: Tablet is lagging too much to be
+        # able to serve snapshot scan." when reading with READ_AT_SNAPSHOT
+        # if the hostnames change underneath it.
+        "--hostname", self.name,
         # Label with the git root directory for easier cleanup
         "--label=pwd=" + self.git_root,
         # Consistent locales


[2/6] impala git commit: IMPALA-1624: Allow toggling and unsetting some command-line options inside impala-shell

Posted by ta...@apache.org.
IMPALA-1624: Allow toggling and unsetting some command-line options inside impala-shell

This change provides a way to modify command-line options like -B,
--output_file and --delimiter inside impala-shell without quitting
the shell and then restarting again.
Also fixed IMPALA-7286: command "unset" does not work for shell options

Testing:
Added tests for all new options in test_shell_interactive.py
Tested on Python 2.6 and Python 2.7

Change-Id: Id8d4487c24f24806223bfd5c54336914e3afd763
Reviewed-on: http://gerrit.cloudera.org:8080/10900
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/de4bdb0b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/de4bdb0b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/de4bdb0b

Branch: refs/heads/master
Commit: de4bdb0bbfd9a8ef17e020cc4904e3a66d2ac298
Parents: b951110
Author: nghia le <mi...@gmail.com>
Authored: Tue Jul 10 16:19:19 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 20:35:27 2018 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 | 25 +++++++++++---
 tests/shell/test_shell_interactive.py | 54 ++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/de4bdb0b/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 95ff8ab..aab478d 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -140,10 +140,16 @@ class ImpalaShell(object, cmd.Cmd):
   DML_REGEX = re.compile("^(insert|upsert|update|delete)$", re.I)
   # Seperator for queries in the history file.
   HISTORY_FILE_QUERY_DELIM = '_IMP_DELIM_'
+  # Strings that are interpreted as True for some shell options.
+  TRUE_STRINGS = ("true", "TRUE", "True", "1")
 
   VALID_SHELL_OPTIONS = {
-    'LIVE_PROGRESS' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_progress"),
-    'LIVE_SUMMARY' : (lambda x: x in ("true", "TRUE", "True", "1"), "print_summary")
+    'LIVE_PROGRESS' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "print_progress"),
+    'LIVE_SUMMARY' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "print_summary"),
+    'WRITE_DELIMITED' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "write_delimited"),
+    'VERBOSE' : (lambda x: x in ImpalaShell.TRUE_STRINGS, "verbose"),
+    'DELIMITER' : (lambda x: " " if x == '\\s' else x, "output_delimiter"),
+    'OUTPUT_FILE' : (lambda x: None if x == '' else x, "output_file"),
   }
 
   # Minimum time in seconds between two calls to get the exec summary.
@@ -180,7 +186,8 @@ class ImpalaShell(object, cmd.Cmd):
 
     # Output formatting flags/options
     self.output_file = options.output_file
-    self.output_delimiter = options.output_delimiter
+    self.output_delimiter = " " if options.output_delimiter == "\\s" \
+        else options.output_delimiter
     self.write_delimited = options.write_delimited
     self.print_header = options.print_header
 
@@ -655,6 +662,14 @@ class ImpalaShell(object, cmd.Cmd):
     except KeyError:
       return False
 
+  def _handle_unset_shell_options(self, token):
+    try:
+      handle = self.VALID_SHELL_OPTIONS[token]
+      self.__dict__[handle[1]] = impala_shell_defaults[handle[1]]
+      return True
+    except KeyError:
+      return False
+
   def _get_var_name(self, name):
     """Look for a namespace:var_name pattern in an option name.
        Return the variable name if it's a match or None otherwise.
@@ -737,6 +752,8 @@ class ImpalaShell(object, cmd.Cmd):
     elif self.set_query_options.get(option):
       print 'Unsetting option %s' % option
       del self.set_query_options[option]
+    elif self._handle_unset_shell_options(option):
+      print 'Unsetting shell option %s' % option
     else:
       print "No option called %s is set" % option
 
@@ -1447,7 +1464,7 @@ LIVE_PROGRESS=1;'.",
 to remove formatting from results you want to save for later, or to benchmark Impala.",
   "You can run a single query from the command line using the '-q' option.",
   "When pretty-printing is disabled, you can use the '--output_delimiter' flag to set \
-the delimiter for fields in the same row. The default is ','.",
+the delimiter for fields in the same row. The default is '\\t'.",
   "Run the PROFILE command after a query has finished to see a comprehensive summary of \
 all the performance and diagnostic information that Impala gathered for that query. Be \
 warned, it can be very long!",

http://git-wip-us.apache.org/repos/asf/impala/blob/de4bdb0b/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index bf4923d..1d81663 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -77,6 +77,60 @@ class TestImpalaShellInteractive(object):
     self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: False"))
     self._expect_with_cmd(proc, "set live_summary=1")
     self._expect_with_cmd(proc, "set", ("LIVE_PROGRESS: True", "LIVE_SUMMARY: True"))
+    self._expect_with_cmd(proc, "set", ("WRITE_DELIMITED: False", "VERBOSE: True"))
+    self._expect_with_cmd(proc, "set", ("DELIMITER: \\t", "OUTPUT_FILE: None"))
+    self._expect_with_cmd(proc, "set write_delimited=true")
+    self._expect_with_cmd(proc, "set", ("WRITE_DELIMITED: True", "VERBOSE: True"))
+    self._expect_with_cmd(proc, "set DELIMITER=,")
+    self._expect_with_cmd(proc, "set", ("DELIMITER: ,", "OUTPUT_FILE: None"))
+    self._expect_with_cmd(proc, "set output_file=/tmp/clmn.txt")
+    self._expect_with_cmd(proc, "set", ("DELIMITER: ,", "OUTPUT_FILE: /tmp/clmn.txt"))
+
+  @pytest.mark.execute_serially
+  def test_write_delimited(self):
+    """Test output rows in delimited mode"""
+    p = ImpalaShell()
+    p.send_cmd("use tpch")
+    p.send_cmd("set write_delimited=true")
+    p.send_cmd("select * from nation")
+    result = p.get_result()
+    assert "+----------------+" not in result.stdout
+    assert "21\tVIETNAM\t2" in result.stdout
+
+  @pytest.mark.execute_serially
+  def test_change_delimiter(self):
+    """Test change output delimiter if delimited mode is enabled"""
+    p = ImpalaShell()
+    p.send_cmd("use tpch")
+    p.send_cmd("set write_delimited=true")
+    p.send_cmd("set delimiter=,")
+    p.send_cmd("select * from nation")
+    result = p.get_result()
+    assert "21,VIETNAM,2" in result.stdout
+
+  @pytest.mark.execute_serially
+  def test_print_to_file(self):
+    """Test print to output file and unset"""
+    # test print to file
+    p1 = ImpalaShell()
+    p1.send_cmd("use tpch")
+    local_file = tempfile.NamedTemporaryFile(delete=True)
+    p1.send_cmd("set output_file=%s" % local_file.name)
+    p1.send_cmd("select * from nation")
+    result = p1.get_result()
+    assert "VIETNAM" not in result.stdout
+    with open(local_file.name, "r") as fi:
+      # check if the results were written to the file successfully
+      result = fi.read()
+      assert "VIETNAM" in result
+    # test unset to print back to stdout
+    p2 = ImpalaShell()
+    p2.send_cmd("use tpch")
+    p2.send_cmd("set output_file=%s" % local_file.name)
+    p2.send_cmd("unset output_file")
+    p2.send_cmd("select * from nation")
+    result = p2.get_result()
+    assert "VIETNAM" in result.stdout
 
   @pytest.mark.execute_serially
   def test_compute_stats_with_live_progress_options(self):


[3/6] impala git commit: IMPALA-7234: Improve memory estimates produced by the Planner

Posted by ta...@apache.org.
IMPALA-7234: Improve memory estimates produced by the Planner

Previously, the planner used getMajorityFormat() to estimate
the memory requirements of a table's partitions. Additionally,
before IMPALA-6625 was merged, the majority format for a
multi-format table with no numerical majority was calculated
using a HashMap, producing non-deterministic results. This change
ensures that the memory estimate is deterministic and always based
on the partition with the maximum memory requirement.
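
The max-over-formats estimate described above can be sketched roughly as
follows. This is an illustrative Python sketch, not Impala's Java code: the
function name, the skew constant, and the format strings are hypothetical
stand-ins for the planner's internals.

```python
import math

SCAN_RANGE_SKEW_FACTOR = 1.2  # illustrative skew factor, not Impala's value

def per_host_scan_ranges(file_formats, num_columns_read, num_scan_ranges,
                         num_nodes):
    """Take the maximum estimate over all file formats present in the
    table, so the result is deterministic and always reflects the most
    expensive partition."""
    estimate = 0
    for fmt in file_formats:
        if fmt in ("PARQUET", "ORC"):
            # Columnar formats: one scan range per column actually read.
            ranges = num_columns_read
        else:
            # Row-oriented formats: spread scan ranges across nodes,
            # padded by a skew factor.
            ranges = int(math.ceil(
                num_scan_ranges / num_nodes * SCAN_RANGE_SKEW_FACTOR))
        estimate = max(estimate, ranges)
    return estimate

# A mixed Parquet/text table takes whichever estimate is larger.
print(per_host_scan_ranges({"PARQUET", "TEXT"}, 8, 30, 3))  # prints 12
```

Because the loop takes a max over a set rather than picking a "majority"
format, iteration order no longer affects the result.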

Testing: Ran all PlannerTests. Also updated the plans of scans over
multiple partitions to verify that the memory estimate produced
corresponds to the partition with the maximum requirement.

Change-Id: I0666ae3d45fbd8615d3fa9a8626ebd29cf94fb4b
Reviewed-on: http://gerrit.cloudera.org:8080/11001
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/672a271f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/672a271f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/672a271f

Branch: refs/heads/master
Commit: 672a271fd0966bd77f38eda9b6f1e768415bac04
Parents: de4bdb0
Author: poojanilangekar <po...@cloudera.com>
Authored: Thu Jul 19 13:24:41 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:01:57 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/FeCatalogUtils.java   | 31 +++--------
 .../org/apache/impala/catalog/FeFsTable.java    |  9 ++--
 .../org/apache/impala/catalog/HdfsTable.java    |  6 +--
 .../impala/catalog/local/LocalFsTable.java      | 12 ++---
 .../org/apache/impala/planner/HdfsScanNode.java | 30 ++++++-----
 .../apache/impala/planner/HdfsTableSink.java    | 56 +++++++++++---------
 .../PlannerTest/parquet-filtering-disabled.test |  2 +-
 .../queries/PlannerTest/parquet-filtering.test  |  2 +-
 8 files changed, 70 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 4f1d68d..2072228 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -45,6 +46,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Static utility functions shared between FeCatalog implementations.
@@ -294,32 +296,15 @@ public abstract class FeCatalogUtils {
   }
 
   /**
-   * Return the most commonly-used file format within a set of partitions.
+   * Return the set of all file formats used in the collection of partitions.
    */
-  public static HdfsFileFormat getMajorityFormat(
+  public static Set<HdfsFileFormat> getFileFormats(
       Iterable<? extends FeFsPartition> partitions) {
-    Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newTreeMap();
-    for (FeFsPartition partition: partitions) {
-      HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
-      Integer numPartitions = numPartitionsByFormat.get(format);
-      if (numPartitions == null) {
-        numPartitions = Integer.valueOf(1);
-      } else {
-        numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
-      }
-      numPartitionsByFormat.put(format, numPartitions);
-    }
-
-    int maxNumPartitions = Integer.MIN_VALUE;
-    HdfsFileFormat majorityFormat = null;
-    for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
-      if (entry.getValue().intValue() > maxNumPartitions) {
-        majorityFormat = entry.getKey();
-        maxNumPartitions = entry.getValue().intValue();
-      }
+    Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
+    for (FeFsPartition partition : partitions) {
+      fileFormats.add(partition.getFileFormat());
     }
-    Preconditions.checkNotNull(majorityFormat);
-    return majorityFormat;
+    return fileFormats;
   }
 
   public static THdfsPartition fsPartitionToThrift(FeFsPartition part,

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 86b41f0..891bf62 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -95,12 +95,11 @@ public interface FeFsTable extends FeTable {
   boolean isAvroTable();
 
   /**
-   * @return the format that the majority of partitions in this table use
-   *
-   * TODO(todd): this API needs to be removed, since it depends on loading all
-   * partitions even when scanning few.
+   * @return the set of file formats that the partitions in this table use.
+   * This API is only used by the TableSink to write out partitions. It
+   * should not be used for scanning.
    */
-  public HdfsFileFormat getMajorityFormat();
+  public Set<HdfsFileFormat> getFileFormats();
 
   /**
    * Return true if the table may be written to.

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0f30407..d66ddd2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1808,14 +1808,14 @@ public class HdfsTable extends Table implements FeFsTable {
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
 
   /**
-   * Returns the file format that the majority of partitions are stored in.
+   * Returns the set of file formats that the partitions are stored in.
    */
-  public HdfsFileFormat getMajorityFormat() {
+  public Set<HdfsFileFormat> getFileFormats() {
     // In the case that we have no partitions added to the table yet, it's
     // important to add the "prototype" partition as a fallback.
     Iterable<HdfsPartition> partitionsToConsider = Iterables.concat(
         partitionMap_.values(), Collections.singleton(prototypePartition_));
-    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+    return FeCatalogUtils.getFileFormats(partitionsToConsider);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 82af240..7753faa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -178,15 +178,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public HdfsFileFormat getMajorityFormat() {
-    // TODO(todd): can we avoid loading all partitions here? this is called
-    // for any INSERT query, even if the partition is specified.
-    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
-    // In the case that we have no partitions added to the table yet, it's
-    // important to add the "prototype" partition as a fallback.
-    Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
-        parts, Collections.singleton(createPrototypePartition()));
-    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+  public Set<HdfsFileFormat> getFileFormats() {
+    // Needed by HdfsTableSink.
+    throw new UnsupportedOperationException("TODO: implement me");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 55bf301..151cbe0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1366,18 +1366,24 @@ public class HdfsScanNode extends ScanNode {
       columnReservations = computeMinColumnMemReservations();
     }
 
-    int perHostScanRanges;
-    HdfsFileFormat majorityFormat = FeCatalogUtils.getMajorityFormat(partitions_);
-    if (majorityFormat == HdfsFileFormat.PARQUET
-        || majorityFormat == HdfsFileFormat.ORC) {
-      Preconditions.checkNotNull(columnReservations);
-      // For the purpose of this estimation, the number of per-host scan ranges for
-      // Parquet/ORC files are equal to the number of columns read from the file. I.e.
-      // excluding partition columns and columns that are populated from file metadata.
-      perHostScanRanges = columnReservations.size();
-    } else {
-      perHostScanRanges = (int) Math.ceil(
-          ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+    int perHostScanRanges = 0;
+    for (HdfsFileFormat format : fileFormats_) {
+      int partitionScanRange = 0;
+      if ((format == HdfsFileFormat.PARQUET) || (format == HdfsFileFormat.ORC)) {
+        Preconditions.checkNotNull(columnReservations);
+        // For the purpose of this estimation, the number of per-host scan ranges for
+        // Parquet/ORC files are equal to the number of columns read from the file. I.e.
+        // excluding partition columns and columns that are populated from file metadata.
+        partitionScanRange = columnReservations.size();
+      } else {
+        partitionScanRange = (int) Math.ceil(
+            ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+      }
+      // From the resource management purview, we want to conservatively estimate memory
+      // consumption based on the partition with the highest memory requirements.
+      if (partitionScanRange > perHostScanRanges) {
+        perHostScanRanges = partitionScanRange;
+      }
     }
 
     // The non-MT scan node requires at least one scanner thread.

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 7426641..48e7c62 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -18,12 +18,14 @@
 package org.apache.impala.planner;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -31,8 +33,12 @@ import org.apache.impala.thrift.THdfsTableSink;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Sink for inserting into filesystem-backed tables.
@@ -53,6 +59,10 @@ public class HdfsTableSink extends TableSink {
   // be opened, written, and closed one by one.
   protected final boolean inputIsClustered_;
 
+  private static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
+      HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT, HdfsFileFormat.LZO_TEXT,
+      HdfsFileFormat.RC_FILE, HdfsFileFormat.SEQUENCE_FILE, HdfsFileFormat.AVRO);
+
   // Stores the indices into the list of non-clustering columns of the target table that
   // are stored in the 'sort.columns' table property. This is sent to the backend to
   // populate the RowGroup::sorting_columns list in parquet files.
@@ -70,9 +80,6 @@ public class HdfsTableSink extends TableSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    FeFsTable table = (FeFsTable) targetTable_;
-    // TODO: Estimate the memory requirements more accurately by partition type.
-    HdfsFileFormat format = table.getMajorityFormat();
     PlanNode inputNode = fragment_.getPlanRoot();
     int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
     // Compute the per-instance number of partitions, taking the number of nodes
@@ -82,7 +89,11 @@ public class HdfsTableSink extends TableSink {
     if (numPartitionsPerInstance == -1) {
       numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
     }
-    long perPartitionMemReq = getPerPartitionMemReq(format);
+
+    HdfsTable table = (HdfsTable) targetTable_;
+    // TODO: Estimate the memory requirements more accurately by partition type.
+    Set<HdfsFileFormat> formats = table.getFileFormats();
+    long perPartitionMemReq = getPerPartitionMemReq(formats);
 
     long perInstanceMemEstimate;
     // The estimate is based purely on the per-partition mem req if the input cardinality_
@@ -105,28 +116,25 @@ public class HdfsTableSink extends TableSink {
 
   /**
    * Returns the per-partition memory requirement for inserting into the given
-   * file format.
+   * set of file formats.
    */
-  private long getPerPartitionMemReq(HdfsFileFormat format) {
-    switch (format) {
-      case PARQUET:
-        // Writing to a Parquet table requires up to 1GB of buffer per partition.
-        // TODO: The per-partition memory requirement is configurable in the QueryOptions.
-        return 1024L * 1024L * 1024L;
-      case TEXT:
-      case LZO_TEXT:
-        // Very approximate estimate of amount of data buffered.
-        return 100L * 1024L;
-      case RC_FILE:
-      case SEQUENCE_FILE:
-      case AVRO:
-        // Very approximate estimate of amount of data buffered.
-        return 100L * 1024L;
-      default:
-        Preconditions.checkState(false, "Unsupported TableSink format " +
-            format.toString());
+  private long getPerPartitionMemReq(Set<HdfsFileFormat> formats) {
+    Set<HdfsFileFormat> unsupportedFormats =
+        Sets.difference(formats, SUPPORTED_FILE_FORMATS);
+    if (!unsupportedFormats.isEmpty()) {
+      Preconditions.checkState(false,
+          "Unsupported TableSink format(s): " + Joiner.on(',').join(unsupportedFormats));
     }
-    return 0;
+    if (formats.contains(HdfsFileFormat.PARQUET)) {
+      // Writing to a Parquet partition requires up to 1GB of buffer. From a resource
+      // management purview, even if there are non-Parquet partitions, we want to be
+      // conservative and make a high memory estimate.
+      return 1024L * 1024L * 1024L;
+    }
+
+    // For all other supported formats (TEXT, LZO_TEXT, RC_FILE, SEQUENCE_FILE & AVRO)
+    // 100KB is a very approximate estimate of the amount of data buffered.
+    return 100L * 1024L;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index 4cccd06..afdb8f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -306,6 +306,6 @@ PLAN-ROOT SINK
      partitions: 0/4 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 8df8716..64bf5f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -528,7 +528,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01'
    parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
-   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====
 # Test a variety of predicates on a mixed format table.


[4/6] impala git commit: IMPALA-6490: Reconnect shell when remote restarts

Posted by ta...@apache.org.
IMPALA-6490: Reconnect shell when remote restarts

If the remote impalad died while the shell was waiting for a
command to complete, the shell disconnected. Previously, after
restarting the remote impalad, we needed to run "connect;" to
reconnect; now the shell reconnects automatically.
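
A minimal sketch of this ping-then-reconnect pattern follows. The Client
class and its method names are hypothetical stand-ins, not the shell's real
API; the shape mirrors the is_connected() check the patch runs before each
command.

```python
class Client(object):
    """Toy stand-in for an Impala shell client connection."""

    def __init__(self):
        self.connected = False

    def ping(self):
        # Stand-in for PingImpalaService(); raises if the server is gone.
        if not self.connected:
            raise ConnectionError("server unreachable")
        return True

    def is_connected(self):
        """Return True/False instead of raising, as the patch does."""
        if not self.connected:
            return False
        try:
            return self.ping()
        except Exception:
            return False

    def connect(self):
        self.connected = True


def run_command(client, cmd):
    # Before each command, transparently reconnect if the link is dead.
    if not client.is_connected():
        print("Connection lost, reconnecting...")
        client.connect()
    return "ran: " + cmd


c = Client()
print(run_command(c, "show tables"))  # reconnects first, then runs
```

Converting the raising test_connection() into a boolean is_connected() lets
the command loop branch on liveness without try/except at every call site.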

Testing:
Added test_auto_connect_after_impalad_died in
test_shell_interactive_reconnect.py

Change-Id: Ia13365a9696886f01294e98054cf4e7cd66ab712
Reviewed-on: http://gerrit.cloudera.org:8080/10992
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/72db58ac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/72db58ac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/72db58ac

Branch: refs/heads/master
Commit: 72db58acd054785fa6f57ef775e13e07ea0920a6
Parents: 672a271
Author: Nghia Le <mi...@gmail.com>
Authored: Wed Jul 18 18:22:08 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:50:33 2018 +0000

----------------------------------------------------------------------
 shell/impala_client.py                          | 14 +++++---
 shell/impala_shell.py                           | 22 ++++++------
 .../test_shell_interactive_reconnect.py         | 37 ++++++++++++++++++--
 3 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 69c7699..2f2a5e9 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -30,7 +30,7 @@ from thrift.protocol import TBinaryProtocol
 from thrift_sasl import TSaslClientTransport
 from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.Thrift import TApplicationException
+from thrift.Thrift import TApplicationException, TException
 
 class RpcStatus:
   """Convenience enum to describe Rpc return statuses"""
@@ -229,11 +229,15 @@ class ImpalaClient(object):
       output += first_child_output
     return idx
 
-  def test_connection(self):
-    """Checks to see if the current Impala connection is still alive. If not, an exception
-    will be raised."""
+  def is_connected(self):
+    """Returns True if the current Impala connection is alive and False otherwise."""
     if self.connected:
-      self.imp_service.PingImpalaService()
+      try:
+        return self.imp_service.PingImpalaService()
+      except TException:
+        return False
+    else:
+      return False
 
   def connect(self):
     """Creates a connection to an Impalad instance

http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index aab478d..4348a4e 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -43,7 +43,6 @@ from option_parser import get_option_parser, get_config_from_file
 from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
 from shell_output import OverwritingStdErrOutputStream
 from subprocess import call
-from thrift.Thrift import TException
 
 VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
 VERSION_STRING = "build version not available"
@@ -231,6 +230,8 @@ class ImpalaShell(object, cmd.Cmd):
 
     if options.impalad is not None:
       self.do_connect(options.impalad)
+      # Check if the database in shell option exists
+      self._validate_database(immediately=True)
 
     # We handle Ctrl-C ourselves, using an Event object to signal cancellation
     # requests between the handler and the main shell thread.
@@ -568,6 +569,10 @@ class ImpalaShell(object, cmd.Cmd):
     else:
       return query
 
+  def set_prompt(self, db):
+    self.prompt = ImpalaShell.PROMPT_FORMAT.format(
+        host=self.impalad[0], port=self.impalad[1], db=db)
+
   def precmd(self, args):
     args = self.sanitise_input(args)
     if not args: return args
@@ -581,9 +586,7 @@ class ImpalaShell(object, cmd.Cmd):
       # If cmdqueue is populated, then commands are executed from the cmdqueue, and user
       # input is ignored. Send an empty string as the user input just to be safe.
       return str()
-    try:
-      self.imp_client.test_connection()
-    except TException:
+    if not self.imp_client.is_connected():
       print_to_stderr("Connection lost, reconnecting...")
       self._connect()
       self._validate_database(immediately=True)
@@ -812,8 +815,7 @@ class ImpalaShell(object, cmd.Cmd):
     if self.imp_client.connected:
       self._print_if_verbose('Connected to %s:%s' % self.impalad)
       self._print_if_verbose('Server version: %s' % self.server_version)
-      self.prompt = ImpalaShell.PROMPT_FORMAT.format(
-        host=self.impalad[0], port=self.impalad[1], db=ImpalaShell.DEFAULT_DB)
+      self.set_prompt(ImpalaShell.DEFAULT_DB)
       self._validate_database()
     try:
       self.imp_client.build_default_query_options_dict()
@@ -883,10 +885,12 @@ class ImpalaShell(object, cmd.Cmd):
     If immediately is False, it appends the USE command to self.cmdqueue.
     If immediately is True, it executes the USE command right away.
     """
+    if not self.imp_client.connected:
+      return
+    # Should only check if successfully connected.
     if self.current_db:
       self.current_db = self.current_db.strip('`')
       use_current_db = ('use `%s`' % self.current_db)
-
       if immediately:
         self.onecmd(use_current_db)
       else:
@@ -1185,9 +1189,7 @@ class ImpalaShell(object, cmd.Cmd):
     query = self._create_beeswax_query(args)
     if self._execute_stmt(query) is CmdStatus.SUCCESS:
       self.current_db = args.strip('`').strip()
-      self.prompt = ImpalaShell.PROMPT_FORMAT.format(host=self.impalad[0],
-                                                     port=self.impalad[1],
-                                                     db=self.current_db)
+      self.set_prompt(self.current_db)
     elif args.strip('`') == self.current_db:
       # args == current_db means -d option was passed but the "use [db]" operation failed.
       # We need to set the current_db to None so that it does not show a database, which

http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/tests/custom_cluster/test_shell_interactive_reconnect.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 1f82468..c747139 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -18,11 +18,18 @@
 import pytest
 import tempfile
 import socket
+import pexpect
+import os
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
 from tests.common.skip import SkipIfBuildType
 from tests.shell.util import ImpalaShell, move_shell_history, restore_shell_history
+# Follow tests/shell/test_shell_interactive.py naming.
+from shell.impala_shell import ImpalaShell as ImpalaShellClass
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+NUM_QUERIES = 'impala-server.num-queries'
 
 class TestShellInteractiveReconnect(CustomClusterTestSuite):
   """ Check if interactive shell is using the current DB after reconnecting """
@@ -54,8 +61,6 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   def test_auto_reconnect(self):
-    NUM_QUERIES = 'impala-server.num-queries'
-
     impalad = ImpaladService(socket.getfqdn())
     start_num_queries = impalad.get_metric_value(NUM_QUERIES)
 
@@ -72,3 +77,31 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
     result = p.get_result()
     assert "alltypesaggmultifilesnopart" in result.stdout
 
+  @pytest.mark.execute_serially
+  def test_auto_reconnect_after_impalad_died(self):
+    """Test reconnect after restarting the remote impalad without using connect;"""
+    # Use pexpect instead of ImpalaShell() since after using get_result() in ImpalaShell()
+    # to check Disconnect, send_cmd() will no longer have any effect so we can not check
+    # reconnect.
+    impalad = ImpaladService(socket.getfqdn())
+    start_num_queries = impalad.get_metric_value(NUM_QUERIES)
+
+    proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+    proc.expect("21000] default>")
+    proc.sendline("use tpch;")
+
+    # wait for the USE command to finish
+    impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
+    impalad.wait_for_num_in_flight_queries(0)
+
+    # Disconnect
+    self.cluster.impalads[0].kill()
+    proc.sendline("show tables;")
+    # Search from [1:] since the square brackets "[]" are special characters in regex
+    proc.expect(ImpalaShellClass.DISCONNECTED_PROMPT[1:])
+    # Restarting Impalad
+    self.cluster.impalads[0].start()
+    # Check reconnect
+    proc.sendline("show tables;")
+    proc.expect("nation")
+    proc.expect("21000] tpch>")