You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/04 22:48:58 UTC

[gobblin] branch master updated: [GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 381b659  [GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…
381b659 is described below

commit 381b659d3da7a241d6f038696396e6bba690120e
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Jun 4 15:46:57 2021 -0700

    [GOBBLIN-1453] Improve error reporting on failed flow compilations and fix bugs wher…
    
    Closes #3291 from Will-Lo/modify-flow-compilation-error-reporting
---
 .../org/apache/gobblin/service/FlowConfigTest.java | 10 ++++++-
 .../apache/gobblin/service/FlowConfigV2Test.java   | 10 ++++++-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 19 ++++---------
 .../runtime/spec_catalog/FlowCatalogTest.java      | 24 ++++++++++++++--
 .../service/modules/flow/MultiHopFlowCompiler.java | 33 ++++++++++++++--------
 .../flowgraph/pathfinder/AbstractPathFinder.java   |  2 +-
 .../scheduler/GobblinServiceJobScheduler.java      |  6 ++--
 .../service/modules/core/GitConfigMonitorTest.java | 14 +++++++++
 .../modules/flow/MultiHopFlowCompilerTest.java     |  9 +++---
 .../modules/orchestration/OrchestratorTest.java    | 13 +++++++++
 .../scheduler/GobblinServiceJobSchedulerTest.java  | 25 ++++++++++++----
 11 files changed, 118 insertions(+), 47 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 2da8bdf..8876725 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -50,6 +52,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
 
 @Test(groups = { "gobblin.service" }, singleThreaded = true)
 public class FlowConfigTest {
@@ -78,7 +83,10 @@ public class FlowConfigTest {
 
     Config config = configBuilder.build();
     final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+    final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
     flowCatalog.startAsync();
     flowCatalog.awaitRunning();
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 9f645cc..0710a3e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -23,6 +23,8 @@ import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.mortbay.jetty.HttpStatus;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -56,6 +58,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
 
 @Test(groups = { "gobblin.service" }, singleThreaded = true)
 public class FlowConfigV2Test {
@@ -93,7 +98,10 @@ public class FlowConfigV2Test {
 
     Config config = configBuilder.build();
     final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+    final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
     flowCatalog.startAsync();
     flowCatalog.awaitRunning();
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index be7b205..147a86b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
-import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -29,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ import com.typesafe.config.ConfigException;
 
 import javax.annotation.Nonnull;
 import lombok.Getter;
-
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -341,16 +338,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
 
     if (triggerListener) {
       AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
-      // If flow fails callbacks, need to prevent adding the flow to the catalog
-      if (!response.getValue().getFailures().isEmpty()) {
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getFailures().entrySet()) {
-          flowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(entry.getValue().getError()));
-          responseMap.put(entry.getKey().getName(), new AddSpecResponse(entry.getValue().getResult()));
-        }
-      } else {
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      // If flow fails compilation, the result will have a non-empty string with the error
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
 
@@ -378,8 +368,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    // If we cannot get the response from the scheduler, assume that the flow failed compilation
     AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
-        ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+        ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
     return isCompileSuccessful(addSpecResponse.getValue());
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 91fee52..d69d4e5 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -208,15 +208,33 @@ public class FlowCatalogTest {
     FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
 
     // Assume that spec is rejected
-    when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException("Could not compile flow"));
+    when(this.mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(null));
     Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
 
     // Spec should be rejected from being stored
     specs = flowCatalog.getSpecs();
     Assert.assertEquals(specs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRejectBadFlow")
+  public void testRejectMissingListener() {
+    flowCatalog.removeListener(this.mockListener);
+    Collection<Spec> specs = flowCatalog.getSpecs();
+    logger.info("[Before Create] Number of specs: " + specs.size());
+    int i=0;
+    for (Spec spec : specs) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+    }
+    Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+
+    // Create and add Spec
 
-    // Add compilation errors to spec so that it will print it back to user
-    Assert.assertEquals(badSpec.getCompilationErrors().size(), 1);
+    Map<String, AddSpecResponse> response = this.flowCatalog.put(flowSpec);
+
+    // Spec should be rejected from being stored
+    specs = flowCatalog.getSpecs();
+    Assert.assertEquals(specs.size(), 0);
   }
 
   public static URI computeFlowSpecURI() {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 2d0874e..e343e5e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.Config;
@@ -201,13 +202,19 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
       for (FlowSpec datasetFlowSpec : flowSpecs) {
         for (DataNode destNode : destNodes) {
           long authStartTime = System.nanoTime();
-          boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
-          Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
-          if (!authorized) {
-            String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
-                flowSpec.getUri().toString(), source, destination);
-            log.error(message);
-            datasetFlowSpec.getCompilationErrors().add(message);
+          try {
+            boolean authorized = this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode, destNode);
+            Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() - authStartTime, TimeUnit.NANOSECONDS);
+            if (!authorized) {
+              String message = String.format("Data movement is not authorized for flow: %s, source: %s, destination: %s",
+                  flowSpec.getUri().toString(), source, destination);
+              log.error(message);
+              datasetFlowSpec.getCompilationErrors().add(message);
+              return null;
+            }
+          } catch (Exception e) {
+            Instrumented.markMeter(flowCompilationFailedMeter);
+            datasetFlowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(e));
             return null;
           }
         }
@@ -222,14 +229,16 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
       if (jobExecutionPlanDag.isEmpty()) {
         Instrumented.markMeter(flowCompilationFailedMeter);
-        log.info(String.format("No path found from source: %s and destination: %s", source, destination));
-        return jobExecutionPlanDag;
+        String message = String.format("No path found from source: %s and destination: %s", source, destination);
+        log.info(message);
+        flowSpec.getCompilationErrors().add(message);
+        return null;
       }
     } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) {
       Instrumented.markMeter(flowCompilationFailedMeter);
-      log.error(String
-              .format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination),
-          e);
+      String message = String.format("Exception encountered while compiling flow for source: %s and destination: %s, %s", source, destination, Throwables.getStackTraceAsString(e));
+      log.error(message, e);
+      flowSpec.getCompilationErrors().add(message);
       return null;
     } finally {
       this.rwLock.readLock().unlock();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index cc0a9a2..334b750 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements PathFinder {
             try {
               flowEdge.getFlowTemplate().tryResolving(mergedConfig, datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
             } catch (JobTemplate.TemplateException | ConfigException | SpecNotFoundException e) {
-              this.flowSpec.getCompilationErrors().add(e.toString());
+              this.flowSpec.getCompilationErrors().add("Error compiling edge " + flowEdge.toString() + ": " + e.toString());
               continue;
             }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index cc9e1e6..53366ae 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.scheduler;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
@@ -264,7 +263,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   /**
    *
    * @param addedSpec spec to be added
-   * @return add spec response
+   * @return add spec response, which contains <code>null</code> if there is an error
    */
   @Override
   public AddSpecResponse onAddSpec(Spec addedSpec) {
@@ -289,10 +288,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     // always try to compile the flow to verify if it is compilable
     Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+    // If dag is null then a compilation error has occurred
     if (dag != null && !dag.isEmpty()) {
       response = dag.toString();
-    } else if (!flowSpec.getCompilationErrors().isEmpty()) {
-      response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
     }
 
     boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 1912d15..3fddd4a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.ResetCommand;
 import org.eclipse.jgit.api.errors.GitAPIException;
@@ -55,6 +58,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
 public class GitConfigMonitorTest {
   private static final Logger logger = LoggerFactory.getLogger(GitConfigMonitorTest.class);
   private Repository remoteRepo;
@@ -73,6 +80,7 @@ public class GitConfigMonitorTest {
 
   private RefSpec masterRefSpec = new RefSpec("master");
   private FlowCatalog flowCatalog;
+  private SpecCatalogListener mockListener;
   private Config config;
   private GitConfigMonitor gitConfigMonitor;
 
@@ -101,6 +109,12 @@ public class GitConfigMonitorTest {
         .build();
 
     this.flowCatalog = new FlowCatalog(config);
+
+    this.mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+    this.flowCatalog.addListener(mockListener);
     this.flowCatalog.startAsync().awaitRunning();
     this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
     this.gitConfigMonitor.setActive(true);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index a99928e..14f2078 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,7 +33,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -528,7 +527,7 @@ public class MultiHopFlowCompilerTest {
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     //Ensure no path to destination.
-    Assert.assertTrue(jobDag.isEmpty());
+    Assert.assertEquals(jobDag, null);
   }
 
   @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
@@ -647,9 +646,9 @@ public class MultiHopFlowCompilerTest {
 
     Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
 
-    Assert.assertTrue(dag.isEmpty());
-    Assert.assertEquals(spec.getCompilationErrors().size(), 1);
-    Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY));
+    Assert.assertEquals(dag, null);
+    Assert.assertEquals(spec.getCompilationErrors().size(), 2);
+    spec.getCompilationErrors().stream().anyMatch(s -> s.contains(AzkabanProjectConfig.USER_TO_PROXY));
   }
 
   @Test (dependsOnMethods = "testUnresolvedFlow")
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index bdacbb8..7087c1e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import java.io.File;
 import java.net.URI;
@@ -27,6 +29,7 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +53,9 @@ import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
 
 public class OrchestratorTest {
   private static final Logger logger = LoggerFactory.getLogger(TopologyCatalog.class);
@@ -67,6 +73,7 @@ public class OrchestratorTest {
   private TopologySpec topologySpec;
 
   private FlowCatalog flowCatalog;
+  private SpecCatalogListener mockListener;
   private FlowSpec flowSpec;
 
   private Orchestrator orchestrator;
@@ -92,6 +99,12 @@ public class OrchestratorTest {
 
     this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
         Optional.of(logger));
+
+    this.mockListener = mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+    this.flowCatalog.addListener(mockListener);
     this.serviceLauncher.addService(flowCatalog);
 
     this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index b7dc76f..cc84632 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -28,11 +28,13 @@ import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -46,6 +48,8 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
 
 public class GobblinServiceJobSchedulerTest {
@@ -65,6 +69,10 @@ public class GobblinServiceJobSchedulerTest {
     Properties properties = new Properties();
     properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
     FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
     ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
     serviceLauncher.addService(flowCatalog);
@@ -73,8 +81,8 @@ public class GobblinServiceJobSchedulerTest {
     FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
     FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
 
-    flowCatalog.put(flowSpec0, false);
-    flowCatalog.put(flowSpec1, false);
+    flowCatalog.put(flowSpec0, true);
+    flowCatalog.put(flowSpec1, true);
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
 
@@ -143,6 +151,12 @@ public class GobblinServiceJobSchedulerTest {
     FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
     ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
 
+    // Assume that the catalog can store corrupted flows
+    SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+    when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+    when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+    flowCatalog.addListener(mockListener);
+
     serviceLauncher.addService(flowCatalog);
     serviceLauncher.start();
 
@@ -151,11 +165,10 @@ public class GobblinServiceJobSchedulerTest {
     FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
     FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
 
-    // Assume that the catalog can store corrupted flows
-    flowCatalog.put(flowSpec0, false);
     // Ensure that these flows are scheduled
-    flowCatalog.put(flowSpec1, false);
-    flowCatalog.put(flowSpec2, false);
+    flowCatalog.put(flowSpec0, true);
+    flowCatalog.put(flowSpec1, true);
+    flowCatalog.put(flowSpec2, true);
 
     Assert.assertEquals(flowCatalog.getSpecs().size(), 3);