You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/11/26 06:49:41 UTC

[impala] branch master updated (1f792be -> 0f2aa50)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 1f792be  IMPALA-9188: Composite primary keys should use same constraint name
     new cba6e34  IMPALA-9085: [DOCS] Refactored impala_s3.xml
     new ae24252  IMPALA-8065: Add OS distribution name in OSInfo
     new ddd07e1  IMPALA-8867: Further deflake test_auto_scaling
     new 0f2aa50  IMPALA-9146: Add a configurable limit for the size of broadcast input.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/query-options-test.cc               |   1 +
 be/src/service/query-options.cc                    |   9 +
 be/src/service/query-options.h                     |   5 +-
 be/src/util/CMakeLists.txt                         |   2 +
 .../os-info-test.cc}                               |  14 +-
 be/src/util/os-info.cc                             |  50 +-
 be/src/util/os-info.h                              |  10 +
 common/thrift/ImpalaInternalService.thrift         |   6 +-
 common/thrift/ImpalaService.thrift                 |   6 +
 docs/shared/impala_common.xml                      | 102 ++-
 docs/topics/impala_s3.xml                          | 804 ++++++++-------------
 .../apache/impala/planner/DistributedPlanner.java  |  11 +-
 .../org/apache/impala/planner/PlannerTest.java     |  15 +
 .../org/apache/impala/planner/PlannerTestBase.java |   6 +
 .../PlannerTest/broadcast-bytes-limit-large.test   |  24 +
 .../queries/PlannerTest/broadcast-bytes-limit.test |  53 ++
 tests/custom_cluster/test_auto_scaling.py          |   8 +-
 17 files changed, 555 insertions(+), 571 deletions(-)
 copy be/src/{exprs/agg-fn-evaluator-ir.cc => util/os-info-test.cc} (74%)
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test


[impala] 03/04: IMPALA-8867: Further deflake test_auto_scaling

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ddd07e17136302aee58c44060c0f93876d00294c
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Mon Nov 18 16:51:25 2019 -0800

    IMPALA-8867: Further deflake test_auto_scaling
    
    A previous attempt to deflake this test by lowering the threshold for
    multi-group throughput had improved things but we still saw another
    occurrence of test_auto_scaling failing recently. This change lowers the
    threshold even further to try and eradicate the flakiness. From
    inspecting the logs of the failed run I could see that the new threshold
    would have prevented the failure.
    
    Change-Id: I29808982cc6226152c544cb99f76961b582975a7
    Reviewed-on: http://gerrit.cloudera.org:8080/14740
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_auto_scaling.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index deab85a..6f999b0 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -99,8 +99,10 @@ class TestAutoScaling(CustomClusterTestSuite):
       assert self.impalad_test_service.get_metric_value(
         "cluster-membership.executor-groups.total-healthy") >= 2
 
-      # Wait for query rate to reach the maximum for a single executor group plus 20%
-      min_query_rate = 1.2 * EXECUTOR_SLOTS
+      # Wait for query rate to exceed the maximum for a single executor group. In the past
+      # we tried to wait for it to pass a higher threshold but on some platforms we saw
+      # that it was too flaky.
+      min_query_rate = EXECUTOR_SLOTS
       max_query_rate = 0
       # This barrier has been flaky in the past so we wait 2x as long as for the other
       # checks.
@@ -109,7 +111,7 @@ class TestAutoScaling(CustomClusterTestSuite):
         current_rate = workload.get_query_rate()
         LOG.info("Current rate: %s" % current_rate)
         max_query_rate = max(max_query_rate, current_rate)
-        if max_query_rate >= min_query_rate:
+        if max_query_rate > min_query_rate:
           break
         sleep(1)
 


[impala] 01/04: IMPALA-9085: [DOCS] Refactored impala_s3.xml

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cba6e344eef1689fc22d1dd3e2b86dbd66b5797f
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Thu Oct 31 13:16:08 2019 -0700

    IMPALA-9085: [DOCS] Refactored impala_s3.xml
    
    Change-Id: Ib274968a0412b4b8757f31ab674d4b82311de70a
    Reviewed-on: http://gerrit.cloudera.org:8080/14627
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
---
 docs/shared/impala_common.xml | 102 +++---
 docs/topics/impala_s3.xml     | 804 ++++++++++++++++--------------------------
 2 files changed, 358 insertions(+), 548 deletions(-)

diff --git a/docs/shared/impala_common.xml b/docs/shared/impala_common.xml
index 5c22ab2..4a26885 100644
--- a/docs/shared/impala_common.xml
+++ b/docs/shared/impala_common.xml
@@ -1675,29 +1675,24 @@ drop database temp;
         Impala handles time zone considerations for the <codeph>TIMESTAMP</codeph> data type.
       </p>
 
-      <p rev="2.6.0 IMPALA-3558" id="s3_drop_table_purge">
-        For best compatibility with the S3 write support in <keyword keyref="impala26_full"/>
-        and higher:
-        <ul>
-          <li>
-            Use native Hadoop techniques to create data files in S3 for querying through Impala.
-          </li>
-
-          <li>
-            Use the <codeph>PURGE</codeph> clause of <codeph>DROP TABLE</codeph> when dropping
-            internal (managed) tables.
-          </li>
-        </ul>
-        By default, when you drop an internal (managed) table, the data files are moved to the
-        HDFS trashcan. This operation is expensive for tables that reside on the Amazon S3
-        filesystem. Therefore, for S3 tables, prefer to use <codeph>DROP TABLE
-        <varname>table_name</varname> PURGE</codeph> rather than the default <codeph>DROP
-        TABLE</codeph> statement. The <codeph>PURGE</codeph> clause makes Impala delete the data
-        files immediately, skipping the HDFS trashcan. For the <codeph>PURGE</codeph> clause to
-        work effectively, you must originally create the data files on S3 using one of the tools
-        from the Hadoop ecosystem, such as <codeph>hadoop fs -cp</codeph>, or
-        <codeph>INSERT</codeph> in Impala or Hive.
-      </p>
+      <p rev="2.6.0 IMPALA-3558" id="s3_drop_table_purge"> For best
+        compatibility with the S3 write support in <keyword
+          keyref="impala26_full"/> and higher: <ul>
+          <li> Use native Hadoop techniques to create data files in S3 for
+            querying through Impala. </li>
+          <li> Use the <codeph>PURGE</codeph> clause of <codeph>DROP
+              TABLE</codeph> when dropping internal (managed) tables. </li>
+        </ul> By default, when you drop an internal (managed) table, the data
+        files are moved to the HDFS trashcan. This operation is expensive for
+        tables that reside on the Amazon S3 object store. Therefore, for S3
+        tables, prefer to use <codeph>DROP TABLE <varname>table_name</varname>
+          PURGE</codeph> rather than the default <codeph>DROP TABLE</codeph>
+        statement. The <codeph>PURGE</codeph> clause makes Impala delete the
+        data files immediately, skipping the HDFS trashcan. For the
+          <codeph>PURGE</codeph> clause to work effectively, you must originally
+        create the data files on S3 using one of the tools from the Hadoop
+        ecosystem, such as <codeph>hadoop fs -cp</codeph>, or
+          <codeph>INSERT</codeph> in Impala or Hive. </p>
 
       <p rev="2.11.0 IMPALA-4252" id="filter_option_bloom_only">
         This query option affects only Bloom filters, not the min/max filters that are applied
@@ -1705,23 +1700,26 @@ drop database temp;
         tables.
       </p>
 
-      <p rev="2.6.0 IMPALA-1878" id="s3_dml_performance">
-        Because of differences between S3 and traditional filesystems, DML operations for S3
-        tables can take longer than for tables on HDFS. For example, both the <codeph>LOAD
-        DATA</codeph> statement and the final stage of the <codeph>INSERT</codeph> and
-        <codeph>CREATE TABLE AS SELECT</codeph> statements involve moving files from one
-        directory to another. (In the case of <codeph>INSERT</codeph> and <codeph>CREATE TABLE
-        AS SELECT</codeph>, the files are moved from a temporary staging directory to the final
-        destination directory.) Because S3 does not support a <q>rename</q> operation for
-        existing objects, in these cases Impala actually copies the data files from one location
-        to another and then removes the original files. In <keyword keyref="impala26_full"/>,
-        the <codeph>S3_SKIP_INSERT_STAGING</codeph> query option provides a way to speed up
-        <codeph>INSERT</codeph> statements for S3 tables and partitions, with the tradeoff that
-        a problem during statement execution could leave data in an inconsistent state. It does
-        not apply to <codeph>INSERT OVERWRITE</codeph> or <codeph>LOAD DATA</codeph> statements.
-        See <xref href="../topics/impala_s3_skip_insert_staging.xml#s3_skip_insert_staging"/>
-        for details.
-      </p>
+      <p rev="2.6.0 IMPALA-1878" id="s3_dml_performance"> Because of differences
+        between S3 and traditional filesystems, DML operations for S3 tables can
+        take longer than for tables on HDFS. For example, both the <codeph>LOAD
+          DATA</codeph> statement and the final stage of the
+          <codeph>INSERT</codeph> and <codeph>CREATE TABLE AS SELECT</codeph>
+        statements involve moving files from one directory to another. (In the
+        case of <codeph>INSERT</codeph> and <codeph>CREATE TABLE AS
+          SELECT</codeph>, the files are moved from a temporary staging
+        directory to the final destination directory.) Because S3 does not
+        support a <q>rename</q> operation for existing objects, in these cases
+        Impala actually copies the data files from one location to another and
+        then removes the original files. In <keyword keyref="impala26_full"/>,
+        the <codeph>S3_SKIP_INSERT_STAGING</codeph> query option provides a way
+        to speed up <codeph>INSERT</codeph> statements for S3 tables and
+        partitions, with the tradeoff that a problem during statement execution
+        could leave data in an inconsistent state. It does not apply to
+          <codeph>INSERT OVERWRITE</codeph> or <codeph>LOAD DATA</codeph>
+        statements. See <xref
+          href="../topics/impala_s3_skip_insert_staging.xml#s3_skip_insert_staging"
+          >S3_SKIP_INSERT_STAGING Query Option</xref> for details. </p>
 
       <p id="adls_block_splitting" rev="IMPALA-5383">
         Because ADLS does not expose the block sizes of data files the way HDFS does, any Impala
@@ -1796,18 +1794,18 @@ drop database temp;
         Impala to query the ADLS data.
       </p>
 
-      <p rev="2.6.0 IMPALA-1878" id="s3_dml">
-        In <keyword keyref="impala26_full"/> and higher, the Impala DML statements
-        (<codeph>INSERT</codeph>, <codeph>LOAD DATA</codeph>, and <codeph>CREATE TABLE AS
-        SELECT</codeph>) can write data into a table or partition that resides in the Amazon
-        Simple Storage Service (S3). The syntax of the DML statements is the same as for any
-        other tables, because the S3 location for tables and partitions is specified by an
-        <codeph>s3a://</codeph> prefix in the <codeph>LOCATION</codeph> attribute of
-        <codeph>CREATE TABLE</codeph> or <codeph>ALTER TABLE</codeph> statements. If you bring
-        data into S3 using the normal S3 transfer mechanisms instead of Impala DML statements,
-        issue a <codeph>REFRESH</codeph> statement for the table before using Impala to query
-        the S3 data.
-      </p>
+      <p rev="2.6.0 IMPALA-1878" id="s3_dml"> In <keyword keyref="impala26_full"
+        /> and higher, the Impala DML statements (<codeph>INSERT</codeph>,
+          <codeph>LOAD DATA</codeph>, and <codeph>CREATE TABLE AS
+          SELECT</codeph>) can write data into a table or partition that resides
+        in S3. The syntax of the DML statements is the same as for any other
+        tables, because the S3 location for tables and partitions is specified
+        by an <codeph>s3a://</codeph> prefix in the <codeph>LOCATION</codeph>
+        attribute of <codeph>CREATE TABLE</codeph> or <codeph>ALTER
+          TABLE</codeph> statements. If you bring data into S3 using the normal
+        S3 transfer mechanisms instead of Impala DML statements, issue a
+          <codeph>REFRESH</codeph> statement for the table before using Impala
+        to query the S3 data. </p>
 
       <p rev="2.2.0" id="s3_metadata">
         Impala caches metadata for tables where the data resides in the Amazon Simple Storage
diff --git a/docs/topics/impala_s3.xml b/docs/topics/impala_s3.xml
index 31fc348..a8868bf 100644
--- a/docs/topics/impala_s3.xml
+++ b/docs/topics/impala_s3.xml
@@ -20,7 +20,7 @@ under the License.
 <!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
 <concept id="s3" rev="2.2.0">
 
-  <title>Using Impala with the Amazon S3 Filesystem</title>
+  <title>Using Impala with Amazon S3 Object Store</title>
   <titlealts audience="PDF"><navtitle>S3 Tables</navtitle></titlealts>
   <prolog>
     <metadata>
@@ -36,74 +36,156 @@ under the License.
 
   <conbody>
 
-    <note conref="../shared/impala_common.xml#common/s3_production"/>
-
     <p rev="2.2.0"> You can use Impala to query data residing on the Amazon S3
-      filesystem. This capability allows convenient access to a storage system
+      object store. This capability allows convenient access to a storage system
       that is remotely managed, accessible from anywhere, and integrated with
       various cloud-based services. Impala can query files in any supported file
       format from S3. The S3 storage location can be for an entire table, or
       individual partitions in a partitioned table. </p>
 
-    <p>
-      The default Impala tables use data files stored on HDFS, which are ideal for bulk loads and queries using
-      full-table scans. In contrast, queries against S3 data are less performant, making S3 suitable for holding
-      <q>cold</q> data that is only queried occasionally, while more frequently accessed <q>hot</q> data resides in
-      HDFS. In a partitioned table, you can set the <codeph>LOCATION</codeph> attribute for individual partitions
-      to put some partitions on HDFS and others on S3, typically depending on the age of the data.
-    </p>
-
     <p outputclass="toc inpage"/>
 
   </conbody>
+  <concept id="s3_best_practices" rev="2.6.0 IMPALA-1878">
+    <title>Best Practices for Using Impala with S3</title>
+    <prolog>
+      <metadata>
+        <data name="Category" value="Guidelines"/>
+        <data name="Category" value="Best Practices"/>
+      </metadata>
+    </prolog>
+    <conbody>
+      <p> The following guidelines summarize the best practices described in the
+        rest of this topic: </p>
+      <ul>
+        <li>
+          <p> Any reference to an S3 location must be fully qualified when S3 is
+            not designated as the default storage, for example,
+              <codeph>s3a:://[s3-bucket-name]</codeph>.</p>
+        </li>
+        <li>
+          <p> Set <codeph>fs.s3a.connection.maximum</codeph> to 1500 for
+              <cmdname>impalad</cmdname>. </p>
+        </li>
+        <li>
+          <p> Set <codeph>fs.s3a.block.size</codeph> to 134217728 (128 MB in
+            bytes) if most Parquet files queried by Impala were written by Hive
+            or ParquetMR jobs. </p>
+          <p>Set the block size to 268435456 (256 MB in bytes) if most Parquet
+            files queried by Impala were written by Impala. </p>
+          <p>Starting in Impala 3.4.0, instead of
+              <codeph>fs.s3a.block.size</codeph>, the
+              <codeph>PARQUET_OBJECT_STORE_SPLIT_SIZE</codeph> query option
+            controls the Parquet-specific split size. The default value is 256
+            MB.</p>
+        </li>
+        <li>
+          <p>
+            <codeph>DROP TABLE .. PURGE</codeph> is much faster than the default
+              <codeph>DROP TABLE</codeph>. The same applies to <codeph>ALTER
+              TABLE ... DROP PARTITION PURGE</codeph> versus the default
+              <codeph>DROP PARTITION</codeph> operation. Due to the eventually
+            consistent nature of S3, the files for that table or partition could
+            remain for some unbounded time when using <codeph>PURGE</codeph>.
+            The default <codeph>DROP TABLE/PARTITION</codeph> is slow because
+            Impala copies the files to the HDFS trash folder, and Impala waits
+            until all the data is moved. <codeph>DROP TABLE/PARTITION ..
+              PURGE</codeph> is a fast delete operation, and the Impala
+            statement finishes quickly even though the change might not have
+            propagated fully throughout S3. </p>
+        </li>
+        <li>
+          <p>
+            <codeph>INSERT</codeph> statements are faster than <codeph>INSERT
+              OVERWRITE</codeph> for S3. The query option
+              <codeph>S3_SKIP_INSERT_STAGING</codeph>, which is set to
+              <codeph>true</codeph> by default, skips the staging step for
+            regular <codeph>INSERT</codeph> (but not <codeph>INSERT
+              OVERWRITE</codeph>). This makes the operation much faster, but
+            consistency is not guaranteed: if a node fails during execution, the
+            table could end up with inconsistent data. Set this option to
+              <codeph>false</codeph> if stronger consistency is required,
+            however, this setting will make the <codeph>INSERT</codeph>
+            operations slower. </p>
+          <ul>
+            <li>
+              <p> For Impala-ACID tables, both <codeph>INSERT</codeph> and
+                  <codeph>INSERT OVERWRITE</codeph> tables for S3 are fast,
+                regardless of the setting of
+                  <codeph>S3_SKIP_INSERT_STAGING</codeph>. Plus, consistency is
+                guaranteed with ACID tables.</p>
+            </li>
+          </ul>
+        </li>
+        <li>Enable <xref href="impala_data_cache.xml#data_cache">data cache for
+            remote reads</xref>.</li>
+        <li>Enable <xref
+            href="https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/s3guard.html"
+            format="html" scope="external">S3Guard</xref> in your cluster for
+          data consistency.</li>
+        <li>
+          <p> Too many files in a table can make metadata load and update slow
+            in S3. If too many requests are made to S3, S3 has a back-off
+            mechanism and responds slower than usual.</p>
+          <ul>
+            <li>If you have many small files due to over-granular partitioning,
+              configure partitions with many megabytes of data so that even a
+              query against a single partition can be parallelized effectively. </li>
+            <li>If you have many small files because of many small
+                <codeph>INSERT</codeph> queries, use bulk
+                <codeph>INSERT</codeph>s so that more data is written to fewer
+              files. </li>
+          </ul>
+        </li>
+      </ul>
+    </conbody>
+  </concept>
 
   <concept id="s3_sql">
     <title>How Impala SQL Statements Work with S3</title>
     <conbody>
-      <p>
-        Impala SQL statements work with data on S3 as follows:
-      </p>
+      <p> Impala SQL statements work with data in S3 as follows: </p>
       <ul>
         <li>
-          <p>
-            The <xref href="impala_create_table.xml#create_table"/>
-            or <xref href="impala_alter_table.xml#alter_table"/> statements
-            can specify that a table resides on the S3 filesystem by
-            encoding an <codeph>s3a://</codeph> prefix for the <codeph>LOCATION</codeph>
-            property. <codeph>ALTER TABLE</codeph> can also set the <codeph>LOCATION</codeph>
-            property for an individual partition, so that some data in a table resides on
-            S3 and other data in the same table resides on HDFS.
-          </p>
+          <p> The <xref href="impala_create_table.xml#create_table">CREATE
+              TABLE</xref> or <xref href="impala_alter_table.xml#alter_table"
+              >ALTER TABLE</xref> statement can specify that a table resides in
+            the S3 object store by encoding an <codeph>s3a://</codeph> prefix
+            for the <codeph>LOCATION</codeph> property. <codeph>ALTER
+              TABLE</codeph> can also set the <codeph>LOCATION</codeph> property
+            for an individual partition so that some data in a table resides in
+            S3 and other data in the same table resides on HDFS. </p>
         </li>
         <li>
-          <p>
-            Once a table or partition is designated as residing on S3, the <xref href="impala_select.xml#select"/>
-            statement transparently accesses the data files from the appropriate storage layer.
-          </p>
+          <p> Once a table or partition is designated as residing in S3, the
+              <xref href="impala_select.xml#select"/> statement transparently
+            accesses the data files from the appropriate storage layer. </p>
         </li>
         <li>
           <p>
-            If the S3 table is an internal table, the <xref href="impala_drop_table.xml#drop_table"/> statement
+            If the S3 table is an internal table, the <xref
+              href="impala_drop_table.xml#drop_table">DROP TABLE</xref> statement
             removes the corresponding data files from S3 when the table is dropped.
           </p>
         </li>
         <li>
-          <p>
-            The <xref href="impala_truncate_table.xml#truncate_table"/> statement always removes the corresponding
-            data files from S3 when the table is truncated.
-          </p>
+          <p> The <xref href="impala_truncate_table.xml#truncate_table">TRUNCATE
+              TABLE</xref> statement always removes the corresponding
+            data files from S3 when the table is truncated. </p>
         </li>
         <li>
           <p>
-            The <xref href="impala_load_data.xml#load_data"/> can move data files residing in HDFS into
+            The <xref href="impala_load_data.xml#load_data">LOAD DATA</xref>
+            statement can move data files residing in HDFS into
             an S3 table.
           </p>
         </li>
         <li>
           <p>
-            The <xref href="impala_insert.xml#insert"/> statement, or the <codeph>CREATE TABLE AS SELECT</codeph>
+            The <xref href="impala_insert.xml#insert">INSERT</xref> statement, or the <codeph>CREATE TABLE AS SELECT</codeph>
             form of the <codeph>CREATE TABLE</codeph> statement, can copy data from an HDFS table or another S3
-            table into an S3 table. The <xref href="impala_s3_skip_insert_staging.xml#s3_skip_insert_staging"/>
+            table into an S3 table. The <xref
+              href="impala_s3_skip_insert_staging.xml#s3_skip_insert_staging">S3_SKIP_INSERT_STAGING</xref>
             query option chooses whether or not to use a fast code path for these write operations to S3,
             with the tradeoff of potential inconsistency in the case of a failure during the statement.
           </p>
@@ -122,14 +204,8 @@ under the License.
 
     <conbody>
 
-      <p>
-        <indexterm audience="hidden">fs.s3a.access.key configuration setting</indexterm>
-        <indexterm audience="hidden">fs.s3a.secret.key configuration setting</indexterm>
-        <indexterm audience="hidden">access.key configuration setting</indexterm>
-        <indexterm audience="hidden">secret.key configuration setting</indexterm>
-        To allow Impala to access data in S3, specify values for the following configuration settings in your
-        <filepath>core-site.xml</filepath> file:
-      </p>
+      <p> To allow Impala to access data in S3, specify values for the following
+        configuration settings in your <filepath>core-site.xml</filepath> file: </p>
 <codeblock>
 &lt;property&gt;
 &lt;name&gt;fs.s3a.access.key&lt;/name&gt;
@@ -141,11 +217,9 @@ under the License.
 &lt;/property&gt;
 </codeblock>
 
-      <p>
-        After specifying the credentials, restart both the Impala and
-        Hive services. (Restarting Hive is required because Impala queries, CREATE TABLE statements, and so on go
-        through the Hive metastore.)
-      </p>
+      <p> After specifying the credentials, restart both the Impala and Hive
+        services. Restarting Hive is required because Impala statements, such as
+          <codeph>CREATE TABLE</codeph>, go through the Hive Metastore. </p>
 
       <note type="important">
           <p>
@@ -156,6 +230,10 @@ under the License.
             trusted users.
           </p>
       </note>
+      <p>See <xref
+          href="https://www.google.com/url?q=https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html%23Authenticating_with_S3&amp;sa=D&amp;ust=1572980027740000&amp;usg=AFQjCNFnzPSfNBMVRgJZRenvhLblezHbdw"
+          format="html" scope="external">Authenticating with S3</xref> for
+        additional authentication mechanisms to access S3.</p>
 
     </conbody>
 
@@ -184,13 +262,23 @@ under the License.
     <concept id="s3_dml" rev="2.6.0 IMPALA-1878">
       <title>Using Impala DML Statements for S3 Data</title>
       <conbody>
-        <p conref="../shared/impala_common.xml#common/s3_dml"/>
+        <p>The Impala DML statements (<codeph>INSERT</codeph>, <codeph>LOAD
+            DATA</codeph>, and <codeph>CREATE TABLE AS SELECT</codeph>) can
+          write data into a table or partition that resides in S3. The syntax of
+          the DML statements is the same as for any other tables because the S3
+          location for tables and partitions is specified by an
+            <codeph>s3a://</codeph> prefix in the <codeph>LOCATION</codeph>
+          attribute of <codeph>CREATE TABLE</codeph> or <codeph>ALTER
+            TABLE</codeph> statements. If you bring data into S3 using the
+          normal S3 transfer mechanisms instead of Impala DML statements, issue
+          a <codeph>REFRESH</codeph> statement for the table before using Impala
+          to query the S3 data.</p>
         <p conref="../shared/impala_common.xml#common/s3_dml_performance"/>
       </conbody>
     </concept>
 
     <concept id="s3_manual_etl">
-      <title>Manually Loading Data into Impala Tables on S3</title>
+      <title>Manually Loading Data into Impala Tables in S3</title>
       <conbody>
         <p>
           As an alternative, or on earlier Impala releases without DML support for S3,
@@ -203,32 +291,10 @@ under the License.
           <p conref="../shared/impala_common.xml#common/s3_drop_table_purge"/>
         </note>
 
-        <p>
-          Alternative file creation techniques (less compatible with the <codeph>PURGE</codeph> clause) include:
-        </p>
-
-        <ul>
-          <li>
-            The <xref href="https://console.aws.amazon.com/s3/home" scope="external" format="html">Amazon AWS / S3
-            web interface</xref> to upload from a web browser.
-          </li>
-
-          <li>
-            The <xref href="http://aws.amazon.com/cli/" scope="external" format="html">Amazon AWS CLI</xref> to
-            manipulate files from the command line.
-          </li>
-
-          <li>
-            Other S3-enabled software, such as
-            <xref href="http://s3tools.org/s3cmd" scope="external" format="html">the S3Tools client software</xref>.
-          </li>
-        </ul>
-
-        <p>
-          After you upload data files to a location already mapped to an Impala table or partition, or if you delete
-          files in S3 from such a location, issue the <codeph>REFRESH <varname>table_name</varname></codeph>
-          statement to make Impala aware of the new set of data files.
-        </p>
+        <p> After you upload data files to a location already mapped to an
+          Impala table or partition, or if you delete files in S3 from such a
+          location, issue the <codeph>REFRESH</codeph> statement to make Impala
+          aware of the new set of data files. </p>
 
       </conbody>
     </concept>
@@ -237,7 +303,8 @@ under the License.
 
   <concept id="s3_ddl">
 
-    <title>Creating Impala Databases, Tables, and Partitions for Data Stored on S3</title>
+    <title>Creating Impala Databases, Tables, and Partitions for Data Stored in
+      S3</title>
   <prolog>
     <metadata>
       <data name="Category" value="Databases"/>
@@ -245,109 +312,80 @@ under the License.
   </prolog>
 
     <conbody>
-
-      <p>
-        Impala reads data for a table or partition from S3 based on the <codeph>LOCATION</codeph> attribute for the
-        table or partition. Specify the S3 details in the <codeph>LOCATION</codeph> clause of a <codeph>CREATE
-        TABLE</codeph> or <codeph>ALTER TABLE</codeph> statement. The notation for the <codeph>LOCATION</codeph>
-        clause is <codeph>s3a://<varname>bucket_name</varname>/<varname>path/to/file</varname></codeph>. The
-        filesystem prefix is always <codeph>s3a://</codeph> because Impala does not support the <codeph>s3://</codeph> or
-        <codeph>s3n://</codeph> prefixes.
-      </p>
-
-      <p>
-        For a partitioned table, either specify a separate <codeph>LOCATION</codeph> clause for each new partition,
-        or specify a base <codeph>LOCATION</codeph> for the table and set up a directory structure in S3 to mirror
-        the way Impala partitioned tables are structured in HDFS. Although, strictly speaking, S3 filenames do not
-        have directory paths, Impala treats S3 filenames with <codeph>/</codeph> characters the same as HDFS
-        pathnames that include directories.
-      </p>
-
-      <p>
-        You point a nonpartitioned table or an individual partition at S3 by specifying a single directory
-        path in S3, which could be any arbitrary directory. To replicate the structure of an entire Impala
-        partitioned table or database in S3 requires more care, with directories and subdirectories nested and
-        named to match the equivalent directory tree in HDFS. Consider setting up an empty staging area if
-        necessary in HDFS, and recording the complete directory structure so that you can replicate it in S3.
-        <!--
-        Or, specify an S3 location for an entire database, after which all tables and partitions created inside that
-        database automatically inherit the database <codeph>LOCATION</codeph> and create new S3 directories
-        underneath the database directory.
-        -->
-      </p>
-
-      <p>
-        For convenience when working with multiple tables with data files stored in S3, you can create a database
-        with a <codeph>LOCATION</codeph> attribute pointing to an S3 path.
-        Specify a URL of the form <codeph>s3a://<varname>bucket</varname>/<varname>root/path/for/database</varname></codeph>
-        for the <codeph>LOCATION</codeph> attribute of the database.
-        Any tables created inside that database
-        automatically create directories underneath the one specified by the database
-        <codeph>LOCATION</codeph> attribute.
-      </p>
-
-      <p>
-        For example, the following session creates a partitioned table where only a single partition resides on S3.
-        The partitions for years 2013 and 2014 are located on HDFS. The partition for year 2015 includes a
-        <codeph>LOCATION</codeph> attribute with an <codeph>s3a://</codeph> URL, and so refers to data residing on
-        S3, under a specific path underneath the bucket <codeph>impala-demo</codeph>.
-      </p>
-
-<codeblock>[localhost:21000] > create database db_on_hdfs;
-[localhost:21000] > use db_on_hdfs;
-[localhost:21000] > create table mostly_on_hdfs (x int) partitioned by (year int);
-[localhost:21000] > alter table mostly_on_hdfs add partition (year=2013);
-[localhost:21000] > alter table mostly_on_hdfs add partition (year=2014);
-[localhost:21000] > alter table mostly_on_hdfs add partition (year=2015)
-                  >   location 's3a://impala-demo/dir1/dir2/dir3/t1';
+      <p>To create a table that resides in S3, run the <codeph>CREATE
+          TABLE</codeph> or <codeph>ALTER TABLE</codeph> statement with the
+          <codeph>LOCATION</codeph> clause. </p>
+      <p><codeph>ALTER TABLE</codeph> can set the <codeph>LOCATION</codeph>
+        property for an individual partition, so that some data in a table
+        resides in S3 and other data in the same table resides on HDFS.</p>
+      <p>The syntax for the <codeph>LOCATION</codeph> clause is:</p>
+      <codeblock>LOCATION 's3a://<varname>bucket_name</varname>/<varname>path</varname>/<varname>to</varname>/<varname>file</varname>'</codeblock>
+      <p>The file system prefix is always <codeph>s3a://</codeph>. Impala does
+        not support the <codeph>s3://</codeph> or <codeph>s3n://</codeph>
+        prefixes. </p>
+      <p> For a partitioned table, either specify a separate
+          <codeph>LOCATION</codeph> clause for each new partition, or specify a
+        base <codeph>LOCATION</codeph> for the table and set up a directory
+        structure in S3 to mirror the way Impala partitioned tables are
+        structured in S3. </p>
+
+      <p> You point a nonpartitioned table or an individual partition at S3 by
+        specifying a single directory path in S3, which could be any arbitrary
+        directory. To replicate the structure of an entire Impala partitioned
+        table or database in S3 requires more care, with directories and
+        subdirectories nested and named to match the equivalent directory tree
+        in HDFS. Consider setting up an empty staging area if necessary in HDFS,
+        and recording the complete directory structure so that you can replicate
+        it in S3.  </p>
+      <p> When working with multiple tables with data files stored in S3, you can
+        create a database with a <codeph>LOCATION</codeph> attribute pointing to
+        an S3 path. Specify a URL of the form
+            <codeph>s3a://<varname>bucket</varname>/<varname>root</varname>/<varname>path</varname>/<varname>for</varname>/<varname>database</varname></codeph>
+        for the <codeph>LOCATION</codeph> attribute of the database. Any tables
+        created inside that database automatically create directories underneath
+        the one specified by the database <codeph>LOCATION</codeph> attribute. </p>
+      <p>The following example creates a table with one partition for the year
+        2017 resides on HDFS and one partition for the year 2018 resides in
+        S3.</p>
+
+      <p>The partition for year 2018 includes a <codeph>LOCATION</codeph>
+        attribute with an <codeph>s3a://</codeph> URL, and so refers to data
+        residing in S3, under a specific path underneath the bucket
+          <codeph>impala-demo</codeph>. </p>
+
+<codeblock>CREATE TABLE mostly_on_hdfs (x int) PARTITIONED BY (year INT);
+ALTER TABLE mostly_on_hdfs ADD PARTITION (year=2017);
+ALTER TABLE mostly_on_hdfs ADD PARTITION (year=2018) 
+   LOCATION 's3a://impala-demo/dir1/dir2/dir3/t1';
 </codeblock>
 
-      <p>
-        The following session creates a database and two partitioned tables residing entirely on S3, one
-        partitioned by a single column and the other partitioned by multiple columns. Because a
-        <codeph>LOCATION</codeph> attribute with an <codeph>s3a://</codeph> URL is specified for the database, the
-        tables inside that database are automatically created on S3 underneath the database directory. To see the
-        names of the associated subdirectories, including the partition key values, we use an S3 client tool to
-        examine how the directory structure is organized on S3. For example, Impala partition directories such as
-        <codeph>month=1</codeph> do not include leading zeroes, which sometimes appear in partition directories created
-        through Hive.
-      </p>
+      <p> The following session creates a database and two partitioned tables
+        residing entirely in S3, one partitioned by a single column and the
+        other partitioned by multiple columns. </p>
+      <ul>
+        <li>Because a <codeph>LOCATION</codeph> attribute with an
+            <codeph>s3a://</codeph> URL is specified for the database, the
+          tables inside that database are automatically created in S3 underneath
+          the database directory. </li>
+        <li>To see the names of the associated subdirectories, including the
+          partition key values, use an S3 client tool to examine how the
+          directory structure is organized in S3. </li>
+      </ul>
 
-<codeblock>[localhost:21000] > create database db_on_s3 location 's3a://impala-demo/dir1/dir2/dir3';
-[localhost:21000] > use db_on_s3;
+<codeblock>CREATE DATABASE db_on_s3 LOCATION 's3a://impala-demo/dir1/dir2/dir3';
+CREATE TABLE partitioned_multiple_keys (x INT)
+   PARTITIONED BY (year SMALLINT, month TINYINT, day TINYINT);
 
-[localhost:21000] > create table partitioned_on_s3 (x int) partitioned by (year int);
-[localhost:21000] > alter table partitioned_on_s3 add partition (year=2013);
-[localhost:21000] > alter table partitioned_on_s3 add partition (year=2014);
-[localhost:21000] > alter table partitioned_on_s3 add partition (year=2015);
+ALTER TABLE partitioned_multiple_keys
+   ADD PARTITION (year=2015,month=1,day=1);
+ALTER TABLE partitioned_multiple_keys
+   ADD PARTITION (year=2015,month=1,day=31);
 
-[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive;
-2015-03-17 13:56:34          0 dir1/dir2/dir3/
-2015-03-17 16:43:28          0 dir1/dir2/dir3/partitioned_on_s3/
-2015-03-17 16:43:49          0 dir1/dir2/dir3/partitioned_on_s3/year=2013/
-2015-03-17 16:43:53          0 dir1/dir2/dir3/partitioned_on_s3/year=2014/
-2015-03-17 16:43:58          0 dir1/dir2/dir3/partitioned_on_s3/year=2015/
-
-[localhost:21000] > create table partitioned_multiple_keys (x int)
-                  >   partitioned by (year smallint, month tinyint, day tinyint);
-[localhost:21000] > alter table partitioned_multiple_keys
-                  >   add partition (year=2015,month=1,day=1);
-[localhost:21000] > alter table partitioned_multiple_keys
-                  >   add partition (year=2015,month=1,day=31);
-[localhost:21000] > alter table partitioned_multiple_keys
-                  >   add partition (year=2015,month=2,day=28);
-
-[localhost:21000] > !aws s3 ls s3://impala-demo/dir1/dir2/dir3 --recursive;
+!hdfs dfs -ls -R s3a://impala-demo/dir1/dir2/dir3
 2015-03-17 13:56:34          0 dir1/dir2/dir3/
 2015-03-17 16:47:13          0 dir1/dir2/dir3/partitioned_multiple_keys/
 2015-03-17 16:47:44          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=1/
-2015-03-17 16:47:50          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=31/
-2015-03-17 16:47:57          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=2/day=28/
-2015-03-17 16:43:28          0 dir1/dir2/dir3/partitioned_on_s3/
-2015-03-17 16:43:49          0 dir1/dir2/dir3/partitioned_on_s3/year=2013/
-2015-03-17 16:43:53          0 dir1/dir2/dir3/partitioned_on_s3/year=2014/
-2015-03-17 16:43:58          0 dir1/dir2/dir3/partitioned_on_s3/year=2015/
-</codeblock>
+2015-03-17 16:47:50          0 dir1/dir2/dir3/partitioned_multiple_keys/year=2015/month=1/day=31/</codeblock>
 
       <p>
         The <codeph>CREATE DATABASE</codeph> and <codeph>CREATE TABLE</codeph> statements create the associated
@@ -356,21 +394,19 @@ under the License.
         -p</codeph>.
       </p>
 
-      <p>
-        Use the standard S3 file upload methods to actually put the data files into the right locations. You can
-        also put the directory paths and data files in place before creating the associated Impala databases or
-        tables, and Impala automatically uses the data from the appropriate location after the associated databases
-        and tables are created.
-      </p>
-
-      <p>
-        You can switch whether an existing table or partition points to data in HDFS or S3. For example, if you
-        have an Impala table or partition pointing to data files in HDFS or S3, and you later transfer those data
-        files to the other filesystem, use an <codeph>ALTER TABLE</codeph> statement to adjust the
-        <codeph>LOCATION</codeph> attribute of the corresponding table or partition to reflect that change. Because
-        Impala does not have an <codeph>ALTER DATABASE</codeph> statement, this location-switching technique is not
-        practical for entire databases that have a custom <codeph>LOCATION</codeph> attribute.
-      </p>
+      <p> Use the standard S3 file upload methods to put the actual data files
+        into the right locations. You can also put the directory paths and data
+        files in place before creating the associated Impala databases or
+        tables, and Impala automatically uses the data from the appropriate
+        location after the associated databases and tables are created. </p>
+      <p>Use the <codeph>ALTER TABLE</codeph> statement with the
+          <codeph>LOCATION</codeph> clause to switch whether an existing table
+        or partition points to data in HDFS or S3. For example, if you have an
+        Impala table or partition pointing to data files in HDFS or S3, and you
+        later transfer those data files to the other filesystem, use the
+          <codeph>ALTER TABLE</codeph> statement to adjust the
+          <codeph>LOCATION</codeph> attribute of the corresponding table or
+        partition to reflect that change. </p>
 
     </conbody>
 
@@ -378,140 +414,28 @@ under the License.
 
   <concept id="s3_internal_external">
 
-    <title>Internal and External Tables Located on S3</title>
+    <title>Internal and External Tables Located in S3</title>
 
     <conbody>
 
-      <p>
-        Just as with tables located on HDFS storage, you can designate S3-based tables as either internal (managed
-        by Impala) or external, by using the syntax <codeph>CREATE TABLE</codeph> or <codeph>CREATE EXTERNAL
-        TABLE</codeph> respectively. When you drop an internal table, the files associated with the table are
-        removed, even if they are on S3 storage. When you drop an external table, the files associated with the
-        table are left alone, and are still available for access by other tools or components. See
-        <xref href="impala_tables.xml#tables"/> for details.
-      </p>
-
-      <p>
-        If the data on S3 is intended to be long-lived and accessed by other tools in addition to Impala, create
-        any associated S3 tables with the <codeph>CREATE EXTERNAL TABLE</codeph> syntax, so that the files are not
-        deleted from S3 when the table is dropped.
-      </p>
-
-      <p>
-        If the data on S3 is only needed for querying by Impala and can be safely discarded once the Impala
-        workflow is complete, create the associated S3 tables using the <codeph>CREATE TABLE</codeph> syntax, so
-        that dropping the table also deletes the corresponding data files on S3.
-      </p>
-
-      <p>
-        For example, this session creates a table in S3 with the same column layout as a table in HDFS, then
-        examines the S3 table and queries some data from it. The table in S3 works the same as a table in HDFS as
-        far as the expected file format of the data, table and column statistics, and other table properties. The
-        only indication that it is not an HDFS table is the <codeph>s3a://</codeph> URL in the
-        <codeph>LOCATION</codeph> property. Many data files can reside in the S3 directory, and their combined
-        contents form the table data. Because the data in this example is uploaded after the table is created, a
-        <codeph>REFRESH</codeph> statement prompts Impala to update its cached information about the data files.
-      </p>
-
-<codeblock>[localhost:21000] > create table usa_cities_s3 like usa_cities location 's3a://impala-demo/usa_cities';
-[localhost:21000] > desc usa_cities_s3;
-+-------+----------+---------+
-| name  | type     | comment |
-+-------+----------+---------+
-| id    | smallint |         |
-| city  | string   |         |
-| state | string   |         |
-+-------+----------+---------+
-
--- Now from a web browser, upload the same data file(s) to S3 as in the HDFS table,
--- under the relevant bucket and path. If you already have the data in S3, you would
--- point the table LOCATION at an existing path.
-
-[localhost:21000] > refresh usa_cities_s3;
-[localhost:21000] > select count(*) from usa_cities_s3;
-+----------+
-| count(*) |
-+----------+
-| 289      |
-+----------+
-[localhost:21000] > select distinct state from sample_data_s3 limit 5;
-+----------------------+
-| state                |
-+----------------------+
-| Louisiana            |
-| Minnesota            |
-| Georgia              |
-| Alaska               |
-| Ohio                 |
-+----------------------+
-[localhost:21000] > desc formatted usa_cities_s3;
-+------------------------------+------------------------------+---------+
-| name                         | type                         | comment |
-+------------------------------+------------------------------+---------+
-| # col_name                   | data_type                    | comment |
-|                              | NULL                         | NULL    |
-| id                           | smallint                     | NULL    |
-| city                         | string                       | NULL    |
-| state                        | string                       | NULL    |
-|                              | NULL                         | NULL    |
-| # Detailed Table Information | NULL                         | NULL    |
-| Database:                    | s3_testing                   | NULL    |
-| Owner:                       | jrussell                     | NULL    |
-| CreateTime:                  | Mon Mar 16 11:36:25 PDT 2015 | NULL    |
-| LastAccessTime:              | UNKNOWN                      | NULL    |
-| Protect Mode:                | None                         | NULL    |
-| Retention:                   | 0                            | NULL    |
-| Location:                    | s3a://impala-demo/usa_cities | NULL    |
-| Table Type:                  | MANAGED_TABLE                | NULL    |
-...
-+------------------------------+------------------------------+---------+
-</codeblock>
-
-<!-- Cut out unnecessary output, makes the example too wide.
-| Table Parameters:            | NULL                                                           | NULL                 |
-|                              | COLUMN_STATS_ACCURATE                                          | false                |
-|                              | numFiles                                                       | 0                    |
-|                              | numRows                                                        | -1                   |
-|                              | rawDataSize                                                    | -1                   |
-|                              | totalSize                                                      | 0                    |
-|                              | transient_lastDdlTime                                          | 1426528176           |
-|                              | NULL                                                           | NULL                 |
-| # Storage Information        | NULL                                                           | NULL                 |
-| SerDe Library:               | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe             | NULL                 |
-| InputFormat:                 | org.apache.hadoop.mapred.TextInputFormat                       | NULL                 |
-| OutputFormat:                | org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat     | NULL                 |
-| Compressed:                  | No                                                             | NULL                 |
-| Num Buckets:                 | 0                                                              | NULL                 |
-| Bucket Columns:              | []                                                             | NULL                 |
-| Sort Columns:                | []                                                             | NULL                 |
--->
-
-      <p>
-        In this case, we have already uploaded a Parquet file with a million rows of data to the
-        <codeph>sample_data</codeph> directory underneath the <codeph>impala-demo</codeph> bucket on S3. This
-        session creates a table with matching column settings pointing to the corresponding location in S3, then
-        queries the table. Because the data is already in place on S3 when the table is created, no
-        <codeph>REFRESH</codeph> statement is required.
-      </p>
-
-<codeblock>[localhost:21000] > create table sample_data_s3
-                  > (id int, id bigint, val int, zerofill string,
-                  > name string, assertion boolean, city string, state string)
-                  > stored as parquet location 's3a://impala-demo/sample_data';
-[localhost:21000] > select count(*) from sample_data_s3;;
-+----------+
-| count(*) |
-+----------+
-| 1000000  |
-+----------+
-[localhost:21000] > select count(*) howmany, assertion from sample_data_s3 group by assertion;
-+---------+-----------+
-| howmany | assertion |
-+---------+-----------+
-| 667149  | true      |
-| 332851  | false     |
-+---------+-----------+
-</codeblock>
+      <p> Just as with tables located on HDFS storage, you can designate
+        S3-based tables as either internal (managed by Impala) or external, by
+        using the syntax <codeph>CREATE TABLE</codeph> or <codeph>CREATE
+          EXTERNAL TABLE</codeph> respectively. </p>
+      <p>When you drop an internal table, the files associated with the table
+        are removed, even if they are in S3 storage. When you drop an external
+        table, the files associated with the table are left alone, and are still
+        available for access by other tools or components.</p>
+
+      <p> If the data in S3 is intended to be long-lived and accessed by other
+        tools in addition to Impala, create any associated S3 tables with the
+          <codeph>CREATE EXTERNAL TABLE</codeph> syntax, so that the files are
+        not deleted from S3 when the table is dropped. </p>
+
+      <p> If the data in S3 is only needed for querying by Impala and can be
+        safely discarded once the Impala workflow is complete, create the
+        associated S3 tables using the <codeph>CREATE TABLE</codeph> syntax, so
+        that dropping the table also deletes the corresponding data files in S3. </p>
 
     </conbody>
 
@@ -519,14 +443,12 @@ under the License.
 
   <concept id="s3_queries">
 
-    <title>Running and Tuning Impala Queries for Data Stored on S3</title>
+    <title>Running and Tuning Impala Queries for Data Stored in S3</title>
 
     <conbody>
-
-      <p>
-        Once the appropriate <codeph>LOCATION</codeph> attributes are set up at the table or partition level, you
-        query data stored in S3 exactly the same as data stored on HDFS or in HBase:
-      </p>
+      <p> Once a table or partition is designated as residing in S3, the
+          <codeph>SELECT</codeph> statement transparently accesses the data
+        files from the appropriate storage layer. </p>
 
       <ul>
         <li>
@@ -543,34 +465,26 @@ under the License.
           HDFS and HBase tables can be joined to S3 tables, or S3 tables can be joined with each other.
         </li>
 
-        <li>
-          Authorization using the Sentry framework to control access to databases, tables, or columns works the
-          same whether the data is in HDFS or in S3.
-        </li>
-
-        <li>
-          The <cmdname>catalogd</cmdname> daemon caches metadata for both HDFS and S3 tables. Use
-          <codeph>REFRESH</codeph> and <codeph>INVALIDATE METADATA</codeph> for S3 tables in the same situations
-          where you would issue those statements for HDFS tables.
-        </li>
+        <li> Authorization to control access to databases, tables, or columns
+          works the same whether the data is in HDFS or in S3. </li>
+        <li> The Catalog Server (<cmdname>catalogd</cmdname>) daemon caches
+          metadata for both HDFS and S3 tables.</li>
 
         <li>
           Queries against S3 tables are subject to the same kinds of admission control and resource management as
           HDFS tables.
         </li>
 
-        <li>
-          Metadata about S3 tables is stored in the same metastore database as for HDFS tables.
-        </li>
+        <li> Metadata about S3 tables is stored in the same Metastore database
+          as for HDFS tables. </li>
 
         <li>
           You can set up views referring to S3 tables, the same as for HDFS tables.
         </li>
 
-        <li>
-          The <codeph>COMPUTE STATS</codeph>, <codeph>SHOW TABLE STATS</codeph>, and <codeph>SHOW COLUMN
-          STATS</codeph> statements work for S3 tables also.
-        </li>
+        <li> The <codeph>COMPUTE STATS</codeph>, <codeph>SHOW TABLE
+            STATS</codeph>, and <codeph>SHOW COLUMN STATS</codeph> statements
+          work for S3 tables. </li>
       </ul>
 
     </conbody>
@@ -586,50 +500,45 @@ under the License.
 
       <conbody>
 
-        <p>
-          Although Impala queries for data stored in S3 might be less performant than queries against the
-          equivalent data stored in HDFS, you can still do some tuning. Here are techniques you can use to
-          interpret explain plans and profiles for queries against S3 data, and tips to achieve the best
-          performance possible for such queries.
-        </p>
-
-        <p>
-          All else being equal, performance is expected to be lower for queries running against data on S3 rather
-          than HDFS. The actual mechanics of the <codeph>SELECT</codeph> statement are somewhat different when the
-          data is in S3. Although the work is still distributed across the datanodes of the cluster, Impala might
-          parallelize the work for a distributed query differently for data on HDFS and S3. S3 does not have the
-          same block notion as HDFS, so Impala uses heuristics to determine how to split up large S3 files for
-          processing in parallel. Because all hosts can access any S3 data file with equal efficiency, the
-          distribution of work might be different than for HDFS data, where the data blocks are physically read
-          using short-circuit local reads by hosts that contain the appropriate block replicas. Although the I/O to
-          read the S3 data might be spread evenly across the hosts of the cluster, the fact that all data is
-          initially retrieved across the network means that the overall query performance is likely to be lower for
-          S3 data than for HDFS data.
-        </p>
-
-        <p conref="../shared/impala_common.xml#common/s3_block_splitting"/>
-        <p>Starting in Impala 3.4.0, use the query option
-            <codeph>PARQUET_OBJECT_STORE_SPLIT_SIZE</codeph> to control the
-          Parquet split size for non-block stores (e.g. S3, ADLS, etc.). The
-          default value is 256 MB.</p>
-
-        <p conref="../shared/impala_common.xml#common/s3_dml_performance"/>
-
-        <p>
-          When optimizing aspects of for complex queries such as the join order, Impala treats tables on HDFS and
-          S3 the same way. Therefore, follow all the same tuning recommendations for S3 tables as for HDFS ones,
-          such as using the <codeph>COMPUTE STATS</codeph> statement to help Impala construct accurate estimates of
-          row counts and cardinality. See <xref href="impala_performance.xml#performance"/> for details.
-        </p>
-
-        <p>
-          In query profile reports, the numbers for <codeph>BytesReadLocal</codeph>,
-          <codeph>BytesReadShortCircuit</codeph>, <codeph>BytesReadDataNodeCached</codeph>, and
-          <codeph>BytesReadRemoteUnexpected</codeph> are blank because those metrics come from HDFS.
-          If you do see any indications that a query against an S3 table performed <q>remote read</q>
-          operations, do not be alarmed. That is expected because, by definition, all the I/O for S3 tables involves
-          remote reads.
-        </p>
+        <p>Here are techniques you can use to interpret explain plans and
+          profiles for queries against S3 data, and tips to achieve the best
+          performance possible for such queries. </p>
+
+        <p> All else being equal, performance is expected to be lower for
+          queries running against data in S3 rather than HDFS. The actual
+          mechanics of the <codeph>SELECT</codeph> statement are somewhat
+          different when the data is in S3. Although the work is still
+          distributed across the DataNodes of the cluster, Impala might
+          parallelize the work for a distributed query differently for data on
+          HDFS and S3.</p>
+        <p>S3 does not have the same block notion as HDFS, so Impala uses
+          heuristics to determine how to split up large S3 files for processing
+          in parallel. Because all hosts can access any S3 data file with equal
+          efficiency, the distribution of work might be different than for HDFS
+          data, where the data blocks are physically read using short-circuit
+          local reads by hosts that contain the appropriate block replicas.
+          Although the I/O to read the S3 data might be spread evenly across the
+          hosts of the cluster, the fact that all data is initially retrieved
+          across the network means that the overall query performance is likely
+          to be lower for S3 data than for HDFS data. </p>
+        <p>Use the <codeph>PARQUET_OBJECT_STORE_SPLIT_SIZE</codeph> query option
+          to control the Parquet-specific split size. The default value is 256
+          MB.</p>
+
+        <p> When optimizing aspects of complex queries, such as the join order,
+          Impala treats tables on HDFS and S3 the same way. Therefore, follow
+          all the same tuning recommendations for S3 tables as for HDFS ones,
+          such as using the <codeph>COMPUTE STATS</codeph> statement to help
+          Impala construct accurate estimates of row counts and cardinality. See
+            <xref href="impala_performance.xml#performance"/> for details. </p>
+
+        <p> In query profile reports, the numbers for
+            <codeph>BytesReadLocal</codeph>,
+            <codeph>BytesReadShortCircuit</codeph>,
+            <codeph>BytesReadDataNodeCached</codeph>, and
+            <codeph>BytesReadRemoteUnexpected</codeph> are blank because those
+          metrics come from HDFS. By definition, all the I/O for S3 tables
+          involves remote reads. </p>
 
       </conbody>
 
@@ -643,120 +552,23 @@ under the License.
 
     <conbody>
 
-      <p>
-        Impala requires that the default filesystem for the cluster be HDFS. You cannot use S3 as the only
-        filesystem in the cluster.
-      </p>
-
-      <p rev="2.6.0 IMPALA-1878">
-        Prior to <keyword keyref="impala26_full"/> Impala could not perform DML operations (<codeph>INSERT</codeph>,
-        <codeph>LOAD DATA</codeph>, or <codeph>CREATE TABLE AS SELECT</codeph>) where the destination is a table
-        or partition located on an S3 filesystem. This restriction is lifted in <keyword keyref="impala26_full"/> and higher.
-      </p>
-
-      <p>
-        Impala does not support the old <codeph>s3://</codeph> block-based and <codeph>s3n://</codeph> filesystem
-        schemes, only <codeph>s3a://</codeph>.
-      </p>
-
-      <p>
-        Although S3 is often used to store JSON-formatted data, the current Impala support for S3 does not include
-        directly querying JSON data. For Impala queries, use data files in one of the file formats listed in
-        <xref href="impala_file_formats.xml#file_formats"/>. If you have data in JSON format, you can prepare a
-        flattened version of that data for querying by Impala as part of your ETL cycle.
-      </p>
-
-      <p>
-        You cannot use the <codeph>ALTER TABLE ... SET CACHED</codeph> statement for tables or partitions that are
-        located in S3.
-      </p>
-
-    </conbody>
-
-  </concept>
-
-  <concept id="s3_best_practices" rev="2.6.0 IMPALA-1878">
-    <title>Best Practices for Using Impala with S3</title>
-    <prolog>
-      <metadata>
-        <data name="Category" value="Guidelines"/>
-        <data name="Category" value="Best Practices"/>
-      </metadata>
-    </prolog>
-    <conbody>
-      <p>
-        The following guidelines represent best practices derived from testing and field experience with Impala on S3:
-      </p>
+      <p>The following restrictions apply when using Impala with S3:</p>
       <ul>
-        <li>
-          <p>
-            Any reference to an S3 location must be fully qualified. (This rule applies when
-            S3 is not designated as the default filesystem.)
-          </p>
-        </li>
-        <li>
-          <p> Set <codeph>fs.s3a.connection.maximum</codeph> to 1500 for
-              <cmdname>impalad</cmdname>. </p>
-        </li>
-        <li>
-          <p> Set <codeph>fs.s3a.block.size</codeph> to 134217728 (128 MB in
-            bytes) if most Parquet files queried by Impala were written by Hive
-            or ParquetMR jobs. Set the block size to 268435456 (256 MB in bytes)
-            if most Parquet files queried by Impala were written by Impala. </p>
-          <p>As of Impala 3.4.0, the query option
-              <codeph>PARQUET_OBJECT_STORE_SPLIT_SIZE</codeph> controls the
-            Parquet split size for non-block stores (e.g. S3, ADLS, etc.). The
-            default value is 256 MB.</p>
-        </li>
-        <li>
-          <p>
-            <codeph>DROP TABLE .. PURGE</codeph> is much faster than the default <codeph>DROP TABLE</codeph>.
-            The same applies to <codeph>ALTER TABLE ... DROP PARTITION PURGE</codeph>
-            versus the default <codeph>DROP PARTITION</codeph> operation.
-            However, due to the eventually consistent nature of S3, the files for that
-            table or partition could remain for some unbounded time when using <codeph>PURGE</codeph>.
-            The default <codeph>DROP TABLE/PARTITION</codeph> is slow because Impala copies the files to the HDFS trash folder,
-            and Impala waits until all the data is moved. <codeph>DROP TABLE/PARTITION .. PURGE</codeph> is a
-            fast delete operation, and the Impala statement finishes quickly even though the change might not
-            have propagated fully throughout S3.
-          </p>
-        </li>
-        <li>
-          <p>
-            <codeph>INSERT</codeph> statements are faster than <codeph>INSERT OVERWRITE</codeph> for S3.
-            The query option <codeph>S3_SKIP_INSERT_STAGING</codeph>, which is set to <codeph>true</codeph> by default,
-            skips the staging step for regular <codeph>INSERT</codeph> (but not <codeph>INSERT OVERWRITE</codeph>).
-            This makes the operation much faster, but consistency is not guaranteed: if a node fails during execution, the
-            table could end up with inconsistent data. Set this option to <codeph>false</codeph> if stronger
-            consistency is required, however this setting will make the <codeph>INSERT</codeph> operations slower.
-          </p>
-        </li>
-        <li>
-          <p>
-            Too many files in a table can make metadata loading and updating slow on S3.
-            If too many requests are made to S3, S3 has a back-off mechanism and
-            responds slower than usual. You might have many small files because of:
-          </p>
-          <ul>
-            <li>
-              <p>
-                Too many partitions due to over-granular partitioning. Prefer partitions with
-                many megabytes of data, so that even a query against a single partition can
-                be parallelized effectively.
-              </p>
-            </li>
-            <li>
-              <p>
-                Many small <codeph>INSERT</codeph> queries. Prefer bulk
-                <codeph>INSERT</codeph>s so that more data is written to fewer
-                files.
-              </p>
-            </li>
-          </ul>
-        </li>
+        <li> Impala does not support the old <codeph>s3://</codeph> block-based
+          and <codeph>s3n://</codeph> filesystem schemes, and it only supports
+            <codeph>s3a://</codeph>. </li>
+        <li>Although S3 is often used to store JSON-formatted data, the current
+          Impala support for S3 does not include directly querying JSON data.
+          For Impala queries, use data files in one of the file formats listed
+          in <xref href="impala_file_formats.xml#file_formats"/>. If you have
+          data in JSON format, you can prepare a flattened version of that data
+          for querying by Impala as part of your ETL cycle. </li>
+        <li>You cannot use the <codeph>ALTER TABLE ... SET CACHED</codeph>
+          statement for tables or partitions that are located in S3. </li>
       </ul>
 
     </conbody>
+
   </concept>
 
 


[impala] 02/04: IMPALA-8065: Add OS distribution name in OSInfo

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ae2425241d51ac5b9559401faa3c66c694f0fd1c
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Tue Oct 22 14:01:30 2019 -0700

    IMPALA-8065: Add OS distribution name in OSInfo
    
    Before this change OsInfo::DebugString() would print two lines:
    - OS version: the long name of the Linux kernel from /proc/version
    - Clock: the type of clock used
    After this change OsInfo::DebugString() will print three lines:
    - OS distribution: the short name of the OS release.
      If Docker is being used this is the name of the Container OS.
    - OS version: the long name of the Linux kernel from /proc/version.
      If Docker is being used this is the description of the Host Kernel.
    - Clock: the type of clock used.
    
    Tested locally, the displayed OS Info in Ubuntu16 dev box is:
    OS distribution: Ubuntu 16.04.6 LTS
    OS version: Linux version 4.15.0-65-generic (buildd@lcy01-amd64-017)
    (gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.10))
    Clock: clocksource: 'tsc', clockid_t: CLOCK_MONOTONIC
    
    Also checked with diff OS in docker: centos, redhat, ubuntu, oracle,
    debian to make sure /etc/os-release exists and PRETTY_NAME in that file.
    Each OS picked one version to test.
    Specially for centos6 and redhat6, which have redhat-release instead of
    os-release, copied redhat-release into Ubuntu16 dev box and verified os
    version in mini-cluster.
    
    Added new backend test os-info-test.cc.
    
    Change-Id: I848c9e53ee4e0bf8ae0874bb6da28e8efa7f7c8a
    Reviewed-on: http://gerrit.cloudera.org:8080/14531
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/CMakeLists.txt                 |  2 ++
 be/src/util/{os-info.h => os-info-test.cc} | 45 +++++----------------------
 be/src/util/os-info.cc                     | 50 +++++++++++++++++++++++++-----
 be/src/util/os-info.h                      | 10 ++++++
 4 files changed, 62 insertions(+), 45 deletions(-)

diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e0bbda0..ef3fbb2 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -120,6 +120,7 @@ add_library(UtilTests STATIC
   metrics-test.cc
   min-max-filter-test.cc
   openssl-util-test.cc
+  os-info-test.cc
   os-util-test.cc
   parse-util-test.cc
   pretty-printer-test.cc
@@ -182,6 +183,7 @@ ADD_UNIFIED_BE_LSAN_TEST(lru-cache-test "FifoMultimap.*")
 ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(openssl-util-test "OpenSSLUtilTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(os-info-test "OsInfo.*")
 ADD_UNIFIED_BE_LSAN_TEST(os-util-test "OsUtil.*")
 ADD_UNIFIED_BE_LSAN_TEST(parse-util-test "ParseMemSpecs.*")
 ADD_UNIFIED_BE_LSAN_TEST(pretty-printer-test "PrettyPrinterTest.*")
diff --git a/be/src/util/os-info.h b/be/src/util/os-info-test.cc
similarity index 51%
copy from be/src/util/os-info.h
copy to be/src/util/os-info-test.cc
index 295d0b2..ebc5225 100644
--- a/be/src/util/os-info.h
+++ b/be/src/util/os-info-test.cc
@@ -15,43 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_UTIL_OS_INFO_H
-#define IMPALA_UTIL_OS_INFO_H
+#include "os-info.h"
+#include "testutil/gtest-util.h"
 
-#include <time.h>
-
-#include <string>
-
-#include "common/logging.h"
-
-namespace impala {
-
-/// Provides information about the OS we're running on.
-class OsInfo {
- public:
-  /// Initialize OsInfo.
-  static void Init();
-
-  static const std::string os_version() {
-    DCHECK(initialized_);
-    return os_version_;
-  }
-
-  /// Return CLOCK_MONOTONIC if it's fast. Otherwise CLOCK_MONOTONIC_COARSE, which will be
-  /// fast but lower resolution.
-  static clockid_t fast_clock() {
-    DCHECK(initialized_);
-    return fast_clock_;
-  }
-
-  static std::string DebugString();
-
- private:
-  static bool initialized_;
-  static std::string os_version_;
-  static clockid_t fast_clock_;
-  static std::string clock_name_;
-};
+using namespace impala;
 
+// Test OsInfo can extract os_distribution and os_version correctly from local machine.
+TEST(OsInfo, GetOsVersion) {
+  OsInfo osinfo;
+  ASSERT_NE(osinfo.os_distribution(), "Unknown");
+  ASSERT_NE(osinfo.os_version(), "Unknown");
 }
-#endif
diff --git a/be/src/util/os-info.cc b/be/src/util/os-info.cc
index cf76b72..0103e76 100644
--- a/be/src/util/os-info.cc
+++ b/be/src/util/os-info.cc
@@ -17,19 +17,26 @@
 
 #include "util/os-info.h"
 
-#include <iostream>
-#include <fstream>
-#include <sstream>
 #include <stdlib.h>
 #include <string.h>
+#include <fstream>
+#include <iostream>
+#include <sstream>
 
 #include <unistd.h>
+#include <boost/algorithm/string.hpp>
+#include <sys/stat.h>
 
 #include "common/names.h"
 
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
 namespace impala {
 
 bool OsInfo::initialized_ = false;
+string OsInfo::os_distribution_ = "Unknown";
 string OsInfo::os_version_ = "Unknown";
 clockid_t OsInfo::fast_clock_ = CLOCK_MONOTONIC;
 std::string OsInfo::clock_name_ =
@@ -46,10 +53,36 @@ std::string OsInfo::clock_name_ =
 
 void OsInfo::Init() {
   DCHECK(!initialized_);
+  struct stat info;
+  // Read from /etc/os-release
+  if (stat("/etc/os-release", &info) == 0) {
+    ifstream os_distribution("/etc/os-release", ios::in);
+    string line;
+    while (os_distribution.good() && !os_distribution.eof()) {
+      getline(os_distribution, line);
+      vector<string> fields;
+      split(fields, line, is_any_of("="), token_compress_on);
+      if (fields[0].compare("PRETTY_NAME") == 0) {
+        os_distribution_ = fields[1].data();
+        // remove quotes around os distribution
+        os_distribution_.erase(
+            remove(os_distribution_.begin(), os_distribution_.end(), '\"'),
+            os_distribution_.end());
+        break;
+      }
+    }
+    if (os_distribution.is_open()) os_distribution.close();
+  } else if (stat("/etc/redhat-release", &info) == 0) {
+    // Only old distributions like centos 6, redhat 6
+    ifstream os_distribution("/etc/redhat-release", ios::in);
+    if (os_distribution.good()) getline(os_distribution, os_distribution_);
+    if (os_distribution.is_open()) os_distribution.close();
+  }
+
   // Read from /proc/version
-  ifstream version("/proc/version", ios::in);
-  if (version.good()) getline(version, os_version_);
-  if (version.is_open()) version.close();
+  ifstream os_version("/proc/version", ios::in);
+  if (os_version.good()) getline(os_version, os_version_);
+  if (os_version.is_open()) os_version.close();
 
   // Read the current clocksource to see if CLOCK_MONOTONIC is known to be fast. "tsc" is
   // fast, while "xen" is slow (40 times slower than "tsc" on EC2). If CLOCK_MONOTONIC is
@@ -76,9 +109,10 @@ void OsInfo::Init() {
 string OsInfo::DebugString() {
   DCHECK(initialized_);
   stringstream stream;
-  stream << "OS version: " << os_version_ << endl
+  stream << "OS distribution: " << os_distribution_ << endl
+         << "OS version: " << os_version_ << endl
          << "Clock: " << clock_name_ << endl;
   return stream.str();
 }
 
-}
+} // namespace impala
diff --git a/be/src/util/os-info.h b/be/src/util/os-info.h
index 295d0b2..6c9f404 100644
--- a/be/src/util/os-info.h
+++ b/be/src/util/os-info.h
@@ -32,6 +32,15 @@ class OsInfo {
   /// Initialize OsInfo.
   static void Init();
 
+  /// Simple name of the OS.
+  /// If Docker is used this is the name of Container OS.
+  static const std::string os_distribution() {
+    DCHECK(initialized_);
+    return os_distribution_;
+  }
+
+  /// The version of Linux kernel and the version of the compiler used to build it.
+  /// If Docker is used this is the host kernel.
   static const std::string os_version() {
     DCHECK(initialized_);
     return os_version_;
@@ -48,6 +57,7 @@ class OsInfo {
 
  private:
   static bool initialized_;
+  static std::string os_distribution_;
   static std::string os_version_;
   static clockid_t fast_clock_;
   static std::string clock_name_;


[impala] 04/04: IMPALA-9146: Add a configurable limit for the size of broadcast input.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0f2aa509891642e29763aa24e47f07554932c7d2
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Sun Nov 3 17:50:17 2019 -0500

    IMPALA-9146: Add a configurable limit for the size of broadcast input.
    
    Impala's DistributedPlanner may sometimes accidentally choose broadcast
    distribution for inputs that are larger than the destination executor's
    total memory. This could potentially happen if the cluster membership is
    not accurately known and the planner's cost computation of the
    broadcastCost vs partitionCost happens to favor the broadcast
    distribution. This causes spilling and severely affects performance.
    Although the DistributedPlanner does a mem_limit check before picking
    broadcast, the mem_limit is not an accurate reflection since it is
    assigned during admission control.
    
    As a safety here we introduce an explicit configurable limit:
    broadcast_bytes_limit for the size of the broadcast input and set it
    to default of 32GB. The default is chosen based on analysis of existing
    benchmark queries and representative workloads such that in vast
    majority of the cases the parameter value does not need to be changed.
    If the estimated input size on the build side is greater than this
    threshold, the DistributedPlanner will fall back to a partition
    distribution. Setting this parameter to 0 causes it to be ignored.
    
    Testing:
     - Ran all regression tests on Jenkins successfully
     - Added a few unit testis in PlannerTest that (a) set the
    broadcast_bytes_limit to a small value and checks whether the
    distributed plan does hash partitioning on the build side instead
    of broadcast, (b) pass a broadcast hint to override the config
    setting, (c) verify the standard case where broadcast threshold
    is larger than the build input size.
    
    Change-Id: Ibe5639ca38acb72e0194aa80bc6ebb6cafb2acd9
    Reviewed-on: http://gerrit.cloudera.org:8080/14690
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |  1 +
 be/src/service/query-options.cc                    |  9 ++++
 be/src/service/query-options.h                     |  5 +-
 common/thrift/ImpalaInternalService.thrift         |  6 ++-
 common/thrift/ImpalaService.thrift                 |  6 +++
 .../apache/impala/planner/DistributedPlanner.java  | 11 ++++-
 .../org/apache/impala/planner/PlannerTest.java     | 15 ++++++
 .../org/apache/impala/planner/PlannerTestBase.java |  6 +++
 .../PlannerTest/broadcast-bytes-limit-large.test   | 24 ++++++++++
 .../queries/PlannerTest/broadcast-bytes-limit.test | 53 ++++++++++++++++++++++
 10 files changed, 131 insertions(+), 5 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e587103..87bb3d5 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -151,6 +151,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(mem_limit_executors), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(broadcast_bytes_limit), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5d1ab07..a787865 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -907,6 +907,15 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_mem_limit_executors(bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::BROADCAST_BYTES_LIMIT: {
+        // Parse the broadcast_bytes limit and validate it
+        int64_t broadcast_bytes_limit;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "broadcast bytes limit for join operations",
+                &broadcast_bytes_limit));
+        query_options->__set_broadcast_bytes_limit(broadcast_bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 01112ad..032bfb5 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MEM_LIMIT_EXECUTORS + 1);\
+      TImpalaQueryOptions::BROADCAST_BYTES_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -192,7 +192,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
       TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)
+  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
+  QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 62b396e..3ba97c9 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -402,7 +402,11 @@ struct TQueryOptions {
   96: optional i64 parquet_object_store_split_size = 268435456;
 
   // See comment in ImpalaService.thrift
-  97: optional i64 mem_limit_executors = 0
+  97: optional i64 mem_limit_executors = 0;
+
+  // See comment in ImpalaService.thrift
+  // The default value is set to 32 GB
+  98: optional i64 broadcast_bytes_limit = 34359738368;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index a80eb45..33d50ba 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -484,6 +484,12 @@ enum TImpalaQueryOptions {
   // a) an int (= number of bytes);
   // b) a float followed by "M" (MB) or "G" (GB)
   MEM_LIMIT_EXECUTORS = 96
+
+  // The max number of estimated bytes eligible for a Broadcast operation during a join.
+  // If the planner thinks the total bytes sent to all destinations of a broadcast
+  // exchange will exceed this limit, it will not consider a broadcast and instead
+  // fall back on a hash partition exchange. 0 or -1 means this has no effect.
+  BROADCAST_BYTES_LIMIT = 97
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 4297c87..cfd238b 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -556,14 +556,21 @@ public class DistributedPlanner {
          ctx_.getQueryOptions().getDefault_join_distribution_mode());
    }
 
-   // Decide the distribution mode based on the estimated costs and the mem limit.
    int mt_dop = ctx_.getQueryOptions().mt_dop;
+
+   // Decide the distribution mode based on the estimated costs, the mem limit and
+   // the broadcast bytes limit. The last value is a safety check to ensure we
+   // don't broadcast very large inputs (for example in case the broadcast cost was
+   // not computed correctly and the query mem limit has not been set or set too high)
    long htSize = Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
    // TODO: IMPALA-4224: update this once we can share the broadcast join data between
    // finstances.
    if (mt_dop > 1) htSize *= mt_dop;
    long memLimit = ctx_.getQueryOptions().mem_limit;
-   if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit)) {
+   long broadcast_bytes_limit = ctx_.getQueryOptions().getBroadcast_bytes_limit();
+
+   if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit) &&
+           (broadcast_bytes_limit == 0 || htSize <= broadcast_bytes_limit)) {
      return DistributionMode.BROADCAST;
    }
    // Partitioned was cheaper or the broadcast HT would not fit within the mem limit.
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index c123ad8..fa81c84 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1003,4 +1003,19 @@ public class PlannerTest extends PlannerTestBase {
                                         PlannerTestOption.INCLUDE_RESOURCE_HEADER,
                                         PlannerTestOption.VALIDATE_RESOURCES));
   }
+
+  @Test
+  public void testBroadcastBytesLimit() {
+    TQueryOptions options = new TQueryOptions();
+    // broadcast limit is smaller than the build side of hash join, so we should
+    // NOT pick broadcast unless it is overridden through a join hint
+    options.setBroadcast_bytes_limit(100);
+    runPlannerTestFile("broadcast-bytes-limit", "tpch_parquet", options);
+
+    // broadcast limit is larger than the build side of hash join, so we SHOULD
+    // pick broadcast (i.e verify the standard case)
+    options.setBroadcast_bytes_limit(1000000);
+    runPlannerTestFile("broadcast-bytes-limit-large", "tpch_parquet", options);
+  }
+
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 58692a6..7a7e8ca 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -855,6 +855,12 @@ public class PlannerTestBase extends FrontendTestBase {
     runPlannerTestFile(testFile, dbName, defaultQueryOptions(), testOptions);
   }
 
+  protected void runPlannerTestFile(
+      String testFile, String dbName, TQueryOptions options) {
+    runPlannerTestFile(testFile, dbName, options,
+        Collections.<PlannerTestOption>emptySet());
+  }
+
   private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
         Set<PlannerTestOption> testOptions) {
     String fileName = testDir_.resolve(testFile + ".test").toString();
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test
new file mode 100644
index 0000000..bd27412
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test
@@ -0,0 +1,24 @@
+# Check that broadcast distribution IS chosen if size of
+# hash join build side input is smaller than broadcast_bytes_limit
+select c_name from customer inner join nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test
new file mode 100644
index 0000000..b187890
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test
@@ -0,0 +1,53 @@
+# check that broadcast distribution is NOT chosen if estimated size of hash join
+# build side input is greater than broadcast_bytes_limit
+select c_name from customer inner join nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--04:EXCHANGE [HASH(n_nationkey)]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+03:EXCHANGE [HASH(c_nationkey)]
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====
+# negative test: check that broadcast distribution IS chosen
+# even if estimated size of hash join build side input is
+# greater than broadcast_bytes_limit because the query has a
+# join hint forcing broadcast distribution
+select c_name from customer inner join /* +broadcast */ nation
+    on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====