You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/04/30 15:36:01 UTC

[impala] 03/07: IMPALA-8454 (part 3): enable recursive file listing by default

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 246a5f661ef5f7174aacaec5386954366d1f33af
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 25 16:41:51 2019 -0700

    IMPALA-8454 (part 3): enable recursive file listing by default
    
    This enables recursive listing of files within partition directories by
    default. This is a behavior change, but in fact makes Impala consistent
    with modern versions of Hive, Spark, Presto, etc.
    
    In fact, this is necessary for querying certain Hive tables which have
    been written out by a query containing a UNION ALL clause if that query
    is executed by Tez (see HIVE-12812 for example).
    
    Technically, this is an incompatible change. Although it's unlikely
    people were relying on the non-recursive listing, this patch offers two
    escape hatches:
    - an individual table may be marked with the
      'impala.disable.recursive.listing' property
    - Impala may be globally configured with
      --recursively_list_partitions=false
    
    Given that we know this behavior is inconsistent with other SQL engines,
    and that there is no performance benefit to not recursing in the common
    case that there _are_ no subdirectories, I made the flag "hidden" and
    did not document the new table property. These are only "chicken bit"
    flags.
    
    Change-Id: Ib30e2bcaf820210f2faa8f159d1af2f947a4d0e8
    Reviewed-on: http://gerrit.cloudera.org:8080/13127
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   3 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/impala/catalog/FeFsTable.java  |  15 +++
 .../java/org/apache/impala/catalog/HdfsTable.java  |  10 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   4 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 tests/metadata/test_recursive_listing.py           | 113 +++++++++++++++++++++
 8 files changed, 151 insertions(+), 2 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 2684fa9..5e2f019 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -273,6 +273,9 @@ DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true
 DEFINE_bool_hidden(unlock_mt_dop, false,
     "(Experimental) If true, allow specifying mt_dop for all queries.");
 
+DEFINE_bool_hidden(recursively_list_partitions, true,
+    "If true, recursively list the content of partition directories.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 9623fa4..e6b1570 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -76,6 +76,7 @@ DECLARE_bool(unlock_mt_dop);
 DECLARE_string(ranger_service_type);
 DECLARE_string(ranger_app_id);
 DECLARE_string(authorization_provider);
+DECLARE_bool(recursively_list_partitions);
 
 namespace impala {
 
@@ -151,6 +152,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_ranger_service_type(FLAGS_ranger_service_type);
   cfg.__set_ranger_app_id(FLAGS_ranger_app_id);
   cfg.__set_authorization_provider(FLAGS_authorization_provider);
+  cfg.__set_recursively_list_partitions(FLAGS_recursively_list_partitions);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 52561f4..c242c3e 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -127,4 +127,6 @@ struct TBackendGflags {
   50: required string ranger_app_id
 
   51: required string authorization_provider
+
+  52: required bool recursively_list_partitions
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 5fb50cc..1620e6f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -210,6 +210,21 @@ public interface FeFsTable extends FeTable {
     }
 
     /**
+     * Returns true if the file contents within a partition should be recursively listed.
+     * Reconciles the Impalad-wide --recursively_list_partitions flag with the
+     * TBL_PROP_DISABLE_RECURSIVE_LISTING table property, which wins if set.
+     */
+    public static boolean shouldRecursivelyListPartitions(FeFsTable table) {
+      org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
+      String propVal = msTbl.getParameters().get(
+          HdfsTable.TBL_PROP_DISABLE_RECURSIVE_LISTING);
+      if (propVal == null) return BackendConfig.INSTANCE.recursivelyListPartitions();
+      // TODO(todd): we should detect if this flag is set on an ACID table and
+      // give some kind of error (we _must_ recursively list such tables)
+      return !Boolean.parseBoolean(propVal);
+    }
+
+    /**
      * Returns an estimated row count for the given number of file bytes. The row count is
      * extrapolated using the table-level row count and file bytes statistics.
      * Returns zero only if the given file bytes is zero.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b1a3b7a..b98f357 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -127,6 +127,13 @@ public class HdfsTable extends Table implements FeFsTable {
   public static final String TBL_PROP_ENABLE_STATS_EXTRAPOLATION =
       "impala.enable.stats.extrapolation";
 
+  // Similar to above: a table property that overrides --recursively_list_partitions
+  // for a specific table. This is an escape hatch in case it turns out that someone
+  // was relying on the previous non-recursive behavior, even though it's known to
+  // be inconsistent with modern versions of Spark, Hive, etc.
+  public static final String TBL_PROP_DISABLE_RECURSIVE_LISTING =
+      "impala.disable.recursive.listing";
+
   // Average memory requirements (in bytes) for storing the metadata of a partition.
   private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
 
@@ -560,7 +567,8 @@ public class HdfsTable extends Table implements FeFsTable {
     Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
     for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), /*recursive=*/false,
+      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
+          Utils.shouldRecursivelyListPartitions(this),
           oldFds, hostIndex_);
       // If there is a cached partition mapped to this path, we recompute the block
       // locations even if the underlying files have not changed.
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 6fbda7c..b222ba6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -285,8 +285,10 @@ class DirectMetaProvider implements MetaProvider {
   private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
       String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
     Path partDir = new Path(msPartition.getSd().getLocation());
+    // TODO(todd): The table property to disable recursive loading is not supported
+    // by this code path. However, DirectMetaProvider is not yet a supported feature.
     FileMetadataLoader fml = new FileMetadataLoader(partDir,
-        /* recursive= */false,
+        /* recursive= */BackendConfig.INSTANCE.recursivelyListPartitions(),
         /* oldFds= */Collections.emptyList(),
         hostIndex);
 
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 527f9f6..97e0b64 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -145,6 +145,10 @@ public class BackendConfig {
     return backendCfg_.unlock_mt_dop;
   }
 
+  public boolean recursivelyListPartitions() {
+    return backendCfg_.recursively_list_partitions;
+  }
+
   public String getRangerServiceType() {
     return backendCfg_.getRanger_service_type();
   }
diff --git a/tests/metadata/test_recursive_listing.py b/tests/metadata/test_recursive_listing.py
new file mode 100644
index 0000000..e569ec8
--- /dev/null
+++ b/tests/metadata/test_recursive_listing.py
@@ -0,0 +1,113 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.common.skip import SkipIfLocal
+from tests.util.filesystem_utils import WAREHOUSE
+
+
+@SkipIfLocal.hdfs_client
+class TestRecursiveListing(ImpalaTestSuite):
+  """
+  This class tests that files are recursively listed within directories
+  and partitions, and that REFRESH picks up changes within them.
+  """
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRecursiveListing, cls).add_test_dimensions()
+    # don't use any exec options, running exactly once is fine
+    cls.ImpalaTestMatrix.clear_dimension('exec_option')
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        (v.get_value('table_format').file_format == 'text' and
+         v.get_value('table_format').compression_codec == 'none'))
+
+  def _show_files(self, table):
+    files = self.client.execute("show files in {0}".format(table))
+    return files.data
+
+  def _get_rows(self, table):
+    result = self.client.execute("select * from {0}".format(table))
+    return result.data
+
+  def test_unpartitioned(self, vector, unique_database):
+    self._do_test(vector, unique_database, partitioned=False)
+
+  def test_partitioned(self, vector, unique_database):
+    self._do_test(vector, unique_database, partitioned=True)
+
+  def _do_test(self, vector, unique_database, partitioned):
+    tbl_name = "t"
+    fq_tbl_name = unique_database + "." + tbl_name
+    tbl_path = '%s/%s.db/%s' % (WAREHOUSE, unique_database, tbl_name)
+
+    # Create the table
+    self.execute_query_expect_success(self.client,
+        ("create table {tbl} (a string) {partclause} " +
+         "stored as textfile location '{loc}'").format(
+            tbl=fq_tbl_name,
+            partclause=(partitioned and "partitioned by (p int)" or ""),
+            loc=tbl_path))
+
+    if partitioned:
+      self.execute_query_expect_success(self.client,
+        "alter table {0} add partition (p=1)".format(fq_tbl_name))
+      part_path = tbl_path + "/p=1"
+    else:
+      part_path = tbl_path
+
+    # Add a file inside a nested directory and refresh.
+    self.filesystem_client.make_dir("{0}/dir1".format(part_path[1:]))
+    self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path[1:]),
+        "file1")
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 1
+    assert len(self._get_rows(fq_tbl_name)) == 1
+
+    # Add another file inside the same directory, make sure it shows up.
+    self.filesystem_client.create_file("{0}/dir1/file2.txt".format(part_path[1:]),
+        "file2")
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 2
+    assert len(self._get_rows(fq_tbl_name)) == 2
+
+    # Add a file at the top level, make sure it shows up.
+    self.filesystem_client.create_file("{0}/file3.txt".format(part_path[1:]),
+        "file3")
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 3
+    assert len(self._get_rows(fq_tbl_name)) == 3
+
+    # Test that disabling recursive listings makes the nested files disappear.
+    self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" +
+        "'impala.disable.recursive.listing'='true')").format(fq_tbl_name))
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 1
+    assert len(self._get_rows(fq_tbl_name)) == 1
+    # Re-enable.
+    self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" +
+        "'impala.disable.recursive.listing'='false')").format(fq_tbl_name))
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 3
+    assert len(self._get_rows(fq_tbl_name)) == 3
+
+    # Remove the dir with two files. One should remain.
+    self.filesystem_client.delete_file_dir("{0}/dir1".format(part_path[1:]),
+        recursive=True)
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 1
+    assert len(self._get_rows(fq_tbl_name)) == 1