You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/28 17:24:34 UTC
[gobblin] branch master updated: [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (#3588)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 58faa9a6a [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (#3588)
58faa9a6a is described below
commit 58faa9a6ae32605755baecf4d64c57fc424649e5
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Fri Oct 28 10:24:27 2022 -0700
[GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer (#3588)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1730] Include flow execution id when try to cancel/submit job using SimpleKafkaSpecProducer
* remove unnecessary dependency
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
---
.../gobblin/service/SimpleKafkaSpecProducer.java | 40 +++++++++++++++++++---
.../modules/core/GobblinServiceGuiceModule.java | 1 -
.../service/modules/orchestration/DagManager.java | 6 ++--
.../modules/orchestration/Orchestrator.java | 5 ++-
4 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index 0548a20e0..a96ba48ae 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -17,15 +17,18 @@
package org.apache.gobblin.service;
+import com.google.common.base.Joiner;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.Future;
import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.slf4j.Logger;
import com.codahale.metrics.Meter;
@@ -102,11 +105,37 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
}
+  /**
+   * Prepares a spec for an ADD/UPDATE publish by appending the flow execution id to the job URI
+   * as an extra path segment ({@code <jobUri>/<flowExecutionId>}), when the job config contains
+   * {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY}.
+   *
+   * <p>NOTE(review): the URI is rewritten in place via {@code setUri}, so the caller's JobSpec
+   * instance is modified as a side effect. Calling this twice on the same instance would append
+   * the id twice — confirm callers pass a per-request copy.
+   *
+   * @param spec the spec to publish; only {@link JobSpec} instances are rewritten
+   * @return the (possibly rewritten) spec; specs that are not JobSpecs are returned unchanged
+   *     instead of failing here with a ClassCastException
+   */
+  private Spec addExecutionIdToJobSpecUri(Spec spec) {
+    if (!(spec instanceof JobSpec)) {
+      return spec;
+    }
+    JobSpec jobSpec = (JobSpec) spec;
+    if (jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      String executionId = jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      try {
+        jobSpec.setUri(new URI(Joiner.on("/").join(jobSpec.getUri().toString(), executionId)));
+      } catch (URISyntaxException e) {
+        // Best effort: publish under the original URI rather than failing the whole request.
+        log.error("Cannot append flow execution id {} to job uri {}", executionId, jobSpec.getUri(), e);
+      }
+    }
+    return jobSpec;
+  }
+
+  /**
+   * Appends the flow execution id carried in {@code props} under
+   * {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} to {@code originalURI} as an extra path
+   * segment ({@code <originalURI>/<flowExecutionId>}). Used on the delete/cancel paths.
+   *
+   * @param originalURI job URI to extend
+   * @param props request headers/properties; may be null
+   * @return the extended URI, or {@code originalURI} when {@code props} is null, holds no
+   *     (or an empty) execution id, or the combined string is not a valid URI
+   */
+  private URI getURIWithExecutionId(URI originalURI, Properties props) {
+    // Look the key up with getProperty: Properties#contains(Object) is inherited from Hashtable
+    // and searches the VALUES, so it cannot be used to test whether a key is present.
+    String executionId = props == null ? null : props.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    if (executionId == null || executionId.isEmpty()) {
+      return originalURI;
+    }
+    try {
+      return new URI(Joiner.on("/").join(originalURI.toString(), executionId));
+    } catch (URISyntaxException e) {
+      // Best effort: fall back to the original URI rather than failing the delete/cancel request.
+      log.error("Cannot append flow execution id {} to job uri {}", executionId, originalURI, e);
+      return originalURI;
+    }
+  }
+
@Override
public Future<?> addSpec(Spec addedSpec) {
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
+ Spec spec = addExecutionIdToJobSpecUri(addedSpec);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
- log.info("Adding Spec: " + addedSpec + " using Kafka.");
+ log.info("Adding Spec: " + spec + " using Kafka.");
this.addSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -114,9 +143,10 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> updateSpec(Spec updatedSpec) {
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
+ Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
- log.info("Updating Spec: " + updatedSpec + " using Kafka.");
+ log.info("Updating Spec: " + spec + " using Kafka.");
this.updateSpecMeter.mark();
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
@@ -124,6 +154,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
+ deletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
@@ -137,6 +168,7 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@Override
public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
+ deletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
.setProperties(Maps.fromProperties(properties)).build();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 16ef015a8..642f78f11 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -22,7 +22,6 @@ import java.util.Objects;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-//import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 0bcceaf62..8532ec9b8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -666,8 +666,10 @@ public class DagManager extends AbstractIdleService {
props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
sendCancellationEvent(dagNodeToCancel.getValue());
}
- props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
- ConfigUtils.getString(dagNodeToCancel.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ""));
+ if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 19a1acdc5..9ba8abd38 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -398,9 +398,12 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
// Delete all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
JobExecutionPlan jobExecutionPlan = dagNode.getValue();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
+ JobSpec jobSpec = jobExecutionPlan.getJobSpec();
try {
SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
+ if (jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ headers.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
_log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
producer.deleteSpec(jobSpec.getUri(), headers);
} catch (Exception e) {