You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/14 23:15:15 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ff0b417 [GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …
ff0b417 is described below
commit ff0b417082d9e92634bb4c2719c8f3decb17491e
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jan 14 15:15:09 2020 -0800
[GOBBLIN-1016] Allow Gobblin Application Master to join Helix cluster …
Closes #2863 from sv2000/sendShutdown
---
.../cluster/GobblinHelixMessagingService.java | 8 +-
.../gobblin/cluster/GobblinHelixMultiManager.java | 2 +-
.../yarn/AbstractYarnAppSecurityManager.java | 27 ++++--
.../gobblin/yarn/GobblinYarnAppLauncher.java | 29 ++++--
.../yarn/YarnAppSecurityManagerWithKeytabs.java | 32 ++++---
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 103 ++++++++++++++++-----
.../gobblin/yarn/YarnSecurityManagerTest.java | 40 +++++++-
7 files changed, 184 insertions(+), 57 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index 4101f17..2132335 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -173,13 +173,17 @@ public class GobblinHelixMessagingService extends DefaultMessagingService {
* @param row row of currently persisted data
* @return true if it matches, false otherwise
*/
+
private boolean rowMatches(Criteria criteria, ZNRecordRow row) {
String instanceName = normalizePattern(criteria.getInstanceName());
String resourceName = normalizePattern(criteria.getResource());
String partitionName = normalizePattern(criteria.getPartition());
String partitionState = normalizePattern(criteria.getPartitionState());
- return stringMatches(instanceName, row.getMapSubKey()) && stringMatches(resourceName, row.getRecordId())
- && stringMatches(partitionName, row.getMapKey()) && stringMatches(partitionState, row.getMapValue());
+ return (stringMatches(instanceName, Strings.nullToEmpty(row.getMapSubKey())) ||
+ stringMatches(instanceName, Strings.nullToEmpty(row.getRecordId())))
+ && stringMatches(resourceName, Strings.nullToEmpty(row.getRecordId()))
+ && stringMatches(partitionName, Strings.nullToEmpty(row.getMapKey()))
+ && stringMatches(partitionState, Strings.nullToEmpty(row.getMapValue()));
}
/**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index cdc720a..7b25134 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -232,7 +232,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
this.managerClusterHelixManager = buildHelixManager(this.config,
zkConnectionString,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
- isHelixClusterManaged ? InstanceType.ADMINISTRATOR : InstanceType.CONTROLLER);
+ isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
this.jobClusterHelixManager = this.managerClusterHelixManager;
}
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index 0b069cd..af31fa2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -17,20 +17,13 @@
package org.apache.gobblin.yarn;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.cluster.GobblinHelixMessagingService;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -44,6 +37,17 @@ import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
/**
* <p>
* The super class for key management
@@ -167,8 +171,13 @@ public abstract class AbstractYarnAppSecurityManager extends AbstractIdleService
*/
@VisibleForTesting
protected void sendTokenFileUpdatedMessage(InstanceType instanceType) {
+ sendTokenFileUpdatedMessage(instanceType, "");
+ }
+
+ @VisibleForTesting
+ protected void sendTokenFileUpdatedMessage(InstanceType instanceType, String instanceName) {
Criteria criteria = new Criteria();
- criteria.setInstanceName("%");
+ criteria.setInstanceName(Strings.isNullOrEmpty(instanceName) ? "%" : instanceName);
criteria.setResource("%");
criteria.setPartition("%");
criteria.setPartitionState("%");
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 6aee52b..e0a66e8 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -92,8 +92,10 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMessagingService;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.rest.JobExecutionInfoServer;
@@ -188,6 +190,8 @@ public class GobblinYarnAppLauncher {
private final Path sinkLogRootDir;
private final Closer closer = Closer.create();
+ private final String helixInstanceName;
+ private final GobblinHelixMessagingService messagingService;
// Yarn application ID
private volatile Optional<ApplicationId> applicationId = Optional.absent();
@@ -207,6 +211,7 @@ public class GobblinYarnAppLauncher {
private volatile boolean stopped = false;
private final boolean emailNotificationOnShutdown;
+ private final boolean isHelixClusterManaged;
private final int appMasterMemoryMbs;
private final int jvmMemoryOverheadMbs;
@@ -275,6 +280,14 @@ public class GobblinYarnAppLauncher {
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
+
+ this.isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+ GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+ this.helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+ GobblinClusterManager.class.getSimpleName());
+
+ this.messagingService = new GobblinHelixMessagingService(this.helixManager);
+
}
/**
@@ -286,9 +299,7 @@ public class GobblinYarnAppLauncher {
public void launch() throws IOException, YarnException {
this.eventBus.register(this);
- boolean isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
- GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
- if (isHelixClusterManaged) {
+ if (this.isHelixClusterManaged) {
LOGGER.info("Helix cluster is managed; skipping creation of Helix cluster");
} else {
String clusterName = this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
@@ -797,10 +808,16 @@ public class GobblinYarnAppLauncher {
void sendShutdownRequest() {
Criteria criteria = new Criteria();
criteria.setInstanceName("%");
- criteria.setResource("%");
criteria.setPartition("%");
criteria.setPartitionState("%");
- criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+ criteria.setResource("%");
+ if (this.isHelixClusterManaged) {
+ //In the managed mode, the Gobblin Yarn Application Master connects to the Helix cluster in the Participant role.
+ criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ criteria.setInstanceName(this.helixInstanceName);
+ } else {
+ criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+ }
criteria.setSessionSpecific(true);
Message shutdownRequest = new Message(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
@@ -809,7 +826,7 @@ public class GobblinYarnAppLauncher {
shutdownRequest.setMsgState(Message.MessageState.NEW);
shutdownRequest.setTgtSessionId("*");
- int messagesSent = this.helixManager.getMessagingService().send(criteria, shutdownRequest);
+ int messagesSent = this.messagingService.send(criteria, shutdownRequest);
if (messagesSent == 0) {
LOGGER.error(String.format("Failed to send the %s message to the controller", shutdownRequest.getMsgSubType()));
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
index 3ee6107..5f234d2 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAppSecurityManagerWithKeytabs.java
@@ -25,18 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
-
import com.typesafe.config.Config;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.util.ConfigUtils;
+
/**
* A class for managing Kerberos login and token renewing on the client side that has access to
@@ -62,6 +62,7 @@ import com.typesafe.config.Config;
*/
public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityManager {
+ private final String helixInstanceName;
private UserGroupInformation loginUser;
private Optional<ScheduledFuture<?>> scheduledTokenRenewTask = Optional.absent();
@@ -69,11 +70,16 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
// sent to the controller and the participants as they may not be up running yet. The first login
// happens after this class starts up so the token gets regularly refreshed before the next login.
private volatile boolean firstLogin = true;
+ private final boolean isHelixClusterManaged;
public YarnAppSecurityManagerWithKeytabs(Config config, HelixManager helixManager, FileSystem fs, Path tokenFilePath)
throws IOException {
super(config, helixManager, fs, tokenFilePath);
this.loginUser = UserGroupInformation.getLoginUser();
+ this.isHelixClusterManaged = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
+ GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
+ this.helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+ GobblinClusterManager.class.getSimpleName());
}
/**
@@ -84,9 +90,12 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
writeDelegationTokenToFile();
if (!this.firstLogin) {
- // Send a message to the controller and all the participants if this is not the first login
- sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
- sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+ // Send a message to the controller (when the cluster is not managed)
+ // and all the participants if this is not the first login
+ if (!this.isHelixClusterManaged) {
+ sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+ }
+ sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, this.helixInstanceName);
}
}
@@ -129,10 +138,11 @@ public class YarnAppSecurityManagerWithKeytabs extends AbstractYarnAppSecurityMa
writeDelegationTokenToFile();
if (!this.firstLogin) {
- // Send a message to the controller and all the participants
- sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
- sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT);
+ // Send a message to the controller (when the cluster is not managed) and all the participants
+ if (!this.isHelixClusterManaged) {
+ sendTokenFileUpdatedMessage(InstanceType.CONTROLLER);
+ }
+ sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, this.helixInstanceName);
}
}
-
}
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 2c64f64..9c37748 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -17,15 +17,6 @@
package org.apache.gobblin.yarn;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.eventbus.EventBus;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.Service;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -42,18 +33,6 @@ import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.GobblinHelixConstants;
-import org.apache.gobblin.cluster.GobblinHelixMultiManager;
-import org.apache.gobblin.cluster.HelixMessageTestBase;
-import org.apache.gobblin.cluster.HelixUtils;
-import org.apache.gobblin.cluster.TestHelper;
-import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.DynamicConfigGenerator;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.testing.AssertWithBackoff;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -74,6 +53,28 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
+import org.apache.gobblin.cluster.HelixMessageTestBase;
+import org.apache.gobblin.cluster.HelixUtils;
+import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.cluster.TestShutdownMessageHandlerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
import static org.mockito.Mockito.times;
@@ -92,6 +93,8 @@ import static org.mockito.Mockito.times;
*/
@Test(groups = { "gobblin.yarn" }, singleThreaded=true)
public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
+ private static final Object MANAGED_HELIX_CLUSTER_NAME = "GobblinYarnAppLauncherTestManagedHelix";
+ public static final String TEST_HELIX_INSTANCE_NAME_MANAGED = HelixUtils.getHelixInstanceName("TestInstance", 1);
public static final String DYNAMIC_CONF_PATH = "dynamic.conf";
public static final String YARN_SITE_XML_PATH = "yarn-site.xml";
@@ -102,10 +105,13 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
private CuratorFramework curatorFramework;
private Config config;
+ private Config configManagedHelix;
private HelixManager helixManager;
+ private HelixManager helixManagerManagedHelix;
private GobblinYarnAppLauncher gobblinYarnAppLauncher;
+ private GobblinYarnAppLauncher gobblinYarnAppLauncherManagedHelix;
private ApplicationId applicationId;
private final Closer closer = Closer.create();
@@ -185,6 +191,20 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
InstanceType.CONTROLLER, zkConnectionString);
this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(this.config, clusterConf);
+
+ this.configManagedHelix = ConfigFactory.parseURL(url)
+ .withValue("gobblin.cluster.zk.connection.string",
+ ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, ConfigValueFactory.fromAnyRef(MANAGED_HELIX_CLUSTER_NAME))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(TEST_HELIX_INSTANCE_NAME_MANAGED))
+ .withValue(GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED, ConfigValueFactory.fromAnyRef("true"))
+ .resolve();
+
+ this.helixManagerManagedHelix = HelixManagerFactory.getZKHelixManager(
+ this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), TEST_HELIX_INSTANCE_NAME_MANAGED,
+ InstanceType.PARTICIPANT, zkConnectionString);
+
+ this.gobblinYarnAppLauncherManagedHelix = new GobblinYarnAppLauncher(this.configManagedHelix, clusterConf);
}
@Test
@@ -208,6 +228,18 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
this.curatorFramework.checkExists()
.forPath(String.format("/%s/CONTROLLER", GobblinYarnAppLauncherTest.class.getSimpleName())).getVersion(),
0);
+
+ //Create managed Helix cluster and test it is created successfully
+ HelixUtils.createGobblinHelixCluster(
+ this.configManagedHelix.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY),
+ this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
+ Assert.assertEquals(this.curatorFramework.checkExists()
+ .forPath(String.format("/%s/CONTROLLER", MANAGED_HELIX_CLUSTER_NAME)).getVersion(), 0);
+ Assert.assertEquals(
+ this.curatorFramework.checkExists()
+ .forPath(String.format("/%s/CONTROLLER", MANAGED_HELIX_CLUSTER_NAME)).getVersion(),
+ 0);
}
/**
@@ -265,8 +297,8 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
Assert.assertEquals(this.curatorFramework.checkExists()
.forPath(String.format("/%s/CONTROLLER/MESSAGES", GobblinYarnAppLauncherTest.class.getSimpleName()))
.getVersion(), 0);
- YarnSecurityManagerTest.GetControllerMessageNumFunc getCtrlMessageNum =
- new YarnSecurityManagerTest.GetControllerMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(),
+ YarnSecurityManagerTest.GetHelixMessageNumFunc getCtrlMessageNum =
+ new YarnSecurityManagerTest.GetHelixMessageNumFunc(GobblinYarnAppLauncherTest.class.getSimpleName(), InstanceType.CONTROLLER, "",
this.curatorFramework);
AssertWithBackoff assertWithBackoff =
AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
@@ -274,6 +306,27 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
// Give Helix sometime to handle the message
assertWithBackoff.assertEquals(getCtrlMessageNum, 0, "all controller messages processed");
+
+ this.helixManagerManagedHelix.connect();
+ this.helixManagerManagedHelix.getMessagingService().registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+ new TestShutdownMessageHandlerFactory(this));
+
+ this.gobblinYarnAppLauncherManagedHelix.connectHelixManager();
+ this.gobblinYarnAppLauncherManagedHelix.sendShutdownRequest();
+
+ Assert.assertEquals(this.curatorFramework.checkExists()
+ .forPath(String.format("/%s/INSTANCES/%s/MESSAGES", this.configManagedHelix.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), TEST_HELIX_INSTANCE_NAME_MANAGED))
+ .getVersion(), 0);
+ YarnSecurityManagerTest.GetHelixMessageNumFunc getInstanceMessageNum =
+ new YarnSecurityManagerTest.GetHelixMessageNumFunc(this.configManagedHelix.getString(
+ GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY),
+ InstanceType.PARTICIPANT, TEST_HELIX_INSTANCE_NAME_MANAGED, this.curatorFramework);
+ assertWithBackoff =
+ AssertWithBackoff.create().logger(LoggerFactory.getLogger("testSendShutdownRequest")).timeoutMs(20000);
+ assertWithBackoff.assertEquals(getInstanceMessageNum, 1, "1 controller message queued");
+
+ // Give Helix some time to handle the message
+ assertWithBackoff.assertEquals(getInstanceMessageNum, 0, "all controller messages processed");
}
@AfterClass
@@ -288,6 +341,10 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
this.helixManager.disconnect();
}
+ if (this.helixManagerManagedHelix.isConnected()) {
+ this.helixManagerManagedHelix.disconnect();
+ }
+
this.gobblinYarnAppLauncher.disconnectHelixManager();
if (applicationId != null) {
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
index 14a8d5b..489270a 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnSecurityManagerTest.java
@@ -77,10 +77,12 @@ import org.apache.gobblin.testing.AssertWithBackoff;
@Test(groups = { "gobblin.yarn" })
public class YarnSecurityManagerTest {
final Logger LOG = LoggerFactory.getLogger(YarnSecurityManagerTest.class);
+ private static final String HELIX_TEST_INSTANCE_PARTICIPANT = HelixUtils.getHelixInstanceName("TestInstance", 1);
private CuratorFramework curatorFramework;
private HelixManager helixManager;
+ private HelixManager helixManagerParticipant;
private Configuration configuration;
private FileSystem localFs;
@@ -121,6 +123,10 @@ public class YarnSecurityManagerTest {
helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.SPECTATOR, zkConnectingString);
this.helixManager.connect();
+ this.helixManagerParticipant = HelixManagerFactory.getZKHelixManager(
+ helixClusterName, HELIX_TEST_INSTANCE_PARTICIPANT, InstanceType.PARTICIPANT, zkConnectingString);
+ this.helixManagerParticipant.connect();
+
this.configuration = new Configuration();
this.localFs = Mockito.spy(FileSystem.getLocal(this.configuration));
@@ -150,20 +156,34 @@ public class YarnSecurityManagerTest {
}
- static class GetControllerMessageNumFunc implements Function<Void, Integer> {
+ static class GetHelixMessageNumFunc implements Function<Void, Integer> {
private final CuratorFramework curatorFramework;
private final String testName;
+ private final String instanceName;
+ private final InstanceType instanceType;
+ private final String path;
- public GetControllerMessageNumFunc(String testName, CuratorFramework curatorFramework) {
+ public GetHelixMessageNumFunc(String testName, InstanceType instanceType, String instanceName, CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework;
this.testName = testName;
+ this.instanceType = instanceType;
+ this.instanceName = instanceName;
+ switch (instanceType) {
+ case CONTROLLER:
+ this.path = String.format("/%s/CONTROLLER/MESSAGES", this.testName);
+ break;
+ case PARTICIPANT:
+ this.path = String.format("/%s/INSTANCES/%s/MESSAGES", this.testName, this.instanceName);
+ break;
+ default:
+ throw new RuntimeException("Invalid instance type " + instanceType.name());
+ }
}
@Override
public Integer apply(Void input) {
try {
- return this.curatorFramework.getChildren().forPath(String.format("/%s/CONTROLLER/MESSAGES",
- this.testName)).size();
+ return this.curatorFramework.getChildren().forPath(this.path).size();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -178,8 +198,15 @@ public class YarnSecurityManagerTest {
Assert.assertEquals(this.curatorFramework.checkExists().forPath(
String.format("/%s/CONTROLLER/MESSAGES", YarnSecurityManagerTest.class.getSimpleName())).getVersion(), 0);
AssertWithBackoff.create().logger(log).timeoutMs(20000)
- .assertEquals(new GetControllerMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(),
+ .assertEquals(new GetHelixMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(), InstanceType.CONTROLLER, "",
this.curatorFramework), 1, "1 controller message queued");
+
+ this._yarnAppYarnAppSecurityManagerWithKeytabs.sendTokenFileUpdatedMessage(InstanceType.PARTICIPANT, HELIX_TEST_INSTANCE_PARTICIPANT);
+ Assert.assertEquals(this.curatorFramework.checkExists().forPath(
+ String.format("/%s/INSTANCES/%s/MESSAGES", YarnSecurityManagerTest.class.getSimpleName(), HELIX_TEST_INSTANCE_PARTICIPANT)).getVersion(), 0);
+ AssertWithBackoff.create().logger(log).timeoutMs(20000)
+ .assertEquals(new GetHelixMessageNumFunc(YarnSecurityManagerTest.class.getSimpleName(), InstanceType.PARTICIPANT, HELIX_TEST_INSTANCE_PARTICIPANT,
+ this.curatorFramework), 1, "1 controller message queued");
}
@Test(dependsOnMethods = "testWriteDelegationTokenToFile")
@@ -196,6 +223,9 @@ public class YarnSecurityManagerTest {
if (this.helixManager.isConnected()) {
this.helixManager.disconnect();
}
+ if (this.helixManagerParticipant.isConnected()) {
+ this.helixManagerParticipant.disconnect();
+ }
this.localFs.delete(this.baseDir, true);
} catch (Throwable t) {
throw this.closer.rethrow(t);