You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/06/10 17:05:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new af84c57 [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
af84c57 is described below
commit af84c5703ee5e0528fd3aa9d5519375bb99ec8ae
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon Jun 10 10:05:37 2019 -0700
[GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
Closes #2665 from htran1/app_master_job_cleanup
---
.../gobblin/cluster/GobblinClusterManager.java | 6 +++
.../gobblin/cluster/GobblinHelixMultiManager.java | 20 ++++++--
.../gobblin/yarn/GobblinApplicationMaster.java | 2 +-
.../gobblin/yarn/GobblinYarnAppLauncherTest.java | 55 ++++++++++++++++++++++
4 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index e43b241..bef556c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -264,6 +264,12 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
this.eventBus.register(this);
this.multiManager.connect();
+ // Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
+ // mode, such as YARN mode
+ if (!this.isStandaloneMode) {
+ this.multiManager.cleanUpJobs();
+ }
+
configureHelixQuotaBasedTaskScheduling();
if (this.isStandaloneMode) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index 2ef3b2e..57c672b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -339,11 +339,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
if (!isLeader) {
log.info("New Helix Controller leader {}", this.managerClusterHelixManager.getInstanceName());
- cleanUpJobs(this.jobClusterHelixManager);
-
- if (this.taskDriverHelixManager.isPresent()) {
- cleanUpJobs(this.taskDriverHelixManager.get());
- }
+ cleanUpJobs();
for (LeadershipChangeAwareComponent c: this.leadershipChangeAwareComponents) {
c.becomeActive();
@@ -363,12 +359,26 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
}
}
+ /**
+ * Delete jobs from the helix cluster
+ */
+ @VisibleForTesting
+ public void cleanUpJobs() {
+ cleanUpJobs(this.jobClusterHelixManager);
+
+ if (this.taskDriverHelixManager.isPresent()) {
+ cleanUpJobs(this.taskDriverHelixManager.get());
+ }
+ }
+
private void cleanUpJobs(HelixManager helixManager) {
// Clean up existing jobs
TaskDriver taskDriver = new TaskDriver(helixManager);
Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
+ log.debug("cleanUpJobs workflow count {} workflows {}", workflows.size(), workflows.keySet());
+
boolean cleanupDistJobs = ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index f0a7b48..ccb5419 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -110,7 +110,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
/**
* Build the {@link YarnService} for the Application Master.
*/
- private YarnService buildYarnService(Config config, String applicationName, String applicationId,
+ protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index c9ec435..2c64f64 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
+import com.google.common.eventbus.EventBus;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
@@ -43,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.HelixMessageTestBase;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.TestHelper;
@@ -52,6 +54,7 @@ import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,6 +66,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -70,6 +74,8 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.times;
+
/**
* Unit tests for {@link GobblinYarnAppLauncher}.
@@ -341,6 +347,25 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
}
/**
+ * Test that the job cleanup call is called
+ */
+ @Test
+ public void testJobCleanup() throws Exception {
+ ContainerId containerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), 0);
+ TestApplicationMaster
+ appMaster = Mockito.spy(new TestApplicationMaster("testApp", containerId, config,
+ new YarnConfiguration()));
+
+ GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);
+
+ appMaster.setMultiManager(mockMultiManager);
+ appMaster.start();
+
+ Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();
+ }
+
+ /**
* An application master for accessing protected fields in {@link GobblinApplicationMaster}
* for testing.
*/
@@ -351,6 +376,16 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
super(applicationName, containerId, config, yarnConfiguration);
}
+ @Override
+ protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs) throws Exception {
+
+ YarnService testYarnService = new TestYarnService(config, applicationName, applicationId, yarnConfiguration, fs,
+ new EventBus("GobblinYarnAppLauncherTest"));
+
+ return testYarnService;
+ }
+
public Config getConfig() {
return this.config;
}
@@ -358,6 +393,10 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
public ServiceBasedAppLauncher getAppLauncher() {
return this.applicationLauncher;
}
+
+ public void setMultiManager(GobblinHelixMultiManager multiManager) {
+ this.multiManager = multiManager;
+ }
}
/**
@@ -373,4 +412,20 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
return ConfigFactory.parseMap(ImmutableMap.of("dynamicKey1", "dynamicValue1"));
}
}
+
+ /**
+ * Test class for mocking out YarnService. Need to use this instead of Mockito because of final methods.
+ */
+ private static class TestYarnService extends YarnService {
+ public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+ FileSystem fs, EventBus eventBus) throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+ }
+
+ @Override
+ protected void startUp()
+ throws Exception {
+ // do nothing
+ }
+ }
}