You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/14 23:15:15 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0b417  [GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …
ff0b417 is described below

commit ff0b417082d9e92634bb4c2719c8f3decb17491e
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jan 14 15:15:09 2020 -0800

    [GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …
    
    Closes #2863 from sv2000/sendShutdown
---
 .../cluster/GobblinHelixMessagingService.java      |   8 +-
 .../gobblin/cluster/GobblinHelixMultiManager.java  |   2 +-
 .../yarn/AbstractYarnAppSecurityManager.java       |  27 ++++--
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  29 ++++--
 .../yarn/YarnAppSecurityManagerWithKeytabs.java    |  32 ++++---
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   | 103 ++++++++++++++++-----
 .../gobblin/yarn/YarnSecurityManagerTest.java      |  40 +++++++-
 7 files changed, 184 insertions(+), 57 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index 4101f17..2132335 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -173,13 +173,17 @@ public class GobblinHelixMessagingService extends DefaultMessagingService {
      * @param row row of currently persisted data
      * @return true if it matches, false otherwise
      */
+
     private boolean rowMatches(Criteria criteria, ZNRecordRow row) {
       String instanceName = normalizePattern(criteria.getInstanceName());
       String resourceName = normalizePattern(criteria.getResource());
       String partitionName = normalizePattern(criteria.getPartition());
       String partitionState = normalizePattern(criteria.getPartitionState());
-      return stringMatches(instanceName, row.getMapSubKey()) && stringMatches(resourceName, row.getRecordId())
-          && stringMatches(partitionName, row.getMapKey()) && stringMatches(partitionState, row.getMapValue());
+      return (stringMatches(instanceName, Strings.nullToEmpty(row.getMapSubKey())) ||
+          stringMatches(instanceName, Strings.nullToEmpty(row.getRecordId())))
+          && stringMatches(resourceName, Strings.nullToEmpty(row.getRecordId()))
+          && stringMatches(partitionName, Strings.nullToEmpty(row.getMapKey()))
+          && stringMatches(partitionState, Strings.nullToEmpty(row.getMapValue()));
     }
 
     /**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index cdc720a..7b25134 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -232,7 +232,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       this.managerClusterHelixManager = buildHelixManager(this.config,
           zkConnectionString,
           GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
-          isHelixClusterManaged ? InstanceType.ADMINISTRATOR : InstanceType.CONTROLLER);
+          isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
       this.jobClusterHelixManager = this.managerClusterHelixManager;
     }
   }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 0b069cd..af31fa2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -17,20 +17,13 @@
 
 package org.apache.gobblin.yarn;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.cluster.GobblinHelixMessagingService;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -44,6 +37,17 @@ import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
 /**
  * <p>
  *   The super class for key management
@@ -167,8 +171,13 @@ public abstract class AbstractYarnAppSecurityManager extends AbstractIdleService
    */
   @VisibleForTesting
   protected void sendTokenFileUpdatedMessage(InstanceType instanceType) {
+    sendTokenFileUpdatedMessage(instanceType, "");
+  }
+
+  @VisibleForTesting
+  protected void sendTokenFileUpdatedMessage(InstanceType instanceType, String instanceName) {
     Criteria criteria = new Criteria();
-    criteria.setInstanceName("%");
+    criteria.setInstanceName(Strings.isNullOrEmpty(instanceName) ? "%" : instanceName);
     criteria.setResource("%");
     criteria.setPartition("%");
     criteria.setPartitionState("%");
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 6aee52b..e0a66e8 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -92,8 +92,10 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.rest.JobExecutionInfoServer;
@@ -188,6 +190,8 @@ public class GobblinYarnAppLauncher {
   private final Path sinkLogRootDir;
 
   private final Closer closer = Closer.create();
+  private final String helixInstanceName;
+  private final GobblinHelixMessagingService messagingService;
 
   // Yarn application ID
   private volatile Optional<ApplicationId> applicationId = Optional.absent();
@@ -207,6 +211,7 @@ public class GobblinYarnAppLauncher {
   private volatile boolean stopped = false;
 
   private final boolean emailNotificationOnShutdown;
+  private final boolean isHelixClusterManaged;
 
   private final int appMasterMemoryMbs;
   private final int jvmMemoryOverheadMbs;
@@ -275,6 +280,14 @@ public class GobblinYarnAppLauncher {
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
+
+    this.isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+        GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+    this.helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        GobblinClusterManager.class.getSimpleName());
+
+    this.messagingService = new GobblinHelixMessagingService(this.helixManager);
+
   }
 
   /**
@@ -286,9 +299,7 @@ public class GobblinYarnAppLauncher {
   public void launch() throws IOException, YarnException {
     this.eventBus.register(this);
 
-    boolean isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
-        GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
-    if (isHelixClusterManaged) {
+    if (this.isHelixClusterManaged) {
       LOGGER.info("Helix cluster is managed; skipping creation of Helix cluster");
     } else {
       String clusterName = this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
@@ -797,10 +808,16 @@ public class GobblinYarnAppLauncher {
   void sendShutdownRequest() {
     Criteria criteria = new Criteria();
     criteria.setInstanceName("%");
-    criteria.setResource("%");
     criteria.setPartition("%");
     criteria.setPartitionState("%");
-    criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+    criteria.setResource("%");
+    if (this.isHelixClusterManaged) {
+      //In the managed mode, the Gobblin Yarn Application Master connects to the Helix cluster in the Participant role.
+      criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      criteria.setInstanceName(this.helixInstanceName);
+    } else {
+      criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+    }
     criteria.setSessionSpecific(true);
 
     Message shutdownRequest = new Message(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
@@ -809,7 +826,7 @@ public class GobblinYarnAppLauncher {
     shutdownRequest.setMsgState(Message.MessageState.NEW);
     shutdownRequest.setTgtSessionId("*");
 
-    int messagesSent = this.helixManager.getMessagingService().send(criteria, shutdownRequest);
+    int messagesSent = this.messagingService.send(criteria, shutdownRequest);
     if (messagesSent == 0) {
       LOGGER.error(String.format("Failed to send the %s message to the controller", shutdownRequest.getMsgSubType()));
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
index 3ee6107..5f234d2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -25,18 +25,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
-
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 /**
  * A class for managing Kerberos login and token renewing on the client side that has access to
@@ -62,6 +62,7 @@ import com.typesafe.config.Config;
  */
 public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityManager {
 
+  private final String helixInstanceName;
   private UserGroupInformation loginUser;
   private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = Optional.absent();
 
@@ -69,11 +70,16 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
   // sent to the controller and the participants as they may not be up running yet. The first login
   // happens after this class starts up so the token gets regularly refreshed before the next login.
   private volatile boolean firstLogin = true;
+  private final boolean isHelixClusterManaged;
 
   public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager helixManager, FileSystem fs, Path tokenFilePath)
       throws IOException {
     super(config, helixManager, fs, tokenFilePath);
     this.loginUser = UserGroupInformation.getLoginUser();
+    this.isHelixClusterManaged = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+        GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+    this.helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        GobblinClusterManager.class.getSimpleName());
   }
 
   /**
@@ -84,9 +90,12 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
     writeDelegationTokenToFile();
 
     if (!this.firstLogin) {
-      // Send a message to the controller and all the participants if this is not the first login
-      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
-      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+      // Send a message to the controller (when the cluster is not managed)
+      // and all the participants if this is not the first login
+      if (!this.isHelixClusterManaged) {
+        sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+      }
+      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, this.helixInstanceName);
     }
   }
 
@@ -129,10 +138,11 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
     writeDelegationTokenToFile();
 
     if (!this.firstLogin) {
-      // Send a message to the controller and all the participants
-      sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
-      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+      // Send a message to the controller (when the cluster is not managed) and all the participants
+      if (!this.isHelixClusterManaged) {
+        sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+      }
+      sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, this.helixInstanceName);
     }
   }
-
 }
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 2c64f64..9c37748 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -17,15 +17,6 @@
 
 package org.apache.gobblin.yarn;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.eventbus.EventBus;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.Service;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -42,18 +33,6 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinHelixConstants;
-import org.apache.gobblin.cluster.GobblinHelixMultiManager;
-import org.apache.gobblin.cluster.HelixMessageTestBase;
-import org.apache.gobblin.cluster.HelixUtils;
-import org.apache.gobblin.cluster.TestHelper;
-import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.DynamicConfigGenerator;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.testing.AssertWithBackoff;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +53,28 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+import org.apache.gobblin.cluster.HelixMessageTestBase;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
 import static org.mockito.Mockito.times;
 
 
@@ -92,6 +93,8 @@ import static org.mockito.Mockito.times;
  */
 @Test(groups = { "gobblin.yarn" }, singleThreaded=true)
 public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
+  private static final Object MANAGED_HELIX_CLUSTER_NAME = "GobblinYarnAppLauncherTestManagedHelix";
+  public static final String TEST_HELIX_INSTANCE_NAME_MANAGED = HelixUtils.getHelixInstanceName("TestInstance", 1);
 
   public static final String DYNAMIC_CONF_PATH = "dynamic.conf";
   public static final String YARN_SITE_XML_PATH = "yarn-site.xml";
@@ -102,10 +105,13 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
   private CuratorFramework curatorFramework;
 
   private Config config;
+  private Config configManagedHelix;
 
   private HelixManager helixManager;
+  private HelixManager helixManagerManagedHelix;
 
   private GobblinYarnAppLauncher gobblinYarnAppLauncher;
+  private GobblinYarnAppLauncher gobblinYarnAppLauncherManagedHelix;
   private ApplicationId applicationId;
 
   private final Closer closer = Closer.create();
@@ -185,6 +191,20 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
         InstanceType.CONTROLLER, zkConnectionString);
 
     this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf);
+
+    this.configManagedHelix = ConfigFactory.parseURL(url)
+        .withValue("gobblin.cluster.zk.connection.string",
+            ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, ConfigValueFactory.fromAnyRef(MANAGED_HELIX_CLUSTER_NAME))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(TEST_HELIX_INSTANCE_NAME_MANAGED))
+        .withValue(GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED, ConfigValueFactory.fromAnyRef("true"))
+        .resolve();
+
+    this.helixManagerManagedHelix = HelixManagerFactory.getZKHelixManager(
+        this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), TEST_HELIX_INSTANCE_NAME_MANAGED,
+        InstanceType.PARTICIPANT, zkConnectionString);
+
+    this.gobblinYarnAppLauncherManagedHelix = new GobblinYarnAppLauncher(this.configManagedHelix, clusterConf);
   }
 
   @Test
@@ -208,6 +228,18 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
         this.curatorFramework.checkExists()
             .forPath(String.format("/%s/CONTROLLER", GobblinYarnAppLauncherTest.class.getSimpleName())).getVersion(),
         0);
+
+    //Create managed Helix cluster and test it is created successfully
+    HelixUtils.createGobblinHelixCluster(
+        this.configManagedHelix.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY),
+        this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+    Assert.assertEquals(this.curatorFramework.checkExists()
+        .forPath(String.format("/%s/CONTROLLER", MANAGED_HELIX_CLUSTER_NAME)).getVersion(), 0);
+    Assert.assertEquals(
+        this.curatorFramework.checkExists()
+            .forPath(String.format("/%s/CONTROLLER", MANAGED_HELIX_CLUSTER_NAME)).getVersion(),
+        0);
   }
 
   /**
@@ -265,8 +297,8 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
     Assert.assertEquals(this.curatorFramework.checkExists()
         .forPath(String.format("/%s/CONTROLLER/MESSAGES", GobblinYarnAppLauncherTest.class.getSimpleName()))
         .getVersion(), 0);
-    YarnSecurityManagerTest.GetControllerMessageNumFunc getCtrlMessageNum =
-        new YarnSecurityManagerTest.GetControllerMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(),
+    YarnSecurityManagerTest.GetHelixMessageNumFunc getCtrlMessageNum =
+        new YarnSecurityManagerTest.GetHelixMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(), InstanceType.CONTROLLER, "",
             this.curatorFramework);
     AssertWithBackoff assertWithBackoff =
         AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
@@ -274,6 +306,27 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
 
     // Give Helix sometime to handle the message
     assertWithBackoff.assertEquals(getCtrlMessageNum, 0, "all controller messages processed");
+
+    this.helixManagerManagedHelix.connect();
+    this.helixManagerManagedHelix.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+        new TestShutdownMessageHandlerFactory(this));
+
+    this.gobblinYarnAppLauncherManagedHelix.connectHelixManager();
+    this.gobblinYarnAppLauncherManagedHelix.sendShutdownRequest();
+
+    Assert.assertEquals(this.curatorFramework.checkExists()
+        .forPath(String.format("/%s/INSTANCES/%s/MESSAGES", this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), TEST_HELIX_INSTANCE_NAME_MANAGED))
+        .getVersion(), 0);
+    YarnSecurityManagerTest.GetHelixMessageNumFunc getInstanceMessageNum =
+        new YarnSecurityManagerTest.GetHelixMessageNumFunc(this.configManagedHelix.getString(
+            GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
+            InstanceType.PARTICIPANT, TEST_HELIX_INSTANCE_NAME_MANAGED, this.curatorFramework);
+    assertWithBackoff =
+        AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
+    assertWithBackoff.assertEquals(getInstanceMessageNum, 1, "1 controller message queued");
+
+    // Give Helix sometime to handle the message
+    assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller messages processed");
   }
 
   @AfterClass
@@ -288,6 +341,10 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
         this.helixManager.disconnect();
       }
 
+      if (this.helixManagerManagedHelix.isConnected()) {
+        this.helixManagerManagedHelix.disconnect();
+      }
+
       this.gobblinYarnAppLauncher.disconnectHelixManager();
 
       if (applicationId != null) {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index 14a8d5b..489270a 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -77,10 +77,12 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 @Test(groups = { "gobblin.yarn" })
 public class YarnSecurityManagerTest {
   final Logger LOG = LoggerFactory.getLogger(YarnSecurityManagerTest.class);
+  private static final String HELIX_TEST_INSTANCE_PARTICIPANT = HelixUtils.getHelixInstanceName("TestInstance", 1);
 
   private CuratorFramework curatorFramework;
 
   private HelixManager helixManager;
+  private HelixManager helixManagerParticipant;
 
   private Configuration configuration;
   private FileSystem localFs;
@@ -121,6 +123,10 @@ public class YarnSecurityManagerTest {
         helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.SPECTATOR, zkConnectingString);
     this.helixManager.connect();
 
+    this.helixManagerParticipant = HelixManagerFactory.getZKHelixManager(
+        helixClusterName, HELIX_TEST_INSTANCE_PARTICIPANT, InstanceType.PARTICIPANT, zkConnectingString);
+    this.helixManagerParticipant.connect();
+
     this.configuration = new Configuration();
     this.localFs = Mockito.spy(FileSystem.getLocal(this.configuration));
 
@@ -150,20 +156,34 @@ public class YarnSecurityManagerTest {
   }
 
 
-  static class GetControllerMessageNumFunc implements Function<Void, Integer> {
+  static class GetHelixMessageNumFunc implements Function<Void, Integer> {
     private final CuratorFramework curatorFramework;
     private final String testName;
+    private final String instanceName;
+    private final InstanceType instanceType;
+    private final String path;
 
-    public GetControllerMessageNumFunc(String testName, CuratorFramework curatorFramework) {
+    public GetHelixMessageNumFunc(String testName, InstanceType instanceType, String instanceName, CuratorFramework curatorFramework) {
       this.curatorFramework = curatorFramework;
       this.testName = testName;
+      this.instanceType = instanceType;
+      this.instanceName = instanceName;
+      switch (instanceType) {
+        case CONTROLLER:
+          this.path = String.format("/%s/CONTROLLER/MESSAGES", this.testName);
+          break;
+        case PARTICIPANT:
+          this.path = String.format("/%s/INSTANCES/%s/MESSAGES", this.testName, this.instanceName);
+          break;
+        default:
+          throw new RuntimeException("Invalid instance type " + instanceType.name());
+      }
     }
 
     @Override
     public Integer apply(Void input) {
       try {
-        return this.curatorFramework.getChildren().forPath(String.format("/%s/CONTROLLER/MESSAGES",
-            this.testName)).size();
+        return this.curatorFramework.getChildren().forPath(this.path).size();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -178,8 +198,15 @@ public class YarnSecurityManagerTest {
     Assert.assertEquals(this.curatorFramework.checkExists().forPath(
         String.format("/%s/CONTROLLER/MESSAGES", YarnSecurityManagerTest.class.getSimpleName())).getVersion(), 0);
     AssertWithBackoff.create().logger(log).timeoutMs(20000)
-      .assertEquals(new GetControllerMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(),
+      .assertEquals(new GetHelixMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(), InstanceType.CONTROLLER, "",
           this.curatorFramework), 1, "1 controller message queued");
+
+    this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, HELIX_TEST_INSTANCE_PARTICIPANT);
+    Assert.assertEquals(this.curatorFramework.checkExists().forPath(
+        String.format("/%s/INSTANCES/%s/MESSAGES", YarnSecurityManagerTest.class.getSimpleName(), HELIX_TEST_INSTANCE_PARTICIPANT)).getVersion(), 0);
+    AssertWithBackoff.create().logger(log).timeoutMs(20000)
+        .assertEquals(new GetHelixMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(), InstanceType.PARTICIPANT, HELIX_TEST_INSTANCE_PARTICIPANT,
+            this.curatorFramework), 1, "1 controller message queued");
   }
 
   @Test(dependsOnMethods = "testWriteDelegationTokenToFile")
@@ -196,6 +223,9 @@ public class YarnSecurityManagerTest {
       if (this.helixManager.isConnected()) {
         this.helixManager.disconnect();
       }
+      if (this.helixManagerParticipant.isConnected()) {
+        this.helixManagerParticipant.disconnect();
+      }
       this.localFs.delete(this.baseDir, true);
     } catch (Throwable t) {
       throw this.closer.rethrow(t);