You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/26 17:11:03 UTC

[impala] 03/03: IMPALA-4364: [Addendum] Compare specific fields in StorageDescriptor

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 21c50f8dbb7ef75dbb56bee07590a710993f4d64
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Tue Aug 25 12:22:26 2020 -0700

    IMPALA-4364: [Addendum] Compare specific fields in StorageDescriptor
    
    The query option REFRESH_UPDATED_HMS_PARTITIONS was introduced
    earlier in IMPALA-4364 to detect changes in the partition
    objects in HMS when a refresh table command is issued. Originally,
    it relied on using the StorageDescriptor#equals() method to
    determine if the partition in catalogd is the same as the partition
    in HMS while executing the refresh statement.
    
    However, using StorageDescriptor#equals() depends on the HMS
    version and may introduce inconsistent behaviors after upgrades.
    For example, when we backported the original patch to an older
    distribution which uses Hive-2, the SkewedInfo field of
    StorageDescriptor is not null. This field causes the comparison
    logic to fail, since catalogd doesn't store the SkewedInfo
    field in the cached StorageDescriptor to optimize memory usage.
    
    This patch modifies the comparison logic to use explicit
    implementation in HdfsPartition class which compares only
    some fields which are cached in the HdfsPartition object.
    
    Testing:
    1. Added a new test for the comparison method.
    2. Modified existing test for the query option.
    
    Change-Id: I90c797060265f8f508d0b150e15da3d0f9961b9b
    Reviewed-on: http://gerrit.cloudera.org:8080/16363
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../org/apache/impala/catalog/HdfsPartition.java   |  27 +++++
 .../java/org/apache/impala/catalog/HdfsTable.java  |   3 +-
 .../impala/catalog/HdfsPartitionSdCompareTest.java | 114 +++++++++++++++++++++
 tests/metadata/test_reset_metadata.py              |  15 ++-
 4 files changed, 156 insertions(+), 3 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index c1fba61..f632a39 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -947,6 +947,33 @@ public class HdfsPartition extends CatalogObjectImpl
     return storageDescriptor;
   }
 
+  /**
+   * Compares the {@link StorageDescriptor} of this partition with the one provided.
+   * Only the fields that are cached in this HdfsPartition are compared (e.g.
+   * {@link org.apache.hadoop.hive.metastore.api.SkewedInfo} is not used). Object-valued
+   * fields are compared null-safely; a null field only matches another null field.
+   * @param hmsSd The StorageDescriptor object to compare against. Typically, this is
+   *              fetched directly from HMS.
+   * @return true if the HdfsPartition's StorageDescriptor matches the given
+   * StorageDescriptor on every compared field, false otherwise (including when this
+   * partition has no StorageDescriptor).
+   */
+  public boolean compareSd(StorageDescriptor hmsSd) {
+    Preconditions.checkNotNull(hmsSd);
+    StorageDescriptor sd = getStorageDescriptor();
+    if (sd == null) return false;
+    return java.util.Objects.equals(sd.getCols(), hmsSd.getCols())
+        && java.util.Objects.equals(sd.getLocation(), hmsSd.getLocation())
+        && java.util.Objects.equals(sd.getInputFormat(), hmsSd.getInputFormat())
+        && java.util.Objects.equals(sd.getOutputFormat(), hmsSd.getOutputFormat())
+        && sd.isCompressed() == hmsSd.isCompressed()
+        && sd.getNumBuckets() == hmsSd.getNumBuckets()
+        && java.util.Objects.equals(sd.getSerdeInfo(), hmsSd.getSerdeInfo())
+        && java.util.Objects.equals(sd.getBucketCols(), hmsSd.getBucketCols())
+        && java.util.Objects.equals(sd.getSortCols(), hmsSd.getSortCols())
+        && java.util.Objects.equals(sd.getParameters(), hmsSd.getParameters());
+  }
+
   public static HdfsPartition prototypePartition(
       HdfsTable table, HdfsStorageDescriptor storageDescriptor) {
     return new Builder(table, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID)
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 9420779..bf91168 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1471,8 +1471,7 @@ public class HdfsTable extends Table implements FeFsTable {
       // from HMS and if they don't match we assume that the partition has been updated
       // in HMS. This would catch the cases where partition fields, locations or
       // file-format are changed from external systems.
-      StorageDescriptor sd = hdfsPartition.getStorageDescriptor();
-      if(!msPartition.getSd().equals(sd)) {
+      if(!hdfsPartition.compareSd(msPartition.getSd())) {
         // if the updatePartitionBuilder is null, it means that this partition update
         // was not from an in-progress modification in this catalog, but rather from
         // and outside update to the partition.
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionSdCompareTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionSdCompareTest.java
new file mode 100644
index 0000000..f27e695
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionSdCompareTest.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for the {@link HdfsPartition#compareSd(StorageDescriptor)} method which is used
+ * to detect partitions which have been changed in HMS and need to be reloaded.
+ */
+public class HdfsPartitionSdCompareTest {
+  private CatalogServiceCatalog catalog_;
+  private Partition hmsPartition_ = null;
+
+  @Before
+  public void init() throws Exception {
+    catalog_ = CatalogServiceTestCatalog.create();
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      hmsPartition_ = client.getHiveClient()
+          .getPartition("functional", "alltypes", "year=2009/month=1");
+    }
+  }
+
+  @After
+  public void cleanUp() { catalog_.close(); }
+
+  /**
+   * Compares an HdfsPartition in the catalog with modified copies of its HMS storage
+   * descriptor and confirms that the comparison returns the expected results.
+   */
+  @Test
+  public void testCompareSds() throws Exception {
+    assertNotNull(hmsPartition_);
+    assertNotNull(hmsPartition_.getSd());
+    StorageDescriptor hmsSd = hmsPartition_.getSd();
+    HdfsTable tbl = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsPartition hdfsPartition = tbl
+        .getPartitionsForNames(Arrays.asList("year=2009/month=1/")).get(0);
+    // make sure that the sd in HMS without any change matches with the sd in catalog.
+    assertTrue(hdfsPartition.compareSd(hmsSd));
+
+    // test location change
+    StorageDescriptor testSd = new StorageDescriptor(hmsSd);
+    testSd.setLocation("file:///tmp/year=2009/month=1");
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test input format change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd
+        .setInputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test output format change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.setOutputFormat(
+        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat");
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test change in cols
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.addToCols(new FieldSchema("c1", "int", "comment"));
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test change in sortCols. Only sortCols is modified so that a false result can
+    // only come from the sortCols comparison.
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.addToSortCols(new Order());
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test serde library change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.setSerdeInfo(new SerDeInfo("parquet",
+        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", new HashMap<>()));
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test compressed flag change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.setCompressed(!hmsSd.isCompressed());
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test number of buckets change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.setNumBuckets(hmsSd.getNumBuckets() + 1);
+    assertFalse(hdfsPartition.compareSd(testSd));
+    // test sd params change
+    testSd = new StorageDescriptor(hmsSd);
+    testSd.putToParameters("test", "value");
+    assertFalse(hdfsPartition.compareSd(testSd));
+  }
+}
\ No newline at end of file
diff --git a/tests/metadata/test_reset_metadata.py b/tests/metadata/test_reset_metadata.py
index 59eda1c..1dbe8a6 100644
--- a/tests/metadata/test_reset_metadata.py
+++ b/tests/metadata/test_reset_metadata.py
@@ -82,6 +82,8 @@ class TestResetMetadata(TestDdlBase):
       query_options={"REFRESH_UPDATED_HMS_PARTITIONS": "true"})
     result = self.execute_query("show partitions {0}".format(tbl))
     assert new_loc in result.get_data()
+    result = self.get_impala_partition_info(unique_database + ".test", "year", "month")
+    assert len(result) == 4
 
     # case2: change the partition to a different file-format, note that the table's
     # file-format is text.
@@ -109,9 +111,20 @@ class TestResetMetadata(TestDdlBase):
     result = self.execute_query(
       "show files in {0} partition (year=2020, month=8)".format(tbl))
     assert ".parq" in result.get_data()
-    # make sure that the other partitions are still in text format
+    result = self.get_impala_partition_info(unique_database + ".test", "year", "month")
+    assert len(result) == 5
+    # make sure that the other partitions, new as well as old, are still in text format
     self.execute_query("insert into {0} partition (year=2020, month=1) "
       "select c1 from {0} where year=2009 and month=1".format(tbl))
     result = self.execute_query(
       "show files in {0} partition (year=2020, month=1)".format(tbl))
     assert ".txt" in result.get_data()
+    result = self.get_impala_partition_info(unique_database + ".test", "year", "month")
+    assert len(result) == 6
+    self.execute_query("insert into {0} partition (year=2009, month=3) "
+                       "select c1 from {0} where year=2009 and month=1".format(tbl))
+    result = self.execute_query(
+      "show files in {0} partition (year=2009, month=3)".format(tbl))
+    assert ".txt" in result.get_data()
+    result = self.get_impala_partition_info(unique_database + ".test", "year", "month")
+    assert len(result) == 6