You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/09/23 23:37:38 UTC

[impala] branch master updated (eb09070 -> 3984c69)

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from eb09070  [DOCS] Added the Jaro- and Jaro-Winkler functions to the functions TOC
     new f08bad4  [DOCS] Copy edited INSTR() in impala_string_functions.xml
     new 3984c69  IMPALA-8942: Set file format specific split sizes on non-block stores

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/query-options.cc                    | 18 +++++
 be/src/service/query-options.h                     |  4 +-
 common/thrift/ImpalaInternalService.thrift         |  3 +
 common/thrift/ImpalaService.thrift                 | 11 +++
 docs/topics/impala_s3.xml                          |  5 +-
 docs/topics/impala_string_functions.xml            | 93 ++++++++--------------
 .../org/apache/impala/planner/HdfsScanNode.java    |  6 +-
 tests/query_test/test_scanners.py                  | 49 ++++++++++++
 8 files changed, 125 insertions(+), 64 deletions(-)


[impala] 02/02: IMPALA-8942: Set file format specific split sizes on non-block stores

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3984c69f03c14355731dd6a518aa64dfc8219450
Author: stakiar <st...@cloudera.com>
AuthorDate: Thu Sep 12 13:58:41 2019 -0700

    IMPALA-8942: Set file format specific split sizes on non-block stores
    
    On non-block based stores (e.g. S3, ADLS, etc.), the planner creates
    split sizes based on the value of FileSystem.getDefaultBlockSize(Path).
    This does not work well for Parquet, because the scanners will only
    process a split if the data range defined by the split overlaps with
    the midpoint of the Parquet row group. This is done to ensure that
    scanners treat Parquet row groups as the unit of processing. The default
    block size for non-block based stores is typically much lower than the
    Parquet row group size. This causes a lot of dummy Parquet splits to be
    created and processed, most of which end up doing nothing. The major
    issue this causes is skew, and each scanner ends up processing a skewed
    amount of data (see IMPALA-3453 for details on the skew issue).
    
    This patch adds a new query option PARQUET_OBJECT_STORE_SPLIT_SIZE
    (defaults to 256 MB) that controls the size of Parquet splits on
    non-block stores.
    
    Impala docs actually recommend setting fs.s3a.block.size to 128 MB
    (row group size used by Hive / Spark) or 256 MB (row group size used by
    Impala). Setting the block size to the row group size results in ideal
    split assignment, but experiments show that using a 256 MB block size
    for 128 MB row groups is better than using a 128 MB block size for 256
    MB row groups, so the default value of PARQUET_OBJECT_STORE_SPLIT_SIZE is
    256 MB. Updated the docs accordingly.
    
    Testing:
    * Ran core tests
    * Added tests to test_scanners.py
    
    Change-Id: I0995b2a3b732d39d6f58e9b3bb04111ac04601e6
    Reviewed-on: http://gerrit.cloudera.org:8080/14247
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    | 18 ++++++++
 be/src/service/query-options.h                     |  4 +-
 common/thrift/ImpalaInternalService.thrift         |  3 ++
 common/thrift/ImpalaService.thrift                 | 11 +++++
 docs/topics/impala_s3.xml                          |  5 ++-
 .../org/apache/impala/planner/HdfsScanNode.java    |  6 ++-
 tests/query_test/test_scanners.py                  | 49 ++++++++++++++++++++++
 7 files changed, 92 insertions(+), 4 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ea89eb5..cd7007b 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -881,6 +881,24 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_now_string(value);
         break;
       }
+      case TImpalaQueryOptions::PARQUET_OBJECT_STORE_SPLIT_SIZE: {
+        int64_t parquet_object_store_split_size;
+        RETURN_IF_ERROR(ParseMemValue(
+            value, "parquet object store split size", &parquet_object_store_split_size));
+        // The MIN_SYNTHETIC_BLOCK_SIZE from HdfsPartition.java. HdfsScanNode.java forces
+        // the block size to be greater than or equal to this value, so reject any
+        // attempt to set PARQUET_OBJECT_STORE_SPLIT_SIZE to a value lower than
+        // MIN_SYNTHETIC_BLOCK_SIZE.
+        int min_synthetic_block_size = 1024 * 1024;
+        if (parquet_object_store_split_size < min_synthetic_block_size) {
+          return Status(Substitute("Invalid parquet object store split size: '$0'. Must "
+                                   "be greater than or equal to '$1'.",
+              value, min_synthetic_block_size));
+        }
+        query_options->__set_parquet_object_store_split_size(
+            parquet_object_store_split_size);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 843afc9..baabd6a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::NOW_STRING + 1);\
+      TImpalaQueryOptions::PARQUET_OBJECT_STORE_SPLIT_SIZE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -190,6 +190,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS,\
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
+  QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
+      TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index d64f59c..d52e2ca 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -397,6 +397,9 @@ struct TQueryOptions {
 
   // For testing purposes
   95: optional string now_string = "";
+
+  // See comment in ImpalaService.thrift
+  96: optional i64 parquet_object_store_split_size = 268435456;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index fa6797c..8ba1066 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -464,6 +464,17 @@ enum TImpalaQueryOptions {
   // For testing purposes only. This can provide a datetime string to use as now() for
   // tests.
   NOW_STRING = 94
+
+  // The split size of Parquet files when scanning non-block-based storage systems (e.g.
+  // S3, ADLS, etc.). When reading from block-based storage systems (e.g. HDFS), Impala
+  // sets the split size for Parquet files to the size of the blocks. This is done
+  // because Impala assumes Parquet files have a single row group per block (which is
+  // the recommended way Parquet files should be written). However, since non-block-based
+  // storage systems have no concept of blocks, there is no way to derive a good default
+  // value for Parquet split sizes. Defaults to 256 MB, which is the default size of
+  // Parquet files written by Impala (Impala writes Parquet files with a single row
+  // group per file). Must be >= 1 MB.
+  PARQUET_OBJECT_STORE_SPLIT_SIZE = 95
 }
 
 // The summary of a DML statement.
diff --git a/docs/topics/impala_s3.xml b/docs/topics/impala_s3.xml
index 0533b61..e67d9db 100644
--- a/docs/topics/impala_s3.xml
+++ b/docs/topics/impala_s3.xml
@@ -733,7 +733,10 @@ under the License.
           <p> Set <codeph>fs.s3a.block.size</codeph> to 134217728 (128 MB in
             bytes) if most Parquet files queried by Impala were written by Hive
             or ParquetMR jobs. Set the block size to 268435456 (256 MB in bytes)
-            if most Parquet files queried by Impala were written by Impala. </p>
+            if most Parquet files queried by Impala were written by Impala.
+            As of Impala 3.4.0, the query option <codeph>PARQUET_OBJECT_STORE_SPLIT_SIZE</codeph>
+            controls the Parquet split size for non-block stores (e.g. S3, ADLS, etc.).
+            The default value is 256 MB.</p>
         </li>
         <li>
           <p>
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index a282a49..c4885b6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -887,9 +887,11 @@ public class HdfsScanNode extends ScanNode {
       boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
       if (!fsHasBlocks) {
         // Limit the scan range length if generating scan ranges.
+        long defaultBlockSize = partition.getFileFormat() == HdfsFileFormat.PARQUET ?
+            analyzer.getQueryOptions().parquet_object_store_split_size :
+            partitionFs.getDefaultBlockSize(partition.getLocationPath());
         long maxBlockSize =
-            Math.max(partitionFs.getDefaultBlockSize(partition.getLocationPath()),
-                FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
+            Math.max(defaultBlockSize, FileDescriptor.MIN_SYNTHETIC_BLOCK_SIZE);
         if (scanRangeBytesLimit > 0) {
           scanRangeBytesLimit = Math.min(scanRangeBytesLimit, maxBlockSize);
         } else {
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 15686de..198ae4c 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -962,6 +962,55 @@ class TestTpchScanRangeLengths(ImpalaTestSuite):
     vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
     self.run_test_case('tpch-scan-range-lengths', vector)
 
+
+@SkipIf.not_s3
+class TestParquetScanRangeAssigment(ImpalaTestSuite):
+  """Test scan range assignment for Parquet files on S3. Since scan range assignment
+  cannot be validated in the S3PlannerTest (see IMPALA-8942), validate it here."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetScanRangeAssigment, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet')
+
+  def test_scan_range_skew(self, vector):
+    """Validate that each scanner reads an even number of row groups (e.g. there is no
+    skew). While scan ranges might be assigned evenly, scanners skip Parquet scan ranges
+    that do not process a range that overlaps the Parquet row-group midpoint."""
+
+    # Run TPC-H Q6, which re-produces the scan range assignment bug described in
+    # IMPALA-3453.
+    result = self.execute_query("select sum(l_extendedprice * l_discount) as revenue "
+        "from tpch_parquet.lineitem where l_shipdate >= '1994-01-01' and "
+        "l_shipdate < '1995-01-01' and l_discount between 0.05 and 0.07 and "
+        "l_quantity < 24")
+
+    # NumRowGroups tracks the number of row groups actually read, not necessarily the
+    # number assigned. Assert that each fragment processed exactly one row group.
+    self.__assert_counter_equals(r'NumRowGroups: (\d+)', 1, result.runtime_profile)
+    # ScanRangesComplete tracks the number of scan ranges assigned to each fragment.
+    # Assert that each fragment was assigned exactly one scan range.
+    self.__assert_counter_equals(r'ScanRangesComplete: (\d+)', 1, result.runtime_profile)
+    # NumScannersWithNoReads tracks the number of scan ranges that did not trigger any
+    # reads. In the case of Parquet, this counter would be > 0 if a fragment was assigned
+    # a scan range that does *not* contain the midpoint of a Parquet row group. Assert
+    # that this value is always 0.
+    self.__assert_counter_equals(r'NumScannersWithNoReads: (\d+)', 0,
+        result.runtime_profile)
+
+  def __assert_counter_equals(self, counter_regex, value, runtime_profile):
+    """Helper method that asserts that the given counter_regex is in the given
+    runtime_profile and that each occurence of the counter matches the expected value."""
+    num_row_groups_counters = re.findall(counter_regex, runtime_profile)
+    assert len(num_row_groups_counters) > 1
+    for num_row_groups in num_row_groups_counters: assert int(num_row_groups) == value
+
+
 # More tests for text scanner
 # 1. Test file that ends w/o tuple delimiter
 # 2. Test file with escape character


[impala] 01/02: [DOCS] Copy edited INSTR() in impala_string_functions.xml

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f08bad4f4a7b2b593b31ded72a3ddb34a881831b
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Mon Sep 23 14:57:25 2019 -0700

    [DOCS] Copy edited INSTR() in impala_string_functions.xml
    
    Change-Id: Ib14d49d4fb1341f810a2e355fa35067277e8ad3b
    Reviewed-on: http://gerrit.cloudera.org:8080/14283
    Reviewed-by: Alex Rodoni <ar...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docs/topics/impala_string_functions.xml | 93 ++++++++++++---------------------
 1 file changed, 33 insertions(+), 60 deletions(-)

diff --git a/docs/topics/impala_string_functions.xml b/docs/topics/impala_string_functions.xml
index bc7d704..438d303 100644
--- a/docs/topics/impala_string_functions.xml
+++ b/docs/topics/impala_string_functions.xml
@@ -587,27 +587,22 @@ SELECT chr(97);
         </dt>
 
         <dd>
-          <b>Purpose:</b> Returns the position (starting from 1) of the first occurrence of a
-          substring within a longer string.
-          <p>
-            <b>Return type:</b> <codeph>INT</codeph>
+          <b>Purpose:</b> Returns the position (starting from 1) of the first
+          occurrence of a <varname>substr</varname> within a longer string. <p>
+            <b>Return type:</b>
+            <codeph>INT</codeph>
           </p>
-
           <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/>
-
-          <p>
-            If the substring is not present in the string, the function returns 0:
-          </p>
-
-          <p rev="IMPALA-3973 2.8.0">
-            The optional third and fourth arguments let you find instances of the substring
-            other than the first instance starting from the left:
-          </p>
+          <p> If the <varname>substr</varname> is not present in
+              <varname>str</varname>, the function returns 0. </p>
+          <p rev="IMPALA-3973 2.8.0"> The optional third and fourth arguments
+            let you find instances of the <varname>substr</varname> other than
+            the first instance starting from the left. </p>
           <ul>
-            <li>
-              The third argument lets you specify a starting point within the string other than
-              1.
-<codeblock>
+            <li> The third argument, <varname>position</varname>, lets you
+              specify a starting point within the <varname>str</varname> other
+              than 1.
+              <codeblock>
 -- Restricting the search to positions 7..end,
 -- the first occurrence of 'b' is at position 9.
 select instr('foo bar bletch', 'b', 7);
@@ -618,18 +613,13 @@ select instr('foo bar bletch', 'b', 7);
 +---------------------------------+
 </codeblock>
             </li>
-
-            <li>
-              If there are no more occurrences after the specified position, the result is 0.
-            </li>
-
-            <li>
-              <p>
-                If the third argument is negative, the search works right-to-left starting that
-                many characters from the right. The return value still represents the position
-                starting from the left side of the string.
-              </p>
-<codeblock rev="IMPALA-3973 2.8.0">
+            <li> If there are no more occurrences after the specified position,
+              the result is 0. </li>
+            <li> If <varname>position</varname> is negative, the search works
+              right-to-left starting that many characters from the right. The
+              return value still represents the position starting from the left
+              side of <varname>str</varname>.
+              <codeblock rev="IMPALA-3973 2.8.0">
 -- Scanning right to left, the first occurrence of 'o'
 -- is at position 8. (8th character from the left.)
 select instr('hello world','o',-1);
@@ -639,14 +629,10 @@ select instr('hello world','o',-1);
 | 8                             |
 +-------------------------------+
 
-</codeblock>
-            </li>
-
-            <li>
-              <p>
-                The fourth argument lets you specify an occurrence other than the first:
-              </p>
-<codeblock rev="IMPALA-3973 2.8.0">
+</codeblock></li>
+            <li> The fourth argument, <varname>occurrence</varname>, lets you
+              specify an occurrence other than the first.
+              <codeblock rev="IMPALA-3973 2.8.0">
 -- 2nd occurrence of 'b' is at position 9.
 select instr('foo bar bletch', 'b', 1, 2);
 +------------------------------------+
@@ -654,29 +640,16 @@ select instr('foo bar bletch', 'b', 1, 2);
 +------------------------------------+
 | 9                                  |
 +------------------------------------+
-</codeblock>
-            </li>
-
-            <li>
-              Negative position argument means scan right-to-left.
-            </li>
-
-            <li>
-              If the fourth argument is greater than the number of matching occurrences, the
-              function returns 0:
-            </li>
-
+</codeblock></li>
+            <li> If <varname>occurrence</varname> is greater than the number of
+              matching occurrences, the function returns 0. </li>
             <li>
-              The fourth argument cannot be negative or zero. A non-positive value for this
-              argument causes an error:
-            </li>
-
-            <li>
-              <p>
-                If either of the optional arguments is <codeph>NULL</codeph>, the function also
-                returns <codeph>NULL</codeph>:
-              </p>
-            </li>
+              <varname>occurrence</varname> cannot be negative or zero. A
+              non-positive value for this argument causes an error. </li>
+            <li> If either of the optional arguments,
+                <varname>position</varname> or <varname>occurrence</varname>, is
+                <codeph>NULL</codeph>, the function also returns
+                <codeph>NULL</codeph>.</li>
           </ul>
         </dd>