You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/15 14:02:29 UTC

[impala] 01/04: IMPALA-8847: Ignore add partition events with empty partition list

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0ff4f450e3f76eb3ac8622588fe0824e367c2b03
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Fri Aug 9 14:00:20 2019 -0700

    IMPALA-8847: Ignore add partition events with empty partition list
    
    Certain Hive queries like "alter table <table> add if not exists
    partition (<part_spec>)" generate an add_partition event even if the
    partition already exists and nothing is actually added. Such events have
    an empty partition list in the event message, which trips the
    Preconditions check in AddPartitionEvent. This causes the event processor
    to go into the error state. The only way to recover in such a case is to
    issue invalidate metadata.
    
    The patch adds logic to ignore such events.
    
    Testing:
    1. Added a test case which reproduces the issue. The test case works
    after the patch is applied.
    
    Change-Id: I877ce6233934e7090cd18e497f748bc6479838cb
    Reviewed-on: http://gerrit.cloudera.org:8080/14049
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 32 +++++---
 tests/custom_cluster/test_event_processing.py      | 49 ++++++++++++
 tests/util/event_processor_utils.py                | 91 ++++++++++++++++++++++
 3 files changed, 162 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 315a38c..51d9ac2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1331,7 +1331,7 @@ public class MetastoreEvents {
   }
 
   public static class AddPartitionEvent extends TableInvalidatingEvent {
-    private final Partition lastAddedPartition_;
+    private Partition lastAddedPartition_;
     private final List<Partition> addedPartitions_;
 
     /**
@@ -1350,12 +1350,16 @@ public class MetastoreEvents {
                 .getAddPartitionMessage(event.getMessage());
         addedPartitions_ =
             Lists.newArrayList(addPartitionMessage_.getPartitionObjs());
-        Preconditions.checkState(addedPartitions_.size() > 0);
-        // when multiple partitions are added in HMS they are all added as one transaction
-        // Hence all the partitions which are present in the message must have the same
-        // serviceId and version if it is set. hence it is fine to just look at the
-        // last added partition in the list and use it for the self-event ids
-        lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
+        // it is possible that the added partitions list is empty in certain cases. See
+        // IMPALA-8847 for an example
+        if (!addedPartitions_.isEmpty()) {
+          // when multiple partitions are added in HMS they are all added as one
+          // transaction. Hence all the partitions which are present in the message
+          // must have the same serviceId and version if it is set, so it is fine to
+          // just look at the last added partition in the list and use it for the
+          // self-event ids
+          lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
+        }
         msTbl_ = addPartitionMessage_.getTableObj();
       } catch (Exception ex) {
         throw new MetastoreNotificationException(ex);
@@ -1364,12 +1368,15 @@ public class MetastoreEvents {
 
     @Override
     public void process() throws MetastoreNotificationException, CatalogException {
+      // bail out early if there are no partitions to process
+      if (addedPartitions_.isEmpty()) {
+        infoLog("Partition list is empty. Ignoring this event.");
+        return;
+      }
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
-      // Notification is created for newly created partitions only. We need not worry
-      // about "IF NOT EXISTS".
       try {
         // Reload the whole table if it's a transactional table.
         if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
@@ -1541,7 +1548,6 @@ public class MetastoreEvents {
         msTbl_ = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
         droppedPartitions_ = dropPartitionMessage.getPartitions();
         Preconditions.checkNotNull(droppedPartitions_);
-        Preconditions.checkState(droppedPartitions_.size() > 0);
       } catch (Exception ex) {
         throw new MetastoreNotificationException(
             debugString("Could not parse event message. "
@@ -1553,6 +1559,12 @@ public class MetastoreEvents {
 
     @Override
     public void process() throws MetastoreNotificationException {
+      // we have seen cases where an add_partition event is generated with an empty
+      // partition list (see IMPALA-8847 for details). Make sure that droppedPartitions
+      // list is not empty
+      if (droppedPartitions_.isEmpty()) {
+        infoLog("Partition list is empty. Ignoring this event.");
+      }
       // We do not need self event as dropPartition() call is a no-op if the directory
       // doesn't exist.
       try {
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 2aa22d4..f099978 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -26,6 +26,7 @@ from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
+from tests.util.event_processor_utils import EventProcessorUtils
 
 
 @SkipIfS3.hive
@@ -129,6 +130,54 @@ class TestEventProcessing(CustomClusterTestSuite):
          " year=2019 and id=101" % (db_name, TBL_INSERT_PART))
      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  @SkipIfHive2.acid
+  def test_empty_partition_events_transactional(self, unique_database):
+    """Runs the shared empty partition events scenario against a transactional
+    table (is_transactional=True)."""
+    self._run_test_empty_partition_events(unique_database, is_transactional=True)
+
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_empty_partition_events(self, unique_database):
+    """Runs the shared empty partition events scenario against a
+    non-transactional table (is_transactional=False)."""
+    self._run_test_empty_partition_events(unique_database, is_transactional=False)
+
+  def _run_test_empty_partition_events(self, unique_database, is_transactional):
+    """Runs 'add if not exists' and 'drop if exists' partition statements in Hive and
+    checks that the event processor stays ACTIVE. If 'is_transactional' is True the
+    test table is created as an insert-only transactional table."""
+    tbl_properties = "TBLPROPERTIES ('transactional'='true'," \
+        "'transactional_properties'='insert_only')" if is_transactional else ""
+    test_tbl = unique_database + ".test_events"
+    self.run_stmt_in_hive("create table {0} (key string, value string) \
+      partitioned by (year int) {1} stored as parquet".format(test_tbl, tbl_properties))
+    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    self.client.execute("describe {0}".format(test_tbl))
+
+    self.run_stmt_in_hive("alter table {0} add partition (year=2019)".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
+
+    self.run_stmt_in_hive(
+      "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
+    self.run_stmt_in_hive("alter table {0} drop partition (year=2019)".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    # The rows are 1-tuples (see the checks above); ('2019') would be a plain string.
+    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
+    self.run_stmt_in_hive(
+      "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self.hive_client)
+    assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
+
   def wait_for_insert_event_processing(self, previous_event_id):
     """Waits until the event processor has finished processing insert events. Since two
     events are created for every insert done through hive, we wait until the event id is
diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
new file mode 100644
index 0000000..78123e3
--- /dev/null
+++ b/tests/util/event_processor_utils.py
@@ -0,0 +1,91 @@
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Utility class for tests which need to interact with the metastore event
+# processor in catalogd: waiting until it has synced up to the latest HMS
+# notification event and reading its status and metrics from the catalog's
+# /events web page.
+
+import requests
+import time
+import json
+from tests.common.environ import build_flavor_timeout
+
+
+class EventProcessorUtils(object):
+  """Static helpers for tests which track the metastore event processor in catalogd.
+  Metrics are read from the /events page served at DEFAULT_CATALOG_URL."""
+
+  DEFAULT_CATALOG_URL = "http://localhost:25020"
+
+  @staticmethod
+  def wait_for_event_processing(hive_client, timeout=10):
+    """Waits till the event processor has synced to the event id that is the latest
+    one in metastore at the time of the call, then sleeps so that the catalog update
+    can propagate and returns True. Raises an Exception if the event processor has
+    not caught up within 'timeout' seconds."""
+    assert timeout > 0
+    assert hive_client is not None
+    current_event_id = EventProcessorUtils.get_current_notification_id(hive_client)
+    end_time = time.time() + timeout
+    while time.time() < end_time:
+      if EventProcessorUtils.get_last_synced_event_id() >= current_event_id:
+        break
+      time.sleep(0.1)
+    else:
+      # The loop ran out of time without ever reaching the break above.
+      raise Exception(
+        "Event processor did not sync till last known event id {0} "
+        "within {1} seconds".format(current_event_id, timeout))
+    # Wait for catalog update to be propagated.
+    time.sleep(build_flavor_timeout(6, slow_build_timeout=10))
+    return True
+
+  @staticmethod
+  def get_event_processor_metrics():
+    """Scrapes the catalog's /events webpage and returns a dictionary which maps each
+    event processor metric name to its value. Both are whitespace-stripped strings."""
+    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
+    assert response.status_code == requests.codes.ok
+    varz_json = json.loads(response.text)
+    metrics = varz_json["event_processor_metrics"].strip().splitlines()
+    # Every non-empty line is expected to look like "<name>: <value>". Split on the
+    # first ':' only so that a value which itself contains a colon is kept whole.
+    pairs = (kv.split(':', 1) for kv in metrics if kv)
+    return dict((name.strip(), value.strip()) for name, value in pairs)
+
+  @staticmethod
+  def get_last_synced_event_id():
+    """Returns the last_synced_event_id metric of the event processor as an int."""
+    metrics = EventProcessorUtils.get_event_processor_metrics()
+    assert 'last-synced-event-id' in metrics
+    return int(metrics['last-synced-event-id'])
+
+  @staticmethod
+  def get_event_processor_status():
+    """
+    Returns the current status of the EventsProcessor
+    """
+    metrics = EventProcessorUtils.get_event_processor_metrics()
+    assert 'status' in metrics
+    return metrics['status']
+
+  @staticmethod
+  def get_current_notification_id(hive_client):
+    """Returns the current notification from metastore"""
+    assert hive_client is not None
+    # NOTE(review): callers compare this with an int; confirm the returned type.
+    return hive_client.get_current_notificationEventId()