You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/09/30 21:35:24 UTC

[impala] branch master updated: IMPALA-8968: Alter database events on dropped database should not put events processor in error state.

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b68189  IMPALA-8968: Alter database events on dropped database should not put events processor in error state.
0b68189 is described below

commit 0b68189166cfe27a7234877b5ce52ded6d62e0ab
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Tue Sep 24 01:10:46 2019 -0700

    IMPALA-8968: Alter database events on dropped database should not put
    events processor in error state.
    
    This change is two-fold:
    1. If an alter database event is received on a database that does not
       exist, the event can be safely ignored. The events processor should
       only go into an error state if updateDb() fails.
    
    2. This change also adds catalog service identifiers to create/drop
       function operations as Impala generates alter database events
       with these operations and they should be detected as self-events
       and ignored.
    
    Testing:
    Added tests to MetastoreEventsProcessorTest to verify both of the
    above changes.
    
    Change-Id: I5f76136aeff35d1d38fc8b3d9a38da399d36eced
    Reviewed-on: http://gerrit.cloudera.org:8080/14296
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 13 ++++
 .../impala/catalog/events/MetastoreEvents.java     | 12 +++-
 .../apache/impala/service/CatalogOpExecutor.java   | 14 +++++
 .../events/MetastoreEventsProcessorTest.java       | 73 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 1d625f9..6e40dd0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2213,6 +2213,19 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Updates the DB if it exists in the catalog. Returns true if updateDb() succeeds,
+   * false if the DB is not found in the catalog.
+   */
+  public boolean updateDbIfExists(Database msdb) {
+    try {
+      updateDb(msdb);
+    } catch (DatabaseNotFoundException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
    * Adds a new role with the given name and grant groups to the AuthorizationPolicy.
    * If a role with the same name already exists it will be overwritten.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 373dcd7..bd892a5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1216,13 +1216,21 @@ public class MetastoreEvents {
      * the Db object from the event
      */
     @Override
-    public void process() throws CatalogException {
+    public void process() throws CatalogException, MetastoreNotificationException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
+      Preconditions.checkNotNull(alteredDatabase_);
       // If not self event, copy Db object from event to catalog
-      catalog_.updateDb(alteredDatabase_);
+      if (!catalog_.updateDbIfExists(alteredDatabase_)) {
+        // Okay to skip this event. Events processor will not error out.
+        debugLog("Update database {} failed as the database is not present in the "
+            + "catalog.", alteredDatabase_.getName());
+      } else {
+        infoLog("Database {} updated after alter database event.",
+            alteredDatabase_.getName());
+      }
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6e4baea..0cf90a4 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1261,6 +1261,10 @@ public class CatalogOpExecutor {
       if (db == null) {
         throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
       }
+      // Get a new catalog version to assign to the database being altered. This is
+      // needed for events processor as this method creates alter database events.
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
       // Search for existing functions with the same name or signature that would
       // conflict with the function being added.
       for (Function function: db.getFunctions(fn.functionName())) {
@@ -1304,6 +1308,9 @@ public class CatalogOpExecutor {
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           addedFunctions.add(fn.toTCatalogObject());
+          // now that HMS alter database has succeeded, add this version to list of
+          // inflight events in catalog database if event processing is enabled.
+          catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
         }
       }
 
@@ -1974,6 +1981,10 @@ public class CatalogOpExecutor {
         addSummary(resp, "Database does not exist.");
         return;
       }
+      // Get a new catalog version to assign to the database being altered. This is
+      // needed for events processor as this method creates alter database events.
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
       List<TCatalogObject> removedFunctions = Lists.newArrayList();
       if (!params.isSetSignature()) {
         dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
@@ -2001,6 +2012,9 @@ public class CatalogOpExecutor {
           // Flush DB changes to metastore
           applyAlterDatabase(db.getMetaStoreDb());
           removedFunctions.add(fn.toTCatalogObject());
+          // now that HMS alter operation has succeeded, add this version to list of
+          // inflight events in catalog database if event processing is enabled.
+          catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
         }
       }
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index d79d550..7056b7d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -58,6 +58,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.authorization.NoopAuthorizationFactory;
 import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
 import org.apache.impala.catalog.CatalogException;
@@ -72,7 +74,9 @@ import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
@@ -106,13 +110,16 @@ import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateDbParams;
+import org.apache.impala.thrift.TCreateFunctionParams;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TDdlType;
 import org.apache.impala.thrift.TDropDbParams;
+import org.apache.impala.thrift.TDropFunctionParams;
 import org.apache.impala.thrift.TDropTableOrViewParams;
 import org.apache.impala.thrift.TEventProcessorMetrics;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
+import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TOwnerType;
 import org.apache.impala.thrift.TPartitionDef;
@@ -535,6 +542,46 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Test empty alter database events generated by operations like create function.
+   */
+  @Test
+  public void testEmptyAlterDatabaseEventsFromImpala() throws ImpalaException {
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    assertNotNull("Db should have been found after create database statement",
+        catalog_.getDb(TEST_DB_NAME));
+    long numberOfSelfEventsBefore =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).getCount();
+
+    // Create a dummy scalar function.
+    String fnName = "fn1";
+    ScalarFunction fn1 = new ScalarFunction(new FunctionName(TEST_DB_NAME, fnName),
+        new ArrayList<Type>(){{ add(Type.STRING);}}, Type.INT, false);
+    fn1.setBinaryType(TFunctionBinaryType.JAVA);
+    fn1.setLocation(new HdfsUri("hdfs://foo:bar/fn/fn1.jar"));
+    fn1.setSymbolName("FnClass");
+
+    // Verify alter database events generated by create function and drop function are
+    // identified as self-event.
+    createScalarFunctionFromImpala(fn1);
+    dropScalarFunctionFromImapala(fn1);
+    eventsProcessor_.processEvents();
+    long numberOfSelfEventsAfter =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).getCount();
+    assertEquals("Unexpected number of self-events generated",
+        numberOfSelfEventsBefore + 2, numberOfSelfEventsAfter);
+
+    // Verify alter database on a dropped database does not put event processor in an
+    // error state.
+    createScalarFunctionFromImpala(fn1);
+    dropDatabaseCascadeFromImpala(TEST_DB_NAME);
+    eventsProcessor_.processEvents();
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+  }
+
+  /**
    * Test creates two table (partitioned and non-partitioned) and makes sure that CatalogD
    * has the two created table objects after the CREATE_TABLE events are processed.
    */
@@ -2272,6 +2319,32 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Create a scalar function from Impala.
+   */
+  private void createScalarFunctionFromImpala(ScalarFunction fn) throws
+      ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_FUNCTION);
+    TCreateFunctionParams createFunctionParams = new TCreateFunctionParams();
+    createFunctionParams.setFn(fn.toThrift());
+    req.setCreate_fn_params(createFunctionParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
+  /**
+   * Drop a scalar function from Impala.
+   */
+  private void dropScalarFunctionFromImapala(ScalarFunction fn) throws ImpalaException {
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.DROP_FUNCTION);
+    TDropFunctionParams dropFunctionParams = new TDropFunctionParams();
+    dropFunctionParams.setFn_name(fn.getFunctionName().toThrift());
+    dropFunctionParams.setArg_types(fn.toThrift().getArg_types());
+    dropFunctionParams.setSignature(fn.toThrift().getSignature());
+    req.setDrop_fn_params(dropFunctionParams);
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+  /**
    * Renames a table from oldTblName to newTblName from Impala
    */
   private void renameTableFromImpala(String oldTblName, String newTblName)