You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/10 01:00:29 UTC

[gobblin] branch master updated: [GOBBLIN-1507] Prevent orchestrator removals when unscheduling flows (#3353)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a4cda4  [GOBBLIN-1507] Prevent orchestrator removals when unscheduling flows (#3353)
8a4cda4 is described below

commit 8a4cda47aaf954041c82e43b6a10f389d1e4a4db
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Aug 9 18:00:21 2021 -0700

    [GOBBLIN-1507] Prevent orchestrator removals when unscheduling flows (#3353)
    
    When demoted from leader, GaaS removes all flows from the orchestrator. Failover events must not be disruptive or destructive to currently running flows; we only want the new leader node to take over scheduling responsibilities from that point forward.
---
 .../scheduler/GobblinServiceJobScheduler.java      | 72 ++++++++++++---------
 .../gobblin/service/GobblinServiceManagerTest.java |  2 +-
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 75 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 31 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d9b778e..8f6327e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,6 +17,11 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -24,34 +29,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.JobException;
@@ -74,6 +61,15 @@ import org.apache.gobblin.service.modules.utils.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
@@ -176,7 +172,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       // Since we are going to change status to isActive=false, unschedule all flows
       List<Spec> specs = new ArrayList<>(this.scheduledFlowSpecs.values());
       for (Spec spec : specs) {
-        onDeleteSpec(spec.getUri(), spec.getVersion());
+        try {
+          unscheduleSpec(spec.getUri(), spec.getVersion());
+        } catch (JobException e) {
+          _log.warn(String.format("Spec with URI: %s was not unscheduled during shutdown", spec.getUri()), e);
+        }
       }
       // Need to set active=false at the end; otherwise in the onDeleteSpec(), node will forward specs to active node, which is itself.
       this.isActive = isActive;
@@ -347,6 +347,25 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     return new AddSpecResponse<>(response);
   }
 
+  /**
+   * Remove a flowSpec from the schedule, e.g. after another leader has been elected.
+   * Unlike onDeleteSpec, this does not remove the flowSpec from the orchestrator/executor,
+   * and it unschedules even if ZooKeeper is unreachable, since this node cannot be the master then.
+   * @param specURI
+   * @param specVersion
+   */
+  private void unscheduleSpec(URI specURI, String specVersion) throws JobException {
+    if (this.scheduledFlowSpecs.containsKey(specURI.toString())) {
+      _log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
+      this.scheduledFlowSpecs.remove(specURI.toString());
+      unscheduleJob(specURI.toString());
+    } else {
+      throw new JobException(String.format(
+          "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
+          specURI));
+    }
+  }
+
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
     onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
   }
@@ -364,15 +383,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     try {
       Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
-      if (null != deletedSpec) {
-        this.orchestrator.remove(deletedSpec, headers);
-        this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
-        unscheduleJob(deletedSpecURI.toString());
-      } else {
-        _log.warn(String.format(
-            "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
-            deletedSpecURI));
-      }
+      unscheduleSpec(deletedSpecURI, deletedSpecVersion);
+      this.orchestrator.remove(deletedSpec, headers);
     } catch (JobException | IOException e) {
       _log.warn(String.format("Spec with URI: %s was not unscheduled cleaning", deletedSpecURI), e);
     }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index ef30050..82eb270 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -476,7 +476,7 @@ null, null, null, null);
     FlowId flowId = new FlowId().setFlowGroup(TEST_DUMMY_GROUP_NAME).setFlowName(TEST_DUMMY_FLOW_NAME);
 
     try {
-      this.flowConfigClient.getFlowConfig(flowId);
+      this.flowConfigClient.deleteFlowConfig(flowId);
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
       return;
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index cc84632..d2085a5 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -23,6 +23,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.File;
 import java.net.URI;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -43,6 +44,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
 import org.mockito.Mockito;
+import org.mockito.invocation.Invocation;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -202,6 +204,79 @@ public class GobblinServiceJobSchedulerTest {
         }, "Waiting all flowSpecs to be scheduled");
   }
 
+  /**
+   * Test that unscheduling flows when the scheduler becomes inactive never calls Orchestrator.remove
+   */
+  @Test
+  public void testJobSchedulerUnschedule() throws Exception {
+    // Mock a FlowCatalog.
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+    // Assume that the catalog can store corrupted flows
+    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+        MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+    FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+    // Ensure that these flows are scheduled
+    flowCatalog.put(flowSpec0, true);
+    flowCatalog.put(flowSpec1, true);
+    flowCatalog.put(flowSpec2, true);
+
+    Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+    // Mock a GaaS scheduler.
+    TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+    Mockito.doAnswer((Answer<Void>) a -> {
+      scheduler.isCompilerHealthy = true;
+      return null;
+    }).when(mockCompiler).awaitHealthy();
+
+    scheduler.setActive(true);
+
+    AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
+        .assertTrue(new Predicate<Void>() {
+          @Override
+          public boolean apply(Void input) {
+            Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+            if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
+              return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+                  scheduler.scheduledFlowSpecs.containsKey("spec2");
+            } else {
+              return false;
+            }
+          }
+        }, "Waiting all flowSpecs to be scheduled");
+
+    // set scheduler to be inactive and unschedule flows
+    scheduler.setActive(false);
+    Collection<Invocation> invocations = Mockito.mockingDetails(mockOrchestrator).getInvocations();
+
+    for (Invocation invocation: invocations) {
+      // ensure that orchestrator is not calling remove
+      Assert.assertFalse(invocation.getMethod().getName().equals("remove"));
+    }
+  }
+
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
     public boolean isCompilerHealthy = false;