You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/04 22:48:58 UTC
[gobblin] branch master updated: [GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 381b659 [GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…
381b659 is described below
commit 381b659d3da7a241d6f038696396e6bba690120e
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Jun 4 15:46:57 2021 -0700
[GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…
Closes #3291 from Will-Lo/modify-flow-compilation-error-reporting
---
.../org/apache/gobblin/service/FlowConfigTest.java | 10 ++++++-
.../apache/gobblin/service/FlowConfigV2Test.java | 10 ++++++-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 19 ++++---------
.../runtime/spec_catalog/FlowCatalogTest.java | 24 ++++++++++++++--
.../service/modules/flow/MultiHopFlowCompiler.java | 33 ++++++++++++++--------
.../flowgraph/pathfinder/AbstractPathFinder.java | 2 +-
.../scheduler/GobblinServiceJobScheduler.java | 6 ++--
.../service/modules/core/GitConfigMonitorTest.java | 14 +++++++++
.../modules/flow/MultiHopFlowCompilerTest.java | 9 +++---
.../modules/orchestration/OrchestratorTest.java | 13 +++++++++
.../scheduler/GobblinServiceJobSchedulerTest.java | 25 ++++++++++++----
11 files changed, 118 insertions(+), 47 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 2da8bdf..8876725 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -50,6 +52,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
@Test(groups = { "gobblin.service" }, singleThreaded = true)
public class FlowConfigTest {
@@ -78,7 +83,10 @@ public class FlowConfigTest {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+ final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
flowCatalog.awaitRunning();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 9f645cc..0710a3e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.mortbay.jetty.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -56,6 +58,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
@Test(groups = { "gobblin.service" }, singleThreaded = true)
public class FlowConfigV2Test {
@@ -93,7 +98,10 @@ public class FlowConfigV2Test {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+ final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
flowCatalog.awaitRunning();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index be7b205..147a86b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.runtime.spec_catalog;
-import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -29,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ import com.typesafe.config.ConfigException;
import javax.annotation.Nonnull;
import lombok.Getter;
-
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -341,16 +338,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
- // If flow fails callbacks, need to prevent adding the flow to the catalog
- if (!response.getValue().getFailures().isEmpty()) {
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getFailures().entrySet()) {
- flowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(entry.getValue().getError()));
- responseMap.put(entry.getKey().getName(), new AddSpecResponse(entry.getValue().getResult()));
- }
- } else {
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
- responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
- }
+ // If flow fails compilation, the result will have a non-empty string with the error
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
}
}
@@ -378,8 +368,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+ // If we cannot get the response from the scheduler, assume that the flow failed compilation
AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
- ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+ ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
return isCompileSuccessful(addSpecResponse.getValue());
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 91fee52..d69d4e5 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -208,15 +208,33 @@ public class FlowCatalogTest {
FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
// Assume that spec is rejected
- when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException("Could not compile flow"));
+ when(this.mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
// Spec should be rejected from being stored
specs = flowCatalog.getSpecs();
Assert.assertEquals(specs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testRejectBadFlow")
+ public void testRejectMissingListener() {
+ flowCatalog.removeListener(this.mockListener);
+ Collection<Spec> specs = flowCatalog.getSpecs();
+ logger.info("[Before Create] Number of specs: " + specs.size());
+ int i=0;
+ for (Spec spec : specs) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+ }
+ Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+
+ // Create and add Spec
- // Add compilation errors to spec so that it will print it back to user
- Assert.assertEquals(badSpec.getCompilationErrors().size(), 1);
+ Map<String, AddSpecResponse> response = this.flowCatalog.put(flowSpec);
+
+ // Spec should be rejected from being stored
+ specs = flowCatalog.getSpecs();
+ Assert.assertEquals(specs.size(), 0);
}
public static URI computeFlowSpecURI() {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 2d0874e..e343e5e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
@@ -201,13 +202,19 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
for (FlowSpec datasetFlowSpec : flowSpecs) {
for (DataNode destNode : destNodes) {
long authStartTime = System.nanoTime();
- boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
- Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
- if (!authorized) {
- String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
- flowSpec.getUri().toString(), source, destination);
- log.error(message);
- datasetFlowSpec.getCompilationErrors().add(message);
+ try {
+ boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
+ Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
+ if (!authorized) {
+ String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
+ flowSpec.getUri().toString(), source, destination);
+ log.error(message);
+ datasetFlowSpec.getCompilationErrors().add(message);
+ return null;
+ }
+ } catch (Exception e) {
+ Instrumented.markMeter(flowCompilationFailedMeter);
+ datasetFlowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(e));
return null;
}
}
@@ -222,14 +229,16 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
if (jobExecutionPlanDag.isEmpty()) {
Instrumented.markMeter(flowCompilationFailedMeter);
- log.info(String.format("No path found from source: %s and destination: %s", source, destination));
- return jobExecutionPlanDag;
+ String message = String.format("No path found from source: %s and destination: %s", source, destination);
+ log.info(message);
+ flowSpec.getCompilationErrors().add(message);
+ return null;
}
} catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
Instrumented.markMeter(flowCompilationFailedMeter);
- log.error(String
- .format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination),
- e);
+ String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
+ log.error(message, e);
+ flowSpec.getCompilationErrors().add(message);
return null;
} finally {
this.rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index cc0a9a2..334b750 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements PathFinder {
try {
flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
} catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
- this.flowSpec.getCompilationErrors().add(e.toString());
+ this.flowSpec.getCompilationErrors().add("Error compiling edge " + flowEdge.toString() + ": " + e.toString());
continue;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index cc9e1e6..53366ae 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.scheduler;
import java.io.IOException;
import java.net.URI;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -264,7 +263,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
/**
*
* @param addedSpec spec to be added
- * @return add spec response
+ * @return add spec response, which contains <code>null</code> if there is an error
*/
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
@@ -289,10 +288,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
- } else if (!flowSpec.getCompilationErrors().isEmpty()) {
- response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
}
boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 1912d15..3fddd4a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.ResetCommand;
import org.eclipse.jgit.api.errors.GitAPIException;
@@ -55,6 +58,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
public class GitConfigMonitorTest {
private static final Logger logger = LoggerFactory.getLogger(GitConfigMonitorTest.class);
private Repository remoteRepo;
@@ -73,6 +80,7 @@ public class GitConfigMonitorTest {
private RefSpec masterRefSpec = new RefSpec("master");
private FlowCatalog flowCatalog;
+ private SpecCatalogListener mockListener;
private Config config;
private GitConfigMonitor gitConfigMonitor;
@@ -101,6 +109,12 @@ public class GitConfigMonitorTest {
.build();
this.flowCatalog = new FlowCatalog(config);
+
+ this.mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+ this.flowCatalog.addListener(mockListener);
this.flowCatalog.startAsync().awaitRunning();
this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
this.gitConfigMonitor.setActive(true);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index a99928e..14f2078 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,7 +33,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -528,7 +527,7 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
//Ensure no path to destination.
- Assert.assertTrue(jobDag.isEmpty());
+ Assert.assertEquals(jobDag, null);
}
@Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
@@ -647,9 +646,9 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
- Assert.assertTrue(dag.isEmpty());
- Assert.assertEquals(spec.getCompilationErrors().size(), 1);
- Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY));
+ Assert.assertEquals(dag, null);
+ Assert.assertEquals(spec.getCompilationErrors().size(), 2);
+ spec.getCompilationErrors().stream().anyMatch(s -> s.contains(AzkabanProjectConfig.USER_TO_PROXY));
}
@Test (dependsOnMethods = "testUnresolvedFlow")
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index bdacbb8..7087c1e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,7 +17,9 @@
package org.apache.gobblin.service.modules.orchestration;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import java.io.File;
import java.net.URI;
@@ -27,6 +29,7 @@ import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,9 @@ import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
public class OrchestratorTest {
private static final Logger logger = LoggerFactory.getLogger(TopologyCatalog.class);
@@ -67,6 +73,7 @@ public class OrchestratorTest {
private TopologySpec topologySpec;
private FlowCatalog flowCatalog;
+ private SpecCatalogListener mockListener;
private FlowSpec flowSpec;
private Orchestrator orchestrator;
@@ -92,6 +99,12 @@ public class OrchestratorTest {
this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
Optional.of(logger));
+
+ this.mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+ this.flowCatalog.addListener(mockListener);
this.serviceLauncher.addService(flowCatalog);
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index b7dc76f..cc84632 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -28,11 +28,13 @@ import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -46,6 +48,8 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
public class GobblinServiceJobSchedulerTest {
@@ -65,6 +69,10 @@ public class GobblinServiceJobSchedulerTest {
Properties properties = new Properties();
properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
serviceLauncher.addService(flowCatalog);
@@ -73,8 +81,8 @@ public class GobblinServiceJobSchedulerTest {
FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
- flowCatalog.put(flowSpec0, false);
- flowCatalog.put(flowSpec1, false);
+ flowCatalog.put(flowSpec0, true);
+ flowCatalog.put(flowSpec1, true);
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
@@ -143,6 +151,12 @@ public class GobblinServiceJobSchedulerTest {
FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+ // Assume that the catalog can store corrupted flows
+ SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
+
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();
@@ -151,11 +165,10 @@ public class GobblinServiceJobSchedulerTest {
FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
- // Assume that the catalog can store corrupted flows
- flowCatalog.put(flowSpec0, false);
// Ensure that these flows are scheduled
- flowCatalog.put(flowSpec1, false);
- flowCatalog.put(flowSpec2, false);
+ flowCatalog.put(flowSpec0, true);
+ flowCatalog.put(flowSpec1, true);
+ flowCatalog.put(flowSpec2, true);
Assert.assertEquals(flowCatalog.getSpecs().size(), 3);