You are viewing a plain text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/06/10 17:05:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new af84c57  [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
af84c57 is described below

commit af84c5703ee5e0528fd3aa9d5519375bb99ec8ae
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon Jun 10 10:05:37 2019 -0700

    [GOBBLIN-798] Clean up workflows from Helix when the Gobblin applicat…
    
    Closes #2665 from htran1/app_master_job_cleanup
---
 .../gobblin/cluster/GobblinClusterManager.java     |  6 +++
 .../gobblin/cluster/GobblinHelixMultiManager.java  | 20 ++++++--
 .../gobblin/yarn/GobblinApplicationMaster.java     |  2 +-
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   | 55 ++++++++++++++++++++++
 4 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index e43b241..bef556c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -264,6 +264,12 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     this.eventBus.register(this);
     this.multiManager.connect();
 
+    // Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone
+    // mode, such as YARN mode
+    if (!this.isStandaloneMode) {
+      this.multiManager.cleanUpJobs();
+    }
+
     configureHelixQuotaBasedTaskScheduling();
 
     if (this.isStandaloneMode) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index 2ef3b2e..57c672b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -339,11 +339,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       if (!isLeader) {
         log.info("New Helix Controller leader {}", this.managerClusterHelixManager.getInstanceName());
 
-        cleanUpJobs(this.jobClusterHelixManager);
-
-        if (this.taskDriverHelixManager.isPresent()) {
-          cleanUpJobs(this.taskDriverHelixManager.get());
-        }
+        cleanUpJobs();
 
         for (LeadershipChangeAwareComponent c: this.leadershipChangeAwareComponents) {
           c.becomeActive();
@@ -363,12 +359,26 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
     }
   }
 
+  /**
+   * Delete jobs from the helix cluster
+   */
+  @VisibleForTesting
+  public void cleanUpJobs() {
+    cleanUpJobs(this.jobClusterHelixManager);
+
+    if (this.taskDriverHelixManager.isPresent()) {
+      cleanUpJobs(this.taskDriverHelixManager.get());
+    }
+  }
+
   private void cleanUpJobs(HelixManager helixManager) {
     // Clean up existing jobs
     TaskDriver taskDriver = new TaskDriver(helixManager);
 
     Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows();
 
+    log.debug("cleanUpJobs workflow count {} workflows {}", workflows.size(), workflows.keySet());
+
     boolean cleanupDistJobs = ConfigUtils.getBoolean(this.config,
         GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
         GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index f0a7b48..ccb5419 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -110,7 +110,7 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   /**
    * Build the {@link YarnService} for the Application Master.
    */
-  private YarnService buildYarnService(Config config, String applicationName, String applicationId,
+  protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
       YarnConfiguration yarnConfiguration, FileSystem fs)
       throws Exception {
     return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index c9ec435..2c64f64 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.eventbus.EventBus;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
@@ -43,6 +44,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinHelixConstants;
+import org.apache.gobblin.cluster.GobblinHelixMultiManager;
 import org.apache.gobblin.cluster.HelixMessageTestBase;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.TestHelper;
@@ -52,6 +54,7 @@ import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -63,6 +66,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.model.Message;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -70,6 +74,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.times;
+
 
 /**
  * Unit tests for {@link GobblinYarnAppLauncher}.
@@ -341,6 +347,25 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
   }
 
   /**
+   * Test that the job cleanup call is called
+   */
+  @Test
+  public void testJobCleanup() throws Exception {
+    ContainerId containerId = ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), 0);
+    TestApplicationMaster
+        appMaster = Mockito.spy(new TestApplicationMaster("testApp", containerId, config,
+        new YarnConfiguration()));
+
+    GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class);
+
+    appMaster.setMultiManager(mockMultiManager);
+    appMaster.start();
+
+    Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();
+  }
+
+  /**
    * An application master for accessing protected fields in {@link GobblinApplicationMaster}
    * for testing.
    */
@@ -351,6 +376,16 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
       super(applicationName, containerId, config, yarnConfiguration);
     }
 
+    @Override
+    protected YarnService buildYarnService(Config config, String applicationName, String applicationId,
+        YarnConfiguration yarnConfiguration, FileSystem fs) throws Exception {
+
+      YarnService testYarnService = new TestYarnService(config, applicationName, applicationId, yarnConfiguration, fs,
+          new EventBus("GobblinYarnAppLauncherTest"));
+
+      return testYarnService;
+    }
+
     public Config getConfig() {
       return this.config;
     }
@@ -358,6 +393,10 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
     public ServiceBasedAppLauncher getAppLauncher() {
       return this.applicationLauncher;
     }
+
+    public void setMultiManager(GobblinHelixMultiManager multiManager) {
+      this.multiManager = multiManager;
+    }
   }
 
   /**
@@ -373,4 +412,20 @@ public class GobblinYarnAppLauncherTest implements HelixMessageTestBase {
       return ConfigFactory.parseMap(ImmutableMap.of("dynamicKey1", "dynamicValue1"));
     }
   }
+
+  /**
+   * Test class for mocking out YarnService. Need to use this instead of Mockito because of final methods.
+   */
+  private static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);
+    }
+
+    @Override
+    protected void startUp()
+        throws Exception {
+      // do nothing
+    }
+  }
 }