You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/05/11 16:57:06 UTC

[gobblin] branch master updated: [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a045cdd9 [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)
3a045cdd9 is described below

commit 3a045cdd989df84af9a44199469ea4bcc53919b3
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu May 11 12:56:59 2023 -0400

    [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)
    
    * Use FlowexecutionId as job id instead of generated timestamp if the properties has a flow execution ID property
    
    * Add flowexecution ID to kafkajob monitor for cancelled flows if applicable
    
    * Address review
    
    * Add enhanced logs
---
 .../apache/gobblin/cluster/HelixJobsMapping.java   |   9 +-
 .../service/StreamingKafkaSpecExecutorTest.java    | 107 +++++++++++++++++++--
 .../gobblin/service/SimpleKafkaSpecProducer.java   |  46 +++------
 .../runtime/job_monitor/KafkaJobMonitor.java       |  30 +++++-
 4 files changed, 143 insertions(+), 49 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index ab4d87c4e..8128aa0aa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -96,12 +97,14 @@ public class HelixJobsMapping {
 
   public static String createPlanningJobId (Properties jobPlanningProps) {
     return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobPlanningProps));
+            + JobState.getJobNameFromProps(jobPlanningProps),
+        PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }
 
   public static String createActualJobId (Properties jobProps) {
-    return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
-        + JobState.getJobNameFromProps(jobProps));
+     return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+             + JobState.getJobNameFromProps(jobProps),
+          PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
   }
 
   @Nullable
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index abbdc610c..bf7cf071b 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -29,25 +29,26 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WriteResponse;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.extern.slf4j.Slf4j;
 
 
 @Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
   private String _kafkaBrokers;
   private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
   private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+  String flowSpecUriString = "/flowgroup/flowname/spec";
+  Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
   String specUriString = "/foo/bar/spec";
   Spec spec = initJobSpec(specUriString);
 
+
   @BeforeSuite
   public void beforeSuite() {
     log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
       }
     }
   }
-
-  @Test
-  public void testAddSpec() throws Exception {
+  @BeforeClass
+  public void setup() throws Exception {
     _closer = Closer.create();
     _properties = new Properties();
 
@@ -116,9 +119,6 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
     // SEI Producer
     _seip = _closer.register(new SimpleKafkaSpecProducer(config));
 
-    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
-    log.info("WriteResponse: " + writeResponse);
-
     _jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
     _jobCatalog.startAsync().awaitRunning();
 
@@ -126,6 +126,13 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
     _seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
     _seic.startAsync().awaitRunning();
 
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+    log.info("WriteResponse: " + writeResponse);
+
     List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
     Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
 
@@ -165,6 +172,78 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
     Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
   }
 
+  @Test(dependsOnMethods = "testDeleteSpec")
+  public void testCancelSpec() throws Exception {
+    // Cancel an existing spec that was added
+    _seip.addSpec(spec).get();
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test (dependsOnMethods = "testCancelSpec")
+  public void testCancelSpecNoopDefault() throws Exception {
+     _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match with added jobspec, so should not cancel job
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+    // Wait for the cancellation to be processed, but it should ignore the spec as flow execution IDs do not match
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+
+    _seip.cancelJob(new URI(flowSpecUriString), new Properties()).get();
+    Thread.sleep(5000);
+    consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 2, "Should emit cancellation event if no flow ID provided");
+    consumedSpecAction = consumedEvent.get(1);
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+  @Test(dependsOnMethods = "testCancelSpecNoopDefault")
+  public void testCancelSpecWithFlowExecutionId() throws Exception {
+    _seip.addSpec(flowSpec).get();
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
+    WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+    log.info("WriteResponse: " + writeResponse);
+
+    // Wait for the cancellation to be processed
+    Thread.sleep(5000);
+    List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+    Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+    Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+    log.info(consumedSpecAction.getKey().toString());
+    Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+    Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+    Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+    Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+  }
+
+
   private static JobSpec initJobSpec(String specUri) {
     Properties properties = new Properties();
     return JobSpec.builder(specUri)
@@ -174,6 +253,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
         .build();
   }
 
+  private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    return JobSpec.builder(specUri)
+        .withConfig(ConfigUtils.propertiesToConfig(properties))
+        .withVersion("1")
+        .withDescription("Spec Description")
+        .build();
+  }
+
   @AfterSuite
   public void after() {
     try {
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 953d841f1..0d01bd4f9 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -17,27 +17,30 @@
 
 package org.apache.gobblin.service;
 
-import com.google.common.base.Joiner;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.slf4j.Logger;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -54,9 +57,6 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 
-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.extern.slf4j.Slf4j;
-
 @Slf4j
 @NotThreadSafe
 public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
@@ -105,19 +105,6 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
     return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
   }
 
-  private Spec addExecutionIdToJobSpecUri(Spec spec) {
-    JobSpec newSpec = (JobSpec)spec;
-    if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-      try {
-        newSpec.setUri(new URI(Joiner.on("/").
-            join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
-      } catch (URISyntaxException e) {
-        log.error("Cannot create job uri to cancel job", e);
-      }
-    }
-    return newSpec;
-  }
-
   private URI getURIWithExecutionId(URI originalURI, Properties props) {
     URI result = originalURI;
     if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,10 +120,9 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> addSpec(Spec addedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(addedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
 
-    log.info("Adding Spec: " + spec + " using Kafka.");
+    log.info("Adding Spec: " + addedSpec + " using Kafka.");
     this.addSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -144,10 +130,9 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> updateSpec(Spec updatedSpec) {
-    Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
-    AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+    AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
 
-    log.info("Updating Spec: " + spec + " using Kafka.");
+    log.info("Updating Spec: " + updatedSpec + " using Kafka.");
     this.updateSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -155,13 +140,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();
 
-    log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
     this.deleteSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -169,12 +152,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable  {
 
   @Override
   public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
-    URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
-    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+    AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
         .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
         .setProperties(Maps.fromProperties(properties)).build();
 
-    log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
+    log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
     this.cancelSpecMeter.mark();
 
     return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 8a1bf3161..6952dcff8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -28,11 +28,13 @@ import com.typesafe.config.Config;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,32 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
             break;
           case DELETE:
             this.removedSpecs.mark();
-            URI jobSpecUri = parsedMessage.getUri();
-            this.jobCatalog.remove(jobSpecUri);
+            this.jobCatalog.remove(parsedMessage.getUri());
             // Delete the job state if it is a delete spec request
-            deleteStateStore(jobSpecUri);
+            deleteStateStore(parsedMessage.getUri());
             break;
           case CANCEL:
-            this.cancelledSpecs.mark();
-            this.jobCatalog.remove(parsedMessage.getUri(), true);
+            URI specUri = parsedMessage.getUri();
+            try {
+              JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+              // If incoming job or existing job does not have an associated flow execution ID, default to cancelling the job
+              if (!spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) || !parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+                this.cancelledSpecs.mark();
+                this.jobCatalog.remove(specUri, true);
+              } else {
+                // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+                String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+                if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+                  this.cancelledSpecs.mark();
+                  this.jobCatalog.remove(specUri, true);
+                } else {
+                  log.warn("Job spec {} that has flow execution ID {} could not be cancelled, incoming request expects to cancel flow execution ID {}", specUri,
+                      spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowIdToCancel);
+                }
+              }
+            } catch (JobSpecNotFoundException e) {
+              log.warn("Could not find job spec {} to cancel in job catalog", specUri);
+            }
             break;
           default:
             log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);