You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/10/14 22:48:33 UTC

[impala] 02/06: IMPALA-10113: Add feature flag for incremental metadata updates

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 308d692a1ba0c86c7ff718425495d787571c1484
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Sep 9 10:20:46 2020 +0800

    IMPALA-10113: Add feature flag for incremental metadata updates
    
    This patch adds a feature flag, enable_incremental_metadata_updates, to
    turn off incremental metadata (i.e. partition-level metadata)
    propagation from catalogd to coordinators. It defaults to true. When
    set to false, catalogd sends metadata updates at table granularity
    (the legacy behavior).
    
    Also fixes a bug where an empty aggregated partition update summary was
    logged when a DDL changed no partitions.
    
    Tests:
     - Run CORE tests with this flag set to true and false.
     - Add tests with enable_incremental_metadata_updates=false.
    
    Change-Id: I98676fc8ca886f3d9f550f9b96fa6d6bff178ebb
    Reviewed-on: http://gerrit.cloudera.org:8080/16436
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                         |  8 ++++++++
 be/src/util/backend-gflag-util.cc                     |  3 +++
 common/thrift/BackendGflags.thrift                    |  2 ++
 .../apache/impala/catalog/CatalogServiceCatalog.java  | 19 +++++++++++++------
 .../java/org/apache/impala/catalog/HdfsTable.java     | 15 +++++++++++----
 .../org/apache/impala/catalog/ImpaladCatalog.java     |  2 +-
 .../apache/impala/catalog/PartitionMetaSummary.java   |  5 +++++
 .../java/org/apache/impala/service/BackendConfig.java |  4 ++++
 tests/custom_cluster/test_disable_features.py         | 14 ++++++++++++++
 tests/custom_cluster/test_local_catalog.py            |  3 ++-
 10 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 55a62b9..3025be9 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -341,6 +341,14 @@ DEFINE_bool_hidden(use_customized_user_groups_mapper_for_ranger, false,
 DEFINE_bool(enable_column_masking, true,
     "If false, disable the column masking feature. Defaults to be true.");
 
+DEFINE_bool(enable_incremental_metadata_updates, true,
+    "If true, Catalog Server will send incremental table updates in partition level in "
+    "the statestore topic updates. Legacy coordinators will apply the partition updates "
+    "incrementally, i.e. reuse unchanged partition metadata. Disable this feature by "
+    "setting this to false in the Catalog Server. Then metadata of each table will be "
+    "propagated as a whole object in the statestore topic updates. Note that legacy "
+    "coordinators can apply incremental or full table updates so don't need this flag.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 8401b3b..3a2c470 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -86,6 +86,7 @@ DECLARE_int32(num_check_authorization_threads);
 DECLARE_bool(use_customized_user_groups_mapper_for_ranger);
 DECLARE_bool(enable_column_masking);
 DECLARE_bool(compact_catalog_topic);
+DECLARE_bool(enable_incremental_metadata_updates);
 
 namespace impala {
 
@@ -174,6 +175,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_use_customized_user_groups_mapper_for_ranger);
   cfg.__set_enable_column_masking(FLAGS_enable_column_masking);
   cfg.__set_compact_catalog_topic(FLAGS_compact_catalog_topic);
+  cfg.__set_enable_incremental_metadata_updates(
+      FLAGS_enable_incremental_metadata_updates);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 48ac997..017ad18 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -159,4 +159,6 @@ struct TBackendGflags {
   67: required bool enable_insert_events
 
   68: required bool compact_catalog_topic
+
+  69: required bool enable_incremental_metadata_updates
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0c22e6b..5c6bf1a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -861,9 +861,15 @@ public class CatalogServiceCatalog extends Catalog {
           Catalog.toCatalogObjectKey(removedObject))) {
         ctx.addCatalogObject(removedObject, true);
       }
-      // If this is a HdfsTable, make sure we send deletes for removed partitions.
-      // So we won't leak partition topic entries in the statestored catalog topic.
-      if (removedObject.type == TCatalogObjectType.TABLE
+      // If this is a HdfsTable and incremental metadata updates are enabled, make sure we
+      // send deletes for removed partitions. So we won't leak partition topic entries in
+      // the statestored catalog topic. Partitions are only included as objects in topic
+      // updates if incremental metadata updates are enabled. Don't need this if
+      // incremental metadata updates are disabled, because in this case the table
+      // snapshot will be sent as a complete object. See more details in
+      // addTableToCatalogDeltaHelper().
+      if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
+          && removedObject.type == TCatalogObjectType.TABLE
           && removedObject.getTable().getTable_type() == TTableType.HDFS_TABLE) {
         THdfsTable hdfsTable = removedObject.getTable().getHdfs_table();
         Preconditions.checkState(
@@ -885,7 +891,7 @@ public class CatalogServiceCatalog extends Catalog {
             ctx.addCatalogObject(removedPart, true, deleteSummary);
           }
         }
-        LOG.info(deleteSummary.toString());
+        if (deleteSummary.hasUpdates()) LOG.info(deleteSummary.toString());
       }
     }
     // Each topic update should contain a single "TCatalog" object which is used to
@@ -1320,7 +1326,8 @@ public class CatalogServiceCatalog extends Catalog {
         return;
       }
       try {
-        if (tbl instanceof HdfsTable) {
+        if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
+            && tbl instanceof HdfsTable) {
           catalogTbl.setTable(((HdfsTable) tbl).toThriftWithMinimalPartitions());
           addHdfsPartitionsToCatalogDelta((HdfsTable) tbl, ctx);
         } else {
@@ -1364,7 +1371,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     hdfsTable.resetDroppedPartitions();
 
-    LOG.info(updateSummary.toString());
+    if (updateSummary.hasUpdates()) LOG.info(updateSummary.toString());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index c287af6..361de4e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -63,6 +63,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.TAccessLevel;
@@ -291,7 +292,7 @@ public class HdfsTable extends Table implements FeFsTable {
 
   // Dropped partitions since last catalog update. These partitions need to be removed
   // in coordinator's cache if there are no updates on them.
-  private final Set<HdfsPartition> droppedPartitions = new HashSet<>();
+  private final Set<HdfsPartition> droppedPartitions_ = new HashSet<>();
 
   // Represents a set of storage-related statistics aggregated at the table or partition
   // level.
@@ -987,7 +988,10 @@ public class HdfsTable extends Table implements FeFsTable {
     // nullPartitionIds_ and partitionValuesMap_ are only maintained in coordinators.
     if (!isStoredInImpaladCatalogCache()) {
       dirtyPartitions_.remove(partitionId);
-      droppedPartitions.add(partition.genMinimalPartition());
+      // Only tracks the dropped partition instances when we need partition-level updates.
+      if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
+        droppedPartitions_.add(partition.genMinimalPartition());
+      }
       return partition;
     }
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
@@ -1910,6 +1914,9 @@ public class HdfsTable extends Table implements FeFsTable {
   @Override
   public TCatalogObject toMinimalTCatalogObject() {
     TCatalogObject catalogObject = super.toMinimalTCatalogObject();
+    if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
+      return catalogObject;
+    }
     catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_,
@@ -1949,13 +1956,13 @@ public class HdfsTable extends Table implements FeFsTable {
    * Gets the deleted/replaced partition instances since last catalog topic update.
    */
   public List<HdfsPartition> getDroppedPartitions() {
-    return ImmutableList.copyOf(droppedPartitions);
+    return ImmutableList.copyOf(droppedPartitions_);
   }
 
   /**
    * Clears the deleted/replaced partition instance set.
    */
-  public void resetDroppedPartitions() { droppedPartitions.clear(); }
+  public void resetDroppedPartitions() { droppedPartitions_.clear(); }
 
   /**
    * Gets catalog objects of new partitions since last catalog update. They are partitions
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 54ea1ca..85b447d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -246,7 +246,7 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
       }
     }
     for (PartitionMetaSummary summary : partUpdates.values()) {
-      LOG.info(summary.toString());
+      if (summary.hasUpdates()) LOG.info(summary.toString());
     }
 
     for (TCatalogObject catalogObject: sequencer.getUpdatedObjects()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java b/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java
index aac98e0..a307c1d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java
+++ b/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java
@@ -188,6 +188,11 @@ public class PartitionMetaSummary {
     }
   }
 
+  /**
+   * Returns whether we have collected any partition updates/deletes.
+   */
+  public boolean hasUpdates() { return numUpdatedParts_ > 0 || numDeletedParts_ > 0; }
+
   @Override
   public String toString() {
     StringBuilder res = new StringBuilder();
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 675040c..c1a5264 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -212,6 +212,10 @@ public class BackendConfig {
 
   public boolean isCompactCatalogTopic() { return backendCfg_.compact_catalog_topic; }
 
+  public boolean isIncrementalMetadataUpdatesEnabled() {
+    return backendCfg_.enable_incremental_metadata_updates;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/tests/custom_cluster/test_disable_features.py b/tests/custom_cluster/test_disable_features.py
index 067051c..1434711 100644
--- a/tests/custom_cluster/test_disable_features.py
+++ b/tests/custom_cluster/test_disable_features.py
@@ -18,6 +18,7 @@
 import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.parametrize import UniqueDatabase
 
 
 class TestDisableFeatures(CustomClusterTestSuite):
@@ -31,3 +32,16 @@ class TestDisableFeatures(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args("--enable_orc_scanner=false")
   def test_disable_orc_scanner(self, vector):
     self.run_test_case('QueryTest/disable-orc-scanner', vector)
+
+  @pytest.mark.execute_serially
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--enable_incremental_metadata_updates=false")
+  def test_disable_incremental_metadata_updates(self, vector, unique_database):
+    """Canary test run with catalogd started with incremental metadata updates off.
+    Reruns some partition-related test files also used by metadata/test_ddl.py."""
+    vector.get_value('exec_option')['sync_ddl'] = True  # run queries with sync_ddl
+    self.run_test_case('QueryTest/alter-table-hdfs-caching', vector,
+        use_db=unique_database, multiple_impalad=True)
+    self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
+        use_db=unique_database, multiple_impalad=True)
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 109665f..98d6e35 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -160,7 +160,8 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
-    catalogd_args="--catalog_topic_mode=minimal")
+    catalogd_args="--catalog_topic_mode=minimal "
+                  "--enable_incremental_metadata_updates=true")
   def test_invalidate_stale_partitions(self, unique_database):
     """
     Test that partition level invalidations are sent from catalogd and processed