Posted to commits@impala.apache.org by st...@apache.org on 2023/12/24 00:04:17 UTC

(impala) branch master updated: IMPALA-12561: Event-processor shouldn't go into ERROR state for failures in fetching events

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af8fef19 IMPALA-12561: Event-processor shouldn't go into ERROR state for failures in fetching events
5af8fef19 is described below

commit 5af8fef199b60fb7725971b419596a36e48b1eec
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Nov 15 15:15:33 2023 +0800

    IMPALA-12561: Event-processor shouldn't go into ERROR state for failures in fetching events
    
    Any failures in fetching HMS events should be retriable. Event-processor
    should not go into the ERROR state, which can only be cleared by a
    global INVALIDATE METADATA command.
    
    This patch deals with failures in creating a new MetaStoreClient
    by throwing a MetastoreClientInstantiationException instead of an
    IllegalStateException. Previously the IllegalStateException could fail
    the process of fetching HMS events. Now callers can catch the
    MetastoreClientInstantiationException and convert it into a
    MetastoreNotificationFetchException if their work is retriable, so the
    event-processor can retry in the next round. Other callers of
    Catalog#getMetaStoreClient() still don't catch the new exception, since
    their work can't be easily retried.
    
    Also makes sure MetastoreEventsProcessor.getCurrentEventId() only throws
    MetastoreNotificationFetchException. Previously it threw
    CatalogException, which would fail the event-processor. Note that
    CatalogException is used for errors in accessing objects in the Catalog,
    e.g. table not found, so we shouldn't throw it when fetching HMS events
    fails.
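
    As a minimal sketch of the catch-and-convert pattern above (the wrapper
    method is illustrative, not code from this patch; the exception types
    and client calls are the real ones it touches):

      // Illustrative sketch. Assumes the imports this patch already uses:
      // org.apache.impala.catalog.Catalog,
      // org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient,
      // org.apache.impala.catalog.MetastoreClientInstantiationException,
      // org.apache.impala.catalog.events.MetastoreNotificationFetchException,
      // org.apache.thrift.TException.
      long fetchCurrentEventId(Catalog catalog)
          throws MetastoreNotificationFetchException {
        try (MetaStoreClient client = catalog.getMetaStoreClient()) {
          return client.getHiveClient()
              .getCurrentNotificationEventId().getEventId();
        } catch (MetastoreClientInstantiationException | TException e) {
          // Retriable: wrapping keeps the event-processor out of the
          // ERROR state, so it simply retries in the next round.
          throw new MetastoreNotificationFetchException(
              "Unable to reach HMS while fetching events", e);
        }
      }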
    
    Tests:
     - Add FE unit test to verify MetastoreNotificationFetchException is
       thrown as expected. To mimic HMS connection failures, use a
       customized MetastoreClientPool that uses a wrong HMS port.
     - Add e2e test in custom_cluster/test_catalog_hms_failures.py. The test
       class previously only ran in exhaustive jobs due to its long running
       time. Optimize the test to only restart HMS, and add a new option,
       -if_not_running, for run-hive-server.sh to avoid unnecessary
       restarts (usage sketched below).
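
    For example, under $IMPALA_HOME (flags as added/used in this patch; the
    combination shown is just one usage):

      # Restart only the metastore, killing any running instance first:
      testdata/bin/run-hive-server.sh -only_metastore
      # Start the metastore only if it is not already running:
      testdata/bin/run-hive-server.sh -only_metastore -if_not_running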
    
    Change-Id: I775684d473fdbfb9f0531234f59a6239bd0873e3
    Reviewed-on: http://gerrit.cloudera.org:8080/20707
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/Catalog.java    | 11 +++-
 .../impala/catalog/CatalogServiceCatalog.java      |  8 ++-
 .../apache/impala/catalog/MetaStoreClientPool.java |  4 +-
 .../MetastoreClientInstantiationException.java     | 27 ++++++++
 .../catalog/events/ExternalEventsProcessor.java    |  2 +-
 .../catalog/events/MetastoreEventsProcessor.java   | 21 ++++---
 .../events/MetastoreEventsProcessorTest.java       | 73 +++++++++++++++++++++-
 .../testutil/IncompetentMetastoreClientPool.java   | 46 ++++++++++++++
 testdata/bin/run-hive-server.sh                    | 37 +++++++++--
 tests/custom_cluster/test_catalog_hms_failures.py  | 64 ++++++++++---------
 10 files changed, 246 insertions(+), 47 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 430f65aef..be480ec97 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -53,6 +53,7 @@ import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.PatternMatcher;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
@@ -80,7 +81,7 @@ public abstract class Catalog implements AutoCloseable {
   public static final TUniqueId INITIAL_CATALOG_SERVICE_ID = new TUniqueId(0L, 0L);
   public static final String DEFAULT_DB = "default";
 
-  private final MetaStoreClientPool metaStoreClientPool_;
+  private MetaStoreClientPool metaStoreClientPool_;
 
   // Cache of authorization policy metadata. Populated from data retried from the
   // Sentry Service, if configured.
@@ -403,6 +404,14 @@ public abstract class Catalog implements AutoCloseable {
   @Override
   public void close() { metaStoreClientPool_.close(); }
 
+  @VisibleForTesting
+  public MetaStoreClientPool getMetaStoreClientPool() { return metaStoreClientPool_; }
+
+  @VisibleForTesting
+  public void setMetaStoreClientPool(MetaStoreClientPool pool) {
+    metaStoreClientPool_ = pool;
+  }
+
   /**
    * Returns a managed meta store client from the client connection pool.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 9f7ae5aea..59c24dda5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -70,6 +70,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLat
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.catalog.events.MetastoreNotificationFetchException;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.metastore.CatalogHmsUtils;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
@@ -2053,7 +2054,12 @@ public class CatalogServiceCatalog extends Catalog {
     // alter events, however it is likely that some tables would be unnecessarily
     // refreshed. That would happen when during reset, there were external alter events
     // and by the time we processed them, catalog had already loaded them.
-    long currentEventId = metastoreEventProcessor_.getCurrentEventId();
+    long currentEventId;
+    try {
+      currentEventId = metastoreEventProcessor_.getCurrentEventId();
+    } catch (MetastoreNotificationFetchException e) {
+      throw new CatalogException("Failed to fetch current event id", e);
+    }
     // pause the event processing since the cache is anyways being cleared
     metastoreEventProcessor_.pause();
     // Update the HDFS cache pools
diff --git a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
index fdec7fd99..7c4c455b8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
+++ b/fe/src/main/java/org/apache/impala/catalog/MetaStoreClientPool.java
@@ -106,7 +106,9 @@ public class MetaStoreClientPool {
         } catch (Exception e) {
           // If time is up, throw an unchecked exception
           long delayUntilMillis = System.currentTimeMillis() + retryDelayMillis;
-          if (delayUntilMillis >= endTimeMillis) throw new IllegalStateException(e);
+          if (delayUntilMillis >= endTimeMillis) {
+            throw new MetastoreClientInstantiationException(e);
+          }
 
           LOG.warn("Failed to connect to Hive MetaStore. Retrying.", e);
           while (delayUntilMillis > System.currentTimeMillis()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/MetastoreClientInstantiationException.java b/fe/src/main/java/org/apache/impala/catalog/MetastoreClientInstantiationException.java
new file mode 100644
index 000000000..86b6aaa64
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/MetastoreClientInstantiationException.java
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+/**
+ * Exception thrown in the constructor of MetaStoreClient when we fail to initialize it.
+ */
+public class MetastoreClientInstantiationException extends RuntimeException {
+  public MetastoreClientInstantiationException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
index 672a63e72..d9a70e7bf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
@@ -38,7 +38,7 @@ public interface ExternalEventsProcessor {
    * Get the current event id on metastore. Useful for restarting the event processing
    * from a given event id
    */
-  long getCurrentEventId() throws CatalogException;
+  long getCurrentEventId() throws MetastoreNotificationFetchException;
 
   /**
    * Pauses the event processing. Use <code>start(fromEventId)</code> method below to
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index f066c7078..3a5a6efd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -48,7 +48,9 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.MetastoreClientInstantiationException;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
@@ -331,7 +333,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
         }
       }
       return result;
-    } catch (TException e) {
+    } catch (MetastoreClientInstantiationException | TException e) {
       throw new MetastoreNotificationFetchException(String.format(
           CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
     }
@@ -738,12 +740,12 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * Get the current notification event id from metastore
    */
   @Override
-  public long getCurrentEventId() throws CatalogException {
+  public long getCurrentEventId() throws MetastoreNotificationFetchException {
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
       return metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
-    } catch (TException e) {
-      throw new CatalogException("Unable to fetch the current notification event id. "
-          + "Check if metastore service is accessible");
+    } catch (MetastoreClientInstantiationException | TException e) {
+      throw new MetastoreNotificationFetchException("Unable to fetch the current " +
+          "notification event id. Check if metastore service is accessible", e);
     }
   }
 
@@ -785,10 +787,13 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * Get the event time by fetching the specified event from HMS.
    * @return 0 if the event has been cleaned up or any error occurs.
    */
-  private int getEventTimeFromHMS(long eventId) {
+  @VisibleForTesting
+  public int getEventTimeFromHMS(long eventId) {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       NotificationEvent event = getEventFromHMS(msClient, eventId);
       if (event != null) return event.getEventTime();
+    } catch (MetastoreClientInstantiationException e) {
+      LOG.error("Failed to get event time from HMS for event {}", eventId, e);
     }
     return 0;
   }
@@ -903,7 +908,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
         if (filter.accept(event)) filteredEvents.add(event);
       }
       return filteredEvents;
-    } catch (TException e) {
+    } catch (MetastoreClientInstantiationException | TException e) {
       throw new MetastoreNotificationFetchException(
           "Unable to fetch notifications from metastore. Last synced event id is "
               + eventId, e);
@@ -918,7 +923,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    */
   @VisibleForTesting
   protected List<NotificationEvent> getNextMetastoreEvents()
-      throws MetastoreNotificationFetchException, CatalogException {
+      throws MetastoreNotificationFetchException {
     return getNextMetastoreEvents(getCurrentEventId());
   }
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index bba5f717a..bbdc22700 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -110,6 +110,7 @@ import org.apache.impala.hive.executor.TestHiveJavaFunctionFactory;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.IncompetentMetastoreClientPool;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
@@ -242,7 +243,7 @@ public class MetastoreEventsProcessorTest {
    * @throws TException
    */
   @Before
-  public void beforeTest() throws TException, CatalogException {
+  public void beforeTest() throws Exception {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().dropDatabase(TEST_DB_NAME, true, true, true);
     }
@@ -1432,10 +1433,10 @@ public class MetastoreEventsProcessorTest {
 
     @Override
     public List<NotificationEvent> getNextMetastoreEvents()
-        throws MetastoreNotificationFetchException, CatalogException {
+        throws MetastoreNotificationFetchException {
       // Throw exception roughly half of the time
       Random rand = new Random();
-      if (rand.nextInt(10) % 2 == 0){
+      if (rand.nextInt(10) % 2 == 0) {
         throw new MetastoreNotificationFetchException("Fetch Exception");
       }
       return super.getNextMetastoreEvents();
@@ -3366,6 +3367,72 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  /**
+   * Test getCurrentEventId() throws MetastoreNotificationFetchException when there are
+   * connection issues with HMS.
+   */
+  @Test(expected = MetastoreNotificationFetchException.class)
+  public void testHMSClientFailureInGettingCurrentEventId() throws Exception {
+    MetaStoreClientPool origPool = catalog_.getMetaStoreClientPool();
+    try {
+      MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0);
+      catalog_.setMetaStoreClientPool(badPool);
+      eventsProcessor_.getCurrentEventId();
+    } finally {
+      catalog_.setMetaStoreClientPool(origPool);
+    }
+  }
+
+  /**
+   * Test getNextMetastoreEvents() throws MetastoreNotificationFetchException when there
+   * are connection issues with HMS.
+   */
+  @Test(expected = MetastoreNotificationFetchException.class)
+  public void testHMSClientFailureInFetchingEvents() throws Exception {
+    MetaStoreClientPool origPool = catalog_.getMetaStoreClientPool();
+    try {
+      MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0);
+      catalog_.setMetaStoreClientPool(badPool);
+      // Use a fake currentEventId that is larger than lastSyncedEventId
+      // so getNextMetastoreEvents() will fetch new events.
+      long currentEventId = eventsProcessor_.getLastSyncedEventId() + 1;
+      eventsProcessor_.getNextMetastoreEvents(currentEventId);
+    } finally {
+      catalog_.setMetaStoreClientPool(origPool);
+    }
+  }
+
+  /**
+   * Test getNextMetastoreEventsInBatches() throws MetastoreNotificationFetchException
+   * when there are connection issues with HMS.
+   */
+  @Test(expected = MetastoreNotificationFetchException.class)
+  public void testHMSClientFailureInFetchingEventsInBatches() throws Exception {
+    MetaStoreClientPool origPool = catalog_.getMetaStoreClientPool();
+    try {
+      MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0);
+      catalog_.setMetaStoreClientPool(badPool);
+      MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 0, null);
+    } finally {
+      catalog_.setMetaStoreClientPool(origPool);
+    }
+  }
+
+  /**
+   * Test getEventTimeFromHMS() returns 0 when there are connection issues with HMS.
+   */
+  @Test
+  public void testHMSClientFailureInGettingEventTime() {
+    MetaStoreClientPool origPool = catalog_.getMetaStoreClientPool();
+    try {
+      MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0);
+      catalog_.setMetaStoreClientPool(badPool);
+      assertEquals(0, eventsProcessor_.getEventTimeFromHMS(0));
+    } finally {
+      catalog_.setMetaStoreClientPool(origPool);
+    }
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
diff --git a/fe/src/test/java/org/apache/impala/testutil/IncompetentMetastoreClientPool.java b/fe/src/test/java/org/apache/impala/testutil/IncompetentMetastoreClientPool.java
new file mode 100644
index 000000000..ab36ab6a5
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/testutil/IncompetentMetastoreClientPool.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.testutil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.impala.catalog.MetaStoreClientPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of MetastoreClientPool that creates HiveMetastoreClient objects
+ * using wrong HMS address to mimic HMS connection failures. Its getClient() will always
+ * fail and throws MetastoreClientInstantiationException.
+ */
+public class IncompetentMetastoreClientPool extends MetaStoreClientPool {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IncompetentMetastoreClientPool.class);
+
+  public IncompetentMetastoreClientPool(int initialSize, int initialCnxnTimeoutSec) {
+    super(initialSize, initialCnxnTimeoutSec, generateHMSConfWithWrongAddr());
+  }
+
+  private static HiveConf generateHMSConfWithWrongAddr() {
+    LOG.info("Creating IncompetentMetastoreClientPool using a wrong HMS port");
+    HiveConf conf = new HiveConf(IncompetentMetastoreClientPool.class);
+    MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:123");
+    return conf;
+  }
+}
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index fa57972ea..245d4ef09 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -30,6 +30,7 @@ METASTORE_TRANSPORT="buffered"
 START_METASTORE=1
 START_HIVESERVER=1
 ENABLE_RANGER_AUTH=0
+RESTART_SERVICE=1
 
 CLUSTER_BIN=${IMPALA_HOME}/testdata/bin
 
@@ -57,11 +58,15 @@ do
     -only_hiveserver)
       START_METASTORE=0
       ;;
+    -if_not_running)
+      RESTART_SERVICE=0
+      ;;
     -help|-h|*)
       echo "run-hive-server.sh : Starts the hive server and the metastore."
       echo "[-only_metastore] : Only starts the hive metastore."
       echo "[-only_hiveserver] : Only starts the hive server."
       echo "[-with_ranger] : Starts with Ranger authorization (only for Hive 3)."
+      echo "[-if_not_running] : Only starts services when they are not running."
       exit 1;
       ;;
     esac
@@ -73,13 +78,37 @@ if [[ $START_METASTORE -eq 0 && $START_HIVESERVER -eq 0 ]]; then
   exit 1;
 fi
 
+NEEDS_START=0
+HMS_PID=
+HS2_PID=
+if [[ $START_METASTORE -eq 1 && $RESTART_SERVICE -eq 0 ]]; then
+  HMS_PID=$(jps -m | (grep HiveMetaStore || true) | awk '{print $1}')
+  if [[ -n $HMS_PID ]]; then
+    echo "Found HiveMetaStore running. PID=$HMS_PID"
+  else
+    NEEDS_START=1
+  fi
+fi
+if [[ $START_HIVESERVER -eq 1 && $RESTART_SERVICE -eq 0 ]]; then
+  HS2_PID=$(jps -m | (grep HiveServer || true) | awk '{print $1}')
+  if [[ -n $HS2_PID ]]; then
+    echo "Found HiveServer running. PID=$HS2_PID"
+  else
+    NEEDS_START=1
+  fi
+fi
+if [[ $NEEDS_START -eq 0 && $RESTART_SERVICE -eq 0 ]]; then
+  echo "Required services are all running."
+  exit 0
+fi
+
 # TODO: We should have a retry loop for every service we start.
 # Kill for a clean start.
-if [[ $START_HIVESERVER -eq 1 ]]; then
+if [[ $START_HIVESERVER -eq 1 && $RESTART_SERVICE -eq 1 ]]; then
   ${CLUSTER_BIN}/kill-hive-server.sh -only_hiveserver &> /dev/null
 fi
 
-if [[ $START_METASTORE -eq 1 ]]; then
+if [[ $START_METASTORE -eq 1 && $RESTART_SERVICE -eq 1 ]]; then
   ${CLUSTER_BIN}/kill-hive-server.sh -only_metastore &> /dev/null
 fi
 
@@ -129,7 +158,7 @@ export KUDU_SKIP_HMS_PLUGIN_VALIDATION=${KUDU_SKIP_HMS_PLUGIN_VALIDATION:-1}
 # Starts a Hive Metastore Server on the specified port.
 # To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
 #   -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
-if [ ${START_METASTORE} -eq 1 ]; then
+if [[ ${START_METASTORE} -eq 1 && -z $HMS_PID ]]; then
   HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
       --service metastore -p $HIVE_METASTORE_PORT >> ${LOGDIR}/hive-metastore.out 2>&1 &
 
@@ -147,7 +176,7 @@ export LD_LIBRARY_PATH="${LD_LIBRARY_PATH-}:${GCC_HOME}/lib64"
 
 export HIVESERVER2_HADOOP_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,\
 suspend=n,address=30020"
-if [ ${START_HIVESERVER} -eq 1 ]; then
+if [[ ${START_HIVESERVER} -eq 1 && -z $HS2_PID ]]; then
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM
   # when loading ORC tables like widerow.
diff --git a/tests/custom_cluster/test_catalog_hms_failures.py b/tests/custom_cluster/test_catalog_hms_failures.py
index 9c1623bb8..35350da2c 100644
--- a/tests/custom_cluster/test_catalog_hms_failures.py
+++ b/tests/custom_cluster/test_catalog_hms_failures.py
@@ -26,6 +26,7 @@ from tests.common.custom_cluster_test_suite import (
     CustomClusterTestSuite,
     DEFAULT_CLUSTER_SIZE)
 from tests.common.skip import SkipIf
+from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import IS_ISILON, IS_LOCAL
 
 
@@ -43,23 +44,21 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
 
   @classmethod
   def setup_class(cls):
-    if cls.exploration_strategy() != 'exhaustive':
-      pytest.skip('These tests only run in exhaustive')
     super(TestHiveMetaStoreFailure, cls).setup_class()
 
   @classmethod
-  def run_hive_server(cls):
+  def run_hive_metastore(cls, if_not_running=False):
     script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
-    run_cmd = [script]
-    if IS_LOCAL or IS_ISILON:
-      run_cmd.append('-only_metastore')
+    run_cmd = [script, '-only_metastore']
+    if if_not_running:
+      run_cmd.append('-if_not_running')
     check_call(run_cmd, close_fds=True)
 
   @classmethod
   def teardown_class(cls):
     # Make sure the metastore is running even if the test aborts somewhere unexpected
     # before restarting the metastore itself.
-    cls.run_hive_server()
+    cls.run_hive_metastore(if_not_running=True)
     super(TestHiveMetaStoreFailure, cls).teardown_class()
 
   @pytest.mark.execute_serially
@@ -73,7 +72,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
     tbl_name = "functional.alltypes"
     self.client.execute("invalidate metadata %s" % tbl_name)
     kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
-    check_call([kill_cmd], close_fds=True)
+    check_call([kill_cmd, '-only_metastore'], close_fds=True)
 
     try:
       self.client.execute("describe %s" % tbl_name)
@@ -81,7 +80,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
       print(str(e))
       assert "Failed to load metadata for table: %s. Running 'invalidate metadata %s' "\
           "may resolve this problem." % (tbl_name, tbl_name) in str(e)
-    self.run_hive_server()
+    self.run_hive_metastore()
 
     self.client.execute("invalidate metadata %s" % tbl_name)
     self.client.execute("describe %s" % tbl_name)
@@ -97,7 +96,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
     tbl_name = "functional.alltypes"
     self.client.execute("invalidate metadata %s" % tbl_name)
     kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
-    check_call([kill_cmd], close_fds=True)
+    check_call([kill_cmd, '-only_metastore'], close_fds=True)
 
     # Run a query asynchronously.
     query = "select * from {0} limit 1".format(tbl_name)
@@ -107,13 +106,22 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
 
     # Wait 1 second for the catalogd to start contacting HMS, then start HMS.
     time.sleep(1)
-    self.run_hive_server()
+    self.run_hive_metastore()
 
     # Wait for the query to complete, assert that the HMS client retried the connection.
     thread.join()
     self.assert_catalogd_log_contains("INFO",
         "MetaStoreClient lost connection. Attempting to reconnect", expected_count=-1)
 
+  @CustomClusterTestSuite.with_args(
+    impalad_args='--use_local_catalog',
+    catalogd_args='--catalog_topic_mode=minimal')
+  def test_event_processor_tolerate_hms_restart(self):
+    """IMPALA-12561: Test that event-processor won't go into ERROR state when there are
+    connection issues with HMS (mocked by a restart on HMS)"""
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    self.run_hive_metastore()
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
 @SkipIf.is_test_jdk
 class TestCatalogHMSFailures(CustomClusterTestSuite):
@@ -126,11 +134,11 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     super(TestCatalogHMSFailures, cls).setup_class()
 
   @classmethod
-  def run_hive_server(cls):
+  def run_hive_metastore(cls, if_not_running=False):
     script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
-    run_cmd = [script]
-    if IS_LOCAL or IS_ISILON:
-      run_cmd.append('-only_metastore')
+    run_cmd = [script, '-only_metastore']
+    if if_not_running:
+      run_cmd.append('-if_not_running')
     check_call(run_cmd, close_fds=True)
 
   @classmethod
@@ -148,7 +156,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
   def teardown_class(cls):
     # Make sure the metastore is running even if the test aborts somewhere unexpected
     # before restarting the metastore itself.
-    cls.run_hive_server()
+    cls.run_hive_metastore(if_not_running=True)
     super(TestCatalogHMSFailures, cls).teardown_class()
 
   @classmethod
@@ -169,9 +177,9 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     client = impalad.service.create_beeswax_client()
     self.reload_metadata(client)
 
-    # Kill Hive
+    # Kill HMS
     kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
-    check_call([kill_cmd], close_fds=True)
+    check_call([kill_cmd, '-only_metastore'], close_fds=True)
 
     # Metadata load should fail quickly
     start = time.time()
@@ -184,8 +192,8 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     end = time.time()
     assert end - start < 30, "Metadata load hasn't failed quickly enough"
 
-    # Start Hive
-    self.run_hive_server()
+    # Start HMS
+    self.run_hive_metastore()
 
     # Metadata load should work now
     self.reload_metadata(client)
@@ -203,9 +211,9 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     client = impalad.service.create_beeswax_client()
     self.reload_metadata(client)
 
-    # Kill Hive
+    # Kill HMS
     kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
-    check_call([kill_cmd], close_fds=True)
+    check_call([kill_cmd, '-only_metastore'], close_fds=True)
 
     # Kill the catalogd.
     catalogd = self.cluster.catalogd
@@ -222,8 +230,8 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
       # startup.
       time.sleep(10)
 
-      # Start Hive and wait for catalogd to come up
-      self.run_hive_server()
+      # Start HMS and wait for catalogd to come up
+      self.run_hive_metastore()
       statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
       impalad.service.wait_for_metric_value('catalog.ready', True, timeout=60)
 
@@ -245,9 +253,9 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     client = impalad.service.create_beeswax_client()
     self.reload_metadata(client)
 
-    # Kill Hive
+    # Kill HMS
     kill_cmd = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/kill-hive-server.sh')
-    check_call([kill_cmd], close_fds=True)
+    check_call([kill_cmd, '-only_metastore'], close_fds=True)
 
     # Kill the catalogd.
     catalogd = self.cluster.catalogd
@@ -264,8 +272,8 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
       # than initial_hms_cnxn_timeout_s.
       time.sleep(40)
 
-      # Start Hive
-      self.run_hive_server()
+      # Start HMS
+      self.run_hive_metastore()
 
       # catalogd has terminated by now
       assert not catalogd.get_pid(), "catalogd should have terminated"