You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/06 23:45:22 UTC

[gobblin] branch master updated: [GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7018408  [GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler
7018408 is described below

commit 701840858f1323a1f009a44bc70c1e14e7464935
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu May 6 16:45:15 2021 -0700

    [GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler
    
    Closes #3273 from Will-Lo/handle-flowspec-compilation-errors
---
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 16 +++++-
 .../runtime/spec_catalog/FlowCatalogTest.java      | 49 +++++++++++++++-
 .../scheduler/GobblinServiceJobScheduler.java      | 17 +++---
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 66 +++++++++++++++++++++-
 4 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index c32f69b..be7b205 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
+import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +121,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
 
     this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
+
     try {
       Config newConfig = config;
       if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
@@ -338,8 +341,16 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
 
     if (triggerListener) {
       AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
-      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      // If flow fails callbacks, need to prevent adding the flow to the catalog
+      if (!response.getValue().getFailures().isEmpty()) {
+        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getFailures().entrySet()) {
+          flowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(entry.getValue().getError()));
+          responseMap.put(entry.getKey().getName(), new AddSpecResponse(entry.getValue().getResult()));
+        }
+      } else {
+        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
+          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+        }
       }
     }
 
@@ -369,7 +380,6 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
     AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
         ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
-
     return isCompileSuccessful(addSpecResponse.getValue());
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 3d73281..91fee52 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -24,17 +24,22 @@ import com.typesafe.config.Config;
 import java.io.File;
 import java.net.URI;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.hadoop.fs.Path;
+import static org.mockito.Mockito.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -52,10 +57,12 @@ public class FlowCatalogTest {
   private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
   private static final String SPEC_DESCRIPTION = "Test Flow Spec";
   private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
+  private static final String UNCOMPILABLE_FLOW = "uncompilableFlow";
 
   private ServiceBasedAppLauncher serviceLauncher;
   private FlowCatalog flowCatalog;
   private FlowSpec flowSpec;
+  private SpecCatalogListener mockListener;
 
   @BeforeClass
   public void setup() throws Exception {
@@ -71,6 +78,13 @@ public class FlowCatalogTest {
 
     this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties),
         Optional.of(logger));
+
+    this.mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+    this.flowCatalog.addListener(mockListener);
+
     this.serviceLauncher.addService(flowCatalog);
 
     // Start Catalog
@@ -84,14 +98,19 @@ public class FlowCatalogTest {
    * Create FlowSpec with default URI
    */
   public static FlowSpec initFlowSpec(String specStore) {
-    return initFlowSpec(specStore, computeFlowSpecURI());
+    return initFlowSpec(specStore, computeFlowSpecURI(), "flowName");
+  }
+
+  public static FlowSpec initFlowSpec(String specStore, URI uri){
+    return initFlowSpec(specStore, uri, "flowName");
   }
 
   /**
    * Create FLowSpec with specified URI and SpecStore location.
    */
-  public static FlowSpec initFlowSpec(String specStore, URI uri){
+  public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
     Properties properties = new Properties();
+    properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
     properties.put("specStore.fs.dir", specStore);
     properties.put("specExecInstance.capabilities", "source:destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
@@ -174,6 +193,32 @@ public class FlowCatalogTest {
     Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
   }
 
+  @Test (dependsOnMethods = "deleteFlowSpec")
+  public void testRejectBadFlow() {
+    Collection<Spec> specs = flowCatalog.getSpecs();
+    logger.info("[Before Create] Number of specs: " + specs.size());
+    int i=0;
+    for (Spec spec : specs) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+    }
+    Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+
+    // Create and add Spec
+    FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
+
+    // Assume that spec is rejected
+    when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException("Could not compile flow"));
+    Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
+
+    // Spec should be rejected from being stored
+    specs = flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 0);
+
+    // Add compilation errors to spec so that it will print it back to user
+    Assert.assertEquals(badSpec.getCompilationErrors().size(), 1);
+  }
+
   public static URI computeFlowSpecURI() {
     // Make sure this is relative
     URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 7e644db..cc9e1e6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -185,25 +185,26 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         Iterator<URI> drUris = this.flowCatalog.get().getSpecURISWithTag(DR_FILTER_TAG);
         clearRunningFlowState(drUris);
       }
-
     } catch (IOException e) {
       throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
     }
 
-    try {
-      while (specUris.hasNext()) {
-        Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-        //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
+    while (specUris.hasNext()) {
+      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
+      try {
+        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
         if (spec instanceof FlowSpec) {
           Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
           onAddSpec(modifiedSpec);
         } else {
           onAddSpec(spec);
         }
+      } catch (Exception e) {
+        // If there is an uncaught error thrown during compilation, log it and continue adding flows
+        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
-    } finally {
-      this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
     }
+    this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
   }
 
   /**
@@ -297,7 +298,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
 
     if (isExplain || !compileSuccess || !this.isActive) {
-      // todo: in case of a scheudled job, we should also check if the job schedule is a valid cron schedule
+      // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
       //  so it can be scheduled
       _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}",
           addedSpec, isExplain, compileSuccess, this.isActive);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a67b14f..b7dc76f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
@@ -92,7 +93,7 @@ public class GobblinServiceJobSchedulerTest {
 
     scheduler.setActive(true);
 
-    AssertWithBackoff.create().timeoutMs(6000).maxSleepMs(2000).backoffFactor(2)
+    AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
         .assertTrue(new Predicate<Void>() {
           @Override
           public boolean apply(Void input) {
@@ -129,6 +130,65 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
   }
 
+  /**
+   * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
+   */
+  @Test
+  public void testJobSchedulerInitWithFailedSpec() throws Exception {
+    // Mock a FlowCatalog.
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+        MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+    FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+    // Assume that the catalog can store corrupted flows
+    flowCatalog.put(flowSpec0, false);
+    // Ensure that these flows are scheduled
+    flowCatalog.put(flowSpec1, false);
+    flowCatalog.put(flowSpec2, false);
+
+    Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+    // Mock a GaaS scheduler.
+    TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+    Mockito.doAnswer((Answer<Void>) a -> {
+      scheduler.isCompilerHealthy = true;
+      return null;
+    }).when(mockCompiler).awaitHealthy();
+
+    scheduler.setActive(true);
+
+    AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
+        .assertTrue(new Predicate<Void>() {
+          @Override
+          public boolean apply(Void input) {
+            Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+            if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
+              return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+                  scheduler.scheduledFlowSpecs.containsKey("spec2");
+            } else {
+              return false;
+            }
+          }
+        }, "Waiting all flowSpecs to be scheduled");
+  }
+
   class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
     public boolean isCompilerHealthy = false;
 
@@ -143,6 +203,10 @@ public class GobblinServiceJobSchedulerTest {
      */
     @Override
     public AddSpecResponse onAddSpec(Spec addedSpec) {
+      String flowName = (String) ((FlowSpec) addedSpec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY);
+      if (flowName.equals(MockedSpecCompiler.UNCOMPILABLE_FLOW)) {
+        throw new RuntimeException("Could not compile flow");
+      }
       super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
       // Check that compiler is healthy at time of scheduling flows
       Assert.assertTrue(isCompilerHealthy);