Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/05/11 16:57:06 UTC
[gobblin] branch master updated: [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3a045cdd9 [GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)
3a045cdd9 is described below
commit 3a045cdd989df84af9a44199469ea4bcc53919b3
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu May 11 12:56:59 2023 -0400
[GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames (#3694)
* Use the flow execution ID as the job ID instead of a generated timestamp when the job properties contain a flow execution ID (see the sketch after these notes)
* Add the flow execution ID to the Kafka job monitor for cancelled flows, if applicable
* Address review
* Add enhanced logs
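
The first bullet can be illustrated with a minimal, self-contained sketch of the id-construction pattern. This is not the actual JobLauncherUtils/PropertiesUtils implementation from the diff below; the property key string "flow.executionId" and the "job_<name>_<sequence>" id format are assumptions made only for illustration.

import java.util.Properties;

public class FlowExecutionIdJobIdSketch {
  // Hypothetical stand-in for ConfigurationKeys.FLOW_EXECUTION_ID_KEY
  private static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  // Prefer the flow execution ID as the job id's numeric suffix,
  // falling back to the current timestamp when the property is absent or malformed.
  static String newJobId(String jobName, Properties props) {
    long sequence;
    try {
      sequence = Long.parseLong(
          props.getProperty(FLOW_EXECUTION_ID_KEY, String.valueOf(System.currentTimeMillis())));
    } catch (NumberFormatException e) {
      sequence = System.currentTimeMillis();
    }
    return "job_" + jobName + "_" + sequence;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(FLOW_EXECUTION_ID_KEY, "12345");
    // With a flow execution ID present, repeated submissions of the same flow execution
    // map to the same job id instead of two different timestamped ids.
    System.out.println(newJobId("planning_myFlow", props));            // job_planning_myFlow_12345
    System.out.println(newJobId("planning_myFlow", new Properties())); // falls back to a timestamp
  }
}

In the commit itself, the same effect is achieved by passing PropertiesUtils.getPropAsLong(..., FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()) into JobLauncherUtils.newJobId, as shown in the HelixJobsMapping hunk below.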
---
.../apache/gobblin/cluster/HelixJobsMapping.java | 9 +-
.../service/StreamingKafkaSpecExecutorTest.java | 107 +++++++++++++++++++--
.../gobblin/service/SimpleKafkaSpecProducer.java | 46 +++------
.../runtime/job_monitor/KafkaJobMonitor.java | 30 +++++-
4 files changed, 143 insertions(+), 49 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index ab4d87c4e..8128aa0aa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -96,12 +97,14 @@ public class HelixJobsMapping {
public static String createPlanningJobId (Properties jobPlanningProps) {
return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobPlanningProps));
+ + JobState.getJobNameFromProps(jobPlanningProps),
+ PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
}
public static String createActualJobId (Properties jobProps) {
- return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobProps));
+ return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+ + JobState.getJobNameFromProps(jobProps),
+ PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
}
@Nullable
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
index abbdc610c..bf7cf071b 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/StreamingKafkaSpecExecutorTest.java
@@ -29,25 +29,26 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.WriteResponse;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
private String _kafkaBrokers;
private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
+ String flowSpecUriString = "/flowgroup/flowname/spec";
+ Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
String specUriString = "/foo/bar/spec";
Spec spec = initJobSpec(specUriString);
+
@BeforeSuite
public void beforeSuite() {
log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
}
}
}
-
- @Test
- public void testAddSpec() throws Exception {
+ @BeforeClass
+ public void setup() throws Exception {
_closer = Closer.create();
_properties = new Properties();
@@ -116,9 +119,6 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
// SEI Producer
_seip = _closer.register(new SimpleKafkaSpecProducer(config));
- WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
- log.info("WriteResponse: " + writeResponse);
-
_jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
_jobCatalog.startAsync().awaitRunning();
@@ -126,6 +126,13 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
_seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
_seic.startAsync().awaitRunning();
+ }
+
+ @Test
+ public void testAddSpec() throws Exception {
+ WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+ log.info("WriteResponse: " + writeResponse);
+
List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
@@ -165,6 +172,78 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}
+ @Test(dependsOnMethods = "testDeleteSpec")
+ public void testCancelSpec() throws Exception {
+ // Cancel an existing spec that was added
+ _seip.addSpec(spec).get();
+ WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
+ log.info("WriteResponse: " + writeResponse);
+
+ // Wait for the cancellation to be processed
+ Thread.sleep(5000);
+ List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+ Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+ Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+ log.info(consumedSpecAction.getKey().toString());
+ Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+ Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+ }
+
+ @Test (dependsOnMethods = "testCancelSpec")
+ public void testCancelSpecNoopDefault() throws Exception {
+ _seip.addSpec(flowSpec).get();
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match with added jobspec, so should not cancel job
+ WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+ log.info("WriteResponse: " + writeResponse);
+ // Wait for the cancellation to be processed, but it should ignore the spec as flow execution IDs do not match
+ Thread.sleep(5000);
+ List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+ Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");
+
+ Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
+ Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+
+ _seip.cancelJob(new URI(flowSpecUriString), new Properties()).get();
+ Thread.sleep(5000);
+ consumedEvent = _seic.changedSpecs().get();
+ Assert.assertTrue(consumedEvent.size() == 2, "Should emit cancellation event if no flow ID provided");
+ consumedSpecAction = consumedEvent.get(1);
+ Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+ }
+
+ @Test(dependsOnMethods = "testCancelSpecNoopDefault")
+ public void testCancelSpecWithFlowExecutionId() throws Exception {
+ _seip.addSpec(flowSpec).get();
+ Properties props = new Properties();
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
+ WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
+ log.info("WriteResponse: " + writeResponse);
+
+ // Wait for the cancellation to be processed
+ Thread.sleep(5000);
+ List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
+ Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");
+
+ Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
+ log.info(consumedSpecAction.getKey().toString());
+ Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
+ Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
+ Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
+ Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
+ }
+
+
private static JobSpec initJobSpec(String specUri) {
Properties properties = new Properties();
return JobSpec.builder(specUri)
@@ -174,6 +253,16 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
.build();
}
+ private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+ return JobSpec.builder(specUri)
+ .withConfig(ConfigUtils.propertiesToConfig(properties))
+ .withVersion("1")
+ .withDescription("Spec Description")
+ .build();
+ }
+
@AfterSuite
public void after() {
try {
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 953d841f1..0d01bd4f9 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -17,27 +17,30 @@
package org.apache.gobblin.service;
-import com.google.common.base.Joiner;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.slf4j.Logger;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
@@ -54,9 +57,6 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.extern.slf4j.Slf4j;
-
@Slf4j
@NotThreadSafe
public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@@ -105,19 +105,6 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
}
- private Spec addExecutionIdToJobSpecUri(Spec spec) {
- JobSpec newSpec = (JobSpec)spec;
- if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- try {
- newSpec.setUri(new URI(Joiner.on("/").
- join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
- } catch (URISyntaxException e) {
- log.error("Cannot create job uri to cancel job", e);
- }
- }
- return newSpec;
- }
-
private URI getURIWithExecutionId(URI originalURI, Properties props) {
URI result = originalURI;
if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,10 +120,9 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> addSpec(Spec addedSpec) {
- Spec spec = addExecutionIdToJobSpecUri(addedSpec);
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
- log.info("Adding Spec: " + spec + " using Kafka.");
+ log.info("Adding Spec: " + addedSpec + " using Kafka.");
this.addSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -144,10 +130,9 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> updateSpec(Spec updatedSpec) {
- Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
- log.info("Updating Spec: " + spec + " using Kafka.");
+ log.info("Updating Spec: " + updatedSpec + " using Kafka.");
this.updateSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -155,13 +140,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
- URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
- AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+ AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
.setProperties(Maps.fromProperties(headers)).build();
- log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
+ log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
this.deleteSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -169,12 +152,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
- URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
- AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+ AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
.setProperties(Maps.fromProperties(properties)).build();
- log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
+ log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
this.cancelSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 8a1bf3161..6952dcff8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -28,11 +28,13 @@ import com.typesafe.config.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
+import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,32 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
break;
case DELETE:
this.removedSpecs.mark();
- URI jobSpecUri = parsedMessage.getUri();
- this.jobCatalog.remove(jobSpecUri);
+ this.jobCatalog.remove(parsedMessage.getUri());
// Delete the job state if it is a delete spec request
- deleteStateStore(jobSpecUri);
+ deleteStateStore(parsedMessage.getUri());
break;
case CANCEL:
- this.cancelledSpecs.mark();
- this.jobCatalog.remove(parsedMessage.getUri(), true);
+ URI specUri = parsedMessage.getUri();
+ try {
+ JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+ // If incoming job or existing job does not have an associated flow execution ID, default to cancelling the job
+ if (!spec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY) || !parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ this.cancelledSpecs.mark();
+ this.jobCatalog.remove(specUri, true);
+ } else {
+ // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+ String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+ this.cancelledSpecs.mark();
+ this.jobCatalog.remove(specUri, true);
+ } else {
+ log.warn("Job spec {} that has flow execution ID {} could not be cancelled, incoming request expects to cancel flow execution ID {}", specUri,
+ spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowIdToCancel);
+ }
+ }
+ } catch (JobSpecNotFoundException e) {
+ log.warn("Could not find job spec {} to cancel in job catalog", specUri);
+ }
break;
default:
log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);