Posted to commits@impala.apache.org by st...@apache.org on 2020/10/27 04:52:30 UTC

[impala] branch master updated: IMPALA-10075: Reuse unchanged partition instances

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ac382c  IMPALA-10075: Reuse unchanged partition instances
8ac382c is described below

commit 8ac382c784de89c7ee8e14d9e26718f20774d190
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Sep 1 17:01:40 2020 +0800

    IMPALA-10075: Reuse unchanged partition instances
    
    Currently, we always build a new partition instance when we reload a
    partition. If a partition remains the same after reloading, we should
    reuse the old partition instance so that we don't send redundant updates
    for these partitions. This reduces the size of the catalog topic update:
    when a huge table is REFRESHed, catalogd only propagates the changed
    partitions.
    
    Tests:
     - Add tests to verify that partition instances are reused after some
       DDL/DMLs.
    
    Change-Id: I2dd645c260d271291021e52fdac4b74924df1170
    Reviewed-on: http://gerrit.cloudera.org:8080/16392
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/catalog/FileMetadataLoader.java  |  21 ++--
 .../org/apache/impala/catalog/HdfsPartition.java   |  37 ++++++
 .../java/org/apache/impala/catalog/HdfsTable.java  |  12 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |   7 ++
 tests/metadata/test_reuse_partitions.py            | 131 +++++++++++++++++++++
 5 files changed, 198 insertions(+), 10 deletions(-)
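
The core of the change, before reading the diff: when a partition is reloaded,
catalogd now decides whether the old in-memory instance can be kept by checking
(1) that the file listing is identical (same relative paths, lengths and
modification times) and (2) that no other partition field was replaced during
the reload. The following is a minimal standalone sketch of check (1), using
hypothetical FileMeta/PartitionReloadCheck names rather than Impala's
FileDescriptor and FileMetadataLoader classes:

    import java.util.Map;

    // Hypothetical, simplified stand-in for Impala's FileDescriptor: only the
    // length and modification time participate in change detection.
    final class FileMeta {
      final long length;
      final long mtime;
      FileMeta(long length, long mtime) { this.length = length; this.mtime = mtime; }
    }

    final class PartitionReloadCheck {
      // Returns true if any file was added, removed or modified between the two
      // listings, which are keyed by path relative to the partition directory.
      // Block-location changes are deliberately ignored, matching the patch.
      static boolean filesChanged(Map<String, FileMeta> oldFiles,
          Map<String, FileMeta> newFiles) {
        if (oldFiles.size() != newFiles.size()) return true;
        for (Map.Entry<String, FileMeta> e : newFiles.entrySet()) {
          FileMeta old = oldFiles.get(e.getKey());
          if (old == null || old.length != e.getValue().length
              || old.mtime != e.getValue().mtime) {
            return true;
          }
        }
        return false;
      }
    }

Only when both checks pass does updatePartition() below return early and leave
the old HdfsPartition in place, so nothing is added to the next topic update.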

diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index c1cba7f..b96ec25 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -223,8 +223,8 @@ public class FileMetadataLoader {
         }
         String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), partDir_);
         FileDescriptor fd = oldFdsByRelPath_.get(relPath);
-        if (listWithLocations || forceRefreshLocations ||
-            hasFileChanged(fd, fileStatus)) {
+        if (listWithLocations || forceRefreshLocations || fd == null ||
+            fd.isChanged(fileStatus)) {
           fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds);
           ++loadStats_.loadedFiles;
         } else {
@@ -272,13 +272,18 @@ public class FileMetadataLoader {
   }
 
   /**
-   * Compares the modification time and file size between the FileDescriptor and the
-   * FileStatus to determine if the file has changed. Returns true if the file has changed
-   * and false otherwise.
+   * Given a file descriptor list 'oldFds', returns true if the loaded file descriptors
+   * differ from them, i.e. if any file was added, removed, or modified.
    */
-  private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
-    return (fd == null) || (fd.getFileLength() != status.getLen()) ||
-      (fd.getModificationTime() != status.getModificationTime());
+  public boolean hasFilesChangedCompareTo(List<FileDescriptor> oldFds) {
+    if (oldFds.size() != loadedFds_.size()) return true;
+    ImmutableMap<String, FileDescriptor> oldFdsByRelPath =
+        Maps.uniqueIndex(oldFds, FileDescriptor::getRelativePath);
+    for (FileDescriptor fd : loadedFds_) {
+      FileDescriptor oldFd = oldFdsByRelPath.get(fd.getRelativePath());
+      if (fd.isChanged(oldFd)) return true;
+    }
+    return false;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index f64a1f0..50fbde7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -281,6 +281,27 @@ public class HdfsPartition extends CatalogObjectImpl
     }
 
     /**
+     * Compares the modification time and file size of this FileDescriptor against the
+     * latest FileStatus to determine if the file has changed. Returns true if the file
+     * has changed and false otherwise. Note that block location changes are not
+     * considered file changes, so table reloading won't detect them; an INVALIDATE
+     * METADATA command on the table is required to clear the stale locations.
+     */
+    public boolean isChanged(FileStatus latestStatus) {
+      return latestStatus == null || getFileLength() != latestStatus.getLen()
+          || getModificationTime() != latestStatus.getModificationTime();
+    }
+
+    /**
+     * Same as above but compares to a FileDescriptor instance.
+     */
+    public boolean isChanged(FileDescriptor latestFd) {
+      return latestFd == null || getFileLength() != latestFd.getFileLength()
+          || getModificationTime() != latestFd.getModificationTime();
+    }
+
+    /**
      * Function to convert from a byte[] flatbuffer to the wrapper class. Note that
      * this returns a shallow copy which continues to reflect any changes to the
      * passed byte[].
@@ -1555,6 +1576,22 @@ public class HdfsPartition extends CatalogObjectImpl
             ") has invalid partition column values: ", e);
       }
     }
+
+    public boolean equalsToOriginal(HdfsPartition oldInstance) {
+      return (oldInstance == oldInstance_
+          && encodedFileDescriptors_ == oldInstance.encodedFileDescriptors_
+          && encodedInsertFileDescriptors_ == oldInstance.encodedInsertFileDescriptors_
+          && encodedDeleteFileDescriptors_ == oldInstance.encodedDeleteFileDescriptors_
+          && fileFormatDescriptor_ == oldInstance.fileFormatDescriptor_
+          && location_ == oldInstance.location_
+          && isMarkedCached_ == oldInstance.isMarkedCached_
+          && accessLevel_ == oldInstance.accessLevel_
+          && hmsParameters_.equals(oldInstance.hmsParameters_)
+          && partitionStats_ == oldInstance.partitionStats_
+          && hasIncrementalStats_ == oldInstance.hasIncrementalStats_
+          && numRows_ == oldInstance.numRows_
+          && writeId_ == oldInstance.writeId_);
+    }
   }
 
   /**
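
A note on the reference-equality (==) comparisons in equalsToOriginal() above:
they work because HdfsPartition.Builder is seeded with the old instance's field
references and only swaps a reference when a setter is actually invoked during
the reload. A minimal sketch of that pattern, using hypothetical Part/PartBuilder
classes that are not part of Impala:

    // Hypothetical, simplified builder: the constructor keeps the old instance's
    // references, so "no setter was called" is detectable with ==.
    final class Part {
      final String location;
      Part(String location) { this.location = location; }
    }

    final class PartBuilder {
      private final Part old_;
      private String location_;

      PartBuilder(Part old) {
        old_ = old;
        location_ = old.location;   // reuse the old reference, no copy
      }

      PartBuilder setLocation(String location) {
        location_ = location;       // only a setter call replaces the reference
        return this;
      }

      boolean equalsToOriginal(Part oldInstance) {
        // Conservative: an equal-but-distinct object set during a reload still
        // counts as "changed", which at worst causes one redundant update.
        return oldInstance == old_ && location_ == oldInstance.location;
      }
    }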
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c99c1c5..5d82fe1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -928,8 +928,16 @@ public class HdfsTable extends Table implements FeFsTable {
 
   public void updatePartition(HdfsPartition.Builder partBuilder) throws CatalogException {
     HdfsPartition oldPartition = partBuilder.getOldInstance();
-    Preconditions.checkNotNull(oldPartition);
-    Preconditions.checkState(partitionMap_.containsKey(oldPartition.getId()));
+    Preconditions.checkNotNull(oldPartition,
+        "Old partition instance should exist for updates");
+    Preconditions.checkState(partitionMap_.containsKey(oldPartition.getId()),
+        "Updating a non existing partition instance");
+    Preconditions.checkState(partitionMap_.get(partBuilder.getOldId()) == oldPartition,
+        "Concurrent modification on partitions: old instance changed");
+    boolean partitionNotChanged = partBuilder.equalsToOriginal(oldPartition);
+    LOG.trace("Partition {} {}", oldPartition.getName(),
+        partitionNotChanged ? "unchanged" : "changed");
+    if (partitionNotChanged) return;
     HdfsPartition newPartition = partBuilder.build();
     // Partition is reloaded and hence cache directives are not dropped.
     dropPartition(oldPartition, false);
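
The practical effect of the early return above: a partition whose instance is
reused keeps its identity (and its id), so it does not show up in the next
catalog topic update, and a REFRESH of a huge table only propagates the changed
partitions. The sketch below is a hypothetical illustration of that principle,
not catalogd's actual topic-update code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical delta collector: because unchanged partitions are the very
    // same object before and after a reload, identity comparison alone is
    // enough to skip them, with no deep comparison needed.
    final class DeltaCollector {
      static <K, V> List<V> changedValues(Map<K, V> before, Map<K, V> after) {
        List<V> changed = new ArrayList<>();
        for (Map.Entry<K, V> e : after.entrySet()) {
          if (before.get(e.getKey()) != e.getValue()) changed.add(e.getValue());
        }
        return changed;
      }
    }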
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 8ac4e2d..3f8fdcc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -120,6 +120,13 @@ public class ParallelFileMetadataLoader {
       FileMetadataLoader loader = loaders_.get(p);
 
       for (HdfsPartition.Builder partBuilder : e.getValue()) {
+        // Checks if we can reuse the old file descriptors. Partition builders in the list
+        // may have different old file descriptors. We need to verify them one by one.
+        if (!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors())) {
+          LOG.trace("Detected files unchanged on partition {}",
+              partBuilder.getPartitionName());
+          continue;
+        }
         partBuilder.clearFileDescriptors();
         List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
         if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
diff --git a/tests/metadata/test_reuse_partitions.py b/tests/metadata/test_reuse_partitions.py
new file mode 100644
index 0000000..72bd0d8
--- /dev/null
+++ b/tests/metadata/test_reuse_partitions.py
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import requests
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+
+class TestReusePartitions(ImpalaTestSuite):
+  """Tests for catalogd reusing unchanged partition instances for DDL/DMLs"""
+
+  JSON_TABLE_OBJECT_URL = "http://localhost:25020/catalog_object?" \
+                          "json&object_type=TABLE&object_name={0}.{1}"
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestReusePartitions, cls).add_test_dimensions()
+
+    # There is no reason to run these tests using all dimensions.
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def __get_partition_id_set(self, db_name, tbl_name):
+    obj_url = self.JSON_TABLE_OBJECT_URL.format(db_name, tbl_name)
+    response = requests.get(obj_url)
+    assert response.status_code == requests.codes.ok
+    catalog_obj = json.loads(json.loads(response.text)["json_string"])
+    assert "table" in catalog_obj
+    assert "hdfs_table" in catalog_obj["table"]
+    tbl_obj = catalog_obj["table"]["hdfs_table"]
+    assert "partitions" in tbl_obj
+    return set(tbl_obj["partitions"].keys())
+
+  def test_reuse_partitions_nontransactional(self, unique_database):
+    self.__test_reuse_partitions_helper(unique_database, transactional=False)
+
+  def test_reuse_partitions_transactional(self, unique_database):
+    self.__test_reuse_partitions_helper(unique_database, transactional=True)
+
+  def __test_reuse_partitions_helper(self, unique_database, transactional=False):
+    """Test catalogd reuses partition instances by verifying the partition ids
+    are unchanged"""
+    tbl_name = "tbl"
+    create_tbl_ddl =\
+        "create table %s.%s (id int) partitioned by (p int) stored as textfile"\
+        % (unique_database, tbl_name)
+    if transactional:
+      create_tbl_ddl += " tblproperties('transactional'='true'," \
+                        " 'transactional_properties'='insert_only')"
+    # Create a partitioned table and insert rows into 3 partitions.
+    self.client.execute(create_tbl_ddl)
+    self.client.execute("insert into %s.%s partition (p) values (1, 1), (2, 2), (3, 3)"
+                        % (unique_database, tbl_name))
+    part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+    assert len(part_ids) == 3
+
+    # REFRESH can reuse the existing partition instances.
+    self.client.execute("refresh %s.%s" % (unique_database, tbl_name))
+    assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids
+    # An INSERT query that only touches one partition will reuse the other partitions.
+    self.client.execute("insert into %s.%s partition (p) values (1, 1)"
+                        % (unique_database, tbl_name))
+    new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+    assert len(part_ids.intersection(new_part_ids)) == 2
+    part_ids = new_part_ids
+    # An INSERT query that adds a new partition will reuse the existing partitions.
+    self.client.execute("insert into %s.%s partition(p) values (4, 4)"
+                        % (unique_database, tbl_name))
+    new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+    assert len(part_ids.intersection(new_part_ids)) == 3
+    part_ids = new_part_ids
+
+    # ALTER TABLE not supported on transactional tables (IMPALA-8831).
+    if not transactional:
+      # ALTER statements that don't touch data will reuse the existing partitions.
+      self.client.execute("alter table %s.%s set tblproperties('numRows'='4')"
+                          % (unique_database, tbl_name))
+      assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids
+      self.client.execute("alter table %s.%s add column name string"
+                          % (unique_database, tbl_name))
+      assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids
+      self.client.execute("alter table %s.%s drop column name"
+                          % (unique_database, tbl_name))
+      assert self.__get_partition_id_set(unique_database, tbl_name) == part_ids
+      # ALTER statements that add or drop a partition will reuse the other partitions.
+      self.client.execute("alter table %s.%s add partition (p=5)"
+                          % (unique_database, tbl_name))
+      new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+      assert len(new_part_ids) == 5
+      assert len(part_ids.intersection(new_part_ids)) == 4
+      self.client.execute("alter table %s.%s drop partition (p=5)"
+                          % (unique_database, tbl_name))
+      new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+      assert part_ids == new_part_ids
+
+    # Updating stats also updates the partition stats, so no instances can be reused.
+    self.client.execute("compute stats %s.%s" % (unique_database, tbl_name))
+    new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+    assert len(new_part_ids) == 4
+    assert len(part_ids.intersection(new_part_ids)) == 0
+    self.client.execute("compute incremental stats %s.%s" % (unique_database, tbl_name))
+    new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+    assert len(new_part_ids) == 4
+    assert len(part_ids.intersection(new_part_ids)) == 0
+    part_ids = new_part_ids
+    # DROP STATS not supported on transactional tables (HIVE-22104).
+    if not transactional:
+      # Dropping incremental stats of one partition can reuse the other 3 partitions.
+      self.client.execute("drop incremental stats %s.%s partition (p=1)"
+                          % (unique_database, tbl_name))
+      new_part_ids = self.__get_partition_id_set(unique_database, tbl_name)
+      assert len(part_ids.intersection(new_part_ids)) == 3
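
The test observes reuse indirectly: each HdfsPartition instance has its own id,
and catalogd's debug webserver exposes those ids under table.hdfs_table.partitions
in the /catalog_object JSON used above. A minimal way to spot-check the same
thing by hand, assuming a local minicluster with catalogd's debug webserver on
its default port 25020 (the class name and the default db/table arguments below
are illustrative only):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class FetchCatalogObject {
      public static void main(String[] args) throws Exception {
        // Database and table to inspect; the defaults are arbitrary examples.
        String db = args.length > 0 ? args[0] : "functional";
        String tbl = args.length > 1 ? args[1] : "alltypes";
        // Same debug endpoint the test above queries.
        String url = String.format(
            "http://localhost:25020/catalog_object?json&object_type=TABLE&object_name=%s.%s",
            db, tbl);
        HttpResponse<String> resp = HttpClient.newHttpClient().send(
            HttpRequest.newBuilder(URI.create(url)).GET().build(),
            HttpResponse.BodyHandlers.ofString());
        // The response wraps the catalog object in a "json_string" field; the
        // partition ids appear under table.hdfs_table.partitions. Printing the
        // raw body is enough to eyeball whether the ids changed after a statement.
        System.out.println(resp.body());
      }
    }

Running it before and after a REFRESH (or any of the statements exercised in the
test) shows whether the partition ids, and hence the instances, were reused.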