You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/05/06 23:45:22 UTC
[gobblin] branch master updated: [GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7018408 [GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler
7018408 is described below
commit 701840858f1323a1f009a44bc70c1e14e7464935
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu May 6 16:45:15 2021 -0700
[GOBBLIN-1439] Handle flow configs that cause compilation errors from being added and blocking scheduler
Closes #3273 from Will-Lo/handle-flowspec-compilation-errors
---
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 16 +++++-
.../runtime/spec_catalog/FlowCatalogTest.java | 49 +++++++++++++++-
.../scheduler/GobblinServiceJobScheduler.java | 17 +++---
.../scheduler/GobblinServiceJobSchedulerTest.java | 66 +++++++++++++++++++++-
4 files changed, 134 insertions(+), 14 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index c32f69b..be7b205 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.spec_catalog;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -28,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +121,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
this.aliasResolver = new ClassAliasResolver<>(SpecStore.class);
+
try {
Config newConfig = config;
if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
@@ -338,8 +341,16 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
- responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+ // If flow fails callbacks, need to prevent adding the flow to the catalog
+ if (!response.getValue().getFailures().isEmpty()) {
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getFailures().entrySet()) {
+ flowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(entry.getValue().getError()));
+ responseMap.put(entry.getKey().getName(), new AddSpecResponse(entry.getValue().getResult()));
+ }
+ } else {
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+ }
}
}
@@ -369,7 +380,6 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
-
return isCompileSuccessful(addSpecResponse.getValue());
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 3d73281..91fee52 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -24,17 +24,22 @@ import com.typesafe.config.Config;
import java.io.File;
import java.net.URI;
import java.util.Collection;
+import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.hadoop.fs.Path;
+import static org.mockito.Mockito.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -52,10 +57,12 @@ public class FlowCatalogTest {
private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
private static final String SPEC_DESCRIPTION = "Test Flow Spec";
private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
+ private static final String UNCOMPILABLE_FLOW = "uncompilableFlow";
private ServiceBasedAppLauncher serviceLauncher;
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
+ private SpecCatalogListener mockListener;
@BeforeClass
public void setup() throws Exception {
@@ -71,6 +78,13 @@ public class FlowCatalogTest {
this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties),
Optional.of(logger));
+
+ this.mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+ this.flowCatalog.addListener(mockListener);
+
this.serviceLauncher.addService(flowCatalog);
// Start Catalog
@@ -84,14 +98,19 @@ public class FlowCatalogTest {
* Create FlowSpec with default URI
*/
public static FlowSpec initFlowSpec(String specStore) {
- return initFlowSpec(specStore, computeFlowSpecURI());
+ return initFlowSpec(specStore, computeFlowSpecURI(), "flowName");
+ }
+
+ public static FlowSpec initFlowSpec(String specStore, URI uri){
+ return initFlowSpec(specStore, uri, "flowName");
}
/**
* Create FLowSpec with specified URI and SpecStore location.
*/
- public static FlowSpec initFlowSpec(String specStore, URI uri){
+ public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
Properties properties = new Properties();
+ properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
Config config = ConfigUtils.propertiesToConfig(properties);
@@ -174,6 +193,32 @@ public class FlowCatalogTest {
Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
}
+ @Test (dependsOnMethods = "deleteFlowSpec")
+ public void testRejectBadFlow() {
+ Collection<Spec> specs = flowCatalog.getSpecs();
+ logger.info("[Before Create] Number of specs: " + specs.size());
+ int i=0;
+ for (Spec spec : specs) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+ }
+ Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+
+ // Create and add Spec
+ FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
+
+ // Assume that spec is rejected
+ when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException("Could not compile flow"));
+ Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
+
+ // Spec should be rejected from being stored
+ specs = flowCatalog.getSpecs();
+ Assert.assertEquals(specs.size(), 0);
+
+ // Add compilation errors to spec so that it will print it back to user
+ Assert.assertEquals(badSpec.getCompilationErrors().size(), 1);
+ }
+
public static URI computeFlowSpecURI() {
// Make sure this is relative
URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 7e644db..cc9e1e6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -185,25 +185,26 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
Iterator<URI> drUris = this.flowCatalog.get().getSpecURISWithTag(DR_FILTER_TAG);
clearRunningFlowState(drUris);
}
-
} catch (IOException e) {
throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
}
- try {
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
+ while (specUris.hasNext()) {
+ Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
+ try {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
if (spec instanceof FlowSpec) {
Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
onAddSpec(modifiedSpec);
} else {
onAddSpec(spec);
}
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
- } finally {
- this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
}
+ this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
}
/**
@@ -297,7 +298,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
if (isExplain || !compileSuccess || !this.isActive) {
- // todo: in case of a scheudled job, we should also check if the job schedule is a valid cron schedule
+ // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
// so it can be scheduled
_log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}",
addedSpec, isExplain, compileSuccess, this.isActive);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a67b14f..b7dc76f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
@@ -92,7 +93,7 @@ public class GobblinServiceJobSchedulerTest {
scheduler.setActive(true);
- AssertWithBackoff.create().timeoutMs(6000).maxSleepMs(2000).backoffFactor(2)
+ AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
.assertTrue(new Predicate<Void>() {
@Override
public boolean apply(Void input) {
@@ -129,6 +130,65 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
}
+ /**
+ * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
+ */
+ @Test
+ public void testJobSchedulerInitWithFailedSpec() throws Exception {
+ // Mock a FlowCatalog.
+ File specDir = Files.createTempDir();
+
+ Properties properties = new Properties();
+ properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+ FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+ serviceLauncher.addService(flowCatalog);
+ serviceLauncher.start();
+
+ FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+ MockedSpecCompiler.UNCOMPILABLE_FLOW);
+ FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+ FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+ // Assume that the catalog can store corrupted flows
+ flowCatalog.put(flowSpec0, false);
+ // Ensure that these flows are scheduled
+ flowCatalog.put(flowSpec1, false);
+ flowCatalog.put(flowSpec2, false);
+
+ Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+ Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+ // Mock a GaaS scheduler.
+ TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+ SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+ Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+ Mockito.doAnswer((Answer<Void>) a -> {
+ scheduler.isCompilerHealthy = true;
+ return null;
+ }).when(mockCompiler).awaitHealthy();
+
+ scheduler.setActive(true);
+
+ AssertWithBackoff.create().timeoutMs(20000).maxSleepMs(2000).backoffFactor(2)
+ .assertTrue(new Predicate<Void>() {
+ @Override
+ public boolean apply(Void input) {
+ Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+ if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
+ return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
+ scheduler.scheduledFlowSpecs.containsKey("spec2");
+ } else {
+ return false;
+ }
+ }
+ }, "Waiting all flowSpecs to be scheduled");
+ }
+
class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
public boolean isCompilerHealthy = false;
@@ -143,6 +203,10 @@ public class GobblinServiceJobSchedulerTest {
*/
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
+ String flowName = (String) ((FlowSpec) addedSpec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY);
+ if (flowName.equals(MockedSpecCompiler.UNCOMPILABLE_FLOW)) {
+ throw new RuntimeException("Could not compile flow");
+ }
super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
// Check that compiler is healthy at time of scheduling flows
Assert.assertTrue(isCompilerHealthy);