You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/10 23:09:28 UTC
[gobblin] branch master updated: [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc and fix a sync… (#3415)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc22a55 [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc and fix a sync… (#3415)
5bc22a55 is described below
commit 5bc22a5502ddeb810a35b0fb996dd9dfc8c81121
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Wed Nov 10 15:09:20 2021 -0800
[GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc and fix a sync… (#3415)
* codestyle changes, typo corrections, improved javadoc and fix a synchronization issue
* address review comments
* add review comments
* address review comments
* address review comments
* fix bugsFixMain
---
.../service/modules/orchestration/DagManager.java | 170 +++++++++++++--------
.../modules/orchestration/DagManagerUtils.java | 3 +-
2 files changed, 107 insertions(+), 66 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index eb7b19f..4b304e5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -38,12 +38,14 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
@@ -111,7 +113,6 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
* The {@link DagManager} is active only in the leader mode. To ensure, each {@link Dag} managed by a {@link DagManager} is
* checkpointed to a persistent location. On start up or leadership change,
* the {@link DagManager} loads all the checkpointed {@link Dag}s and adds them to the {@link BlockingQueue}.
- * Current implementation supports only FileSystem-based checkpointing of the Dag statuses.
*/
@Alpha
@Slf4j
@@ -121,7 +122,6 @@ public class DagManager extends AbstractIdleService {
public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
- private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
public static final Integer DEFAULT_NUM_THREADS = 3;
private static final Integer TERMINATION_TIMEOUT = 30;
@@ -166,15 +166,14 @@ public class DagManager extends AbstractIdleService {
}
}
- private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
- private BlockingQueue<String>[] cancelQueue;
- private BlockingQueue<String>[] resumeQueue;
+ private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
+ private final BlockingQueue<String>[] cancelQueue;
+ private final BlockingQueue<String>[] resumeQueue;
DagManagerThread[] dagManagerThreads;
- private ScheduledExecutorService scheduledExecutorPool;
- private boolean instrumentationEnabled;
+ private final ScheduledExecutorService scheduledExecutorPool;
+ private final boolean instrumentationEnabled;
private DagStateStore dagStateStore;
- private DagStateStore failedDagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
@Getter
@@ -194,9 +193,9 @@ public class DagManager extends AbstractIdleService {
public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
- this.queue = initializeDagQueue(this.numThreads);
- this.cancelQueue = initializeDagQueue(this.numThreads);
- this.resumeQueue = initializeDagQueue(this.numThreads);
+ this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
+ this.cancelQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
+ this.resumeQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
@@ -209,10 +208,12 @@ public class DagManager extends AbstractIdleService {
}
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
- this.perUserQuota = new HashMap<>();
+
+ ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
- this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
}
+ this.perUserQuota = mapBuilder.build();
this.jobStatusRetriever = jobStatusRetriever;
@@ -222,7 +223,7 @@ public class DagManager extends AbstractIdleService {
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
try {
- Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+ Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
@@ -230,8 +231,8 @@ public class DagManager extends AbstractIdleService {
}
// Initializes and returns an array of Queue of size numThreads
- private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
- LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
+ private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+ LinkedBlockingDeque<?>[] queue = new LinkedBlockingDeque[numThreads];
for (int i=0; i< numThreads; i++) {
queue[i] = new LinkedBlockingDeque<>();
@@ -244,8 +245,8 @@ public class DagManager extends AbstractIdleService {
this(config, jobStatusRetriever, true);
}
- /** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at
- * fixed intervals. The service also loads any {@link Dag}s
+ /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s is done
+ * during leadership change.
*/
@Override
protected void startUp() {
@@ -270,7 +271,7 @@ public class DagManager extends AbstractIdleService {
// Flow cancellation request has to be forwarded to the same DagManagerThread where the
// flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
// specific DagManagerThread queue
- if (!this.queue[queueId].offer(dag)) {
+ if (!this.runQueue[queueId].offer(dag)) {
throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue");
}
if (setStatus) {
@@ -290,7 +291,7 @@ public class DagManager extends AbstractIdleService {
}
/**
- * Method to submit a {@link URI} for cancellation requsts to the {@link DagManager}.
+ * Method to submit a {@link URI} for cancellation requests to the {@link DagManager}.
* The {@link DagManager} adds the dag to the {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
*/
synchronized public void stopDag(URI uri) throws IOException {
@@ -358,8 +359,10 @@ public class DagManager extends AbstractIdleService {
log.info("Scheduling {} DagManager threads", numThreads);
//Initializing state store for persisting Dags.
this.dagStateStore = createDagStateStore(config, topologySpecMap);
- this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
- Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
+ DagStateStore failedDagStateStore =
+ createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+ topologySpecMap);
+ Set<String> failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());
ContextAwareMeter allSuccessfulMeter = null;
ContextAwareMeter allFailedMeter = null;
@@ -375,7 +378,7 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
+ runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
allSuccessfulMeter, allFailedMeter);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
@@ -415,7 +418,7 @@ public class DagManager extends AbstractIdleService {
private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
- private Set<String> failedDagIds;
+ private final Set<String> failedDagIds;
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
@@ -434,12 +437,12 @@ public class DagManager extends AbstractIdleService {
private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
- private JobStatusRetriever jobStatusRetriever;
- private DagStateStore dagStateStore;
- private DagStateStore failedDagStateStore;
- private BlockingQueue<Dag<JobExecutionPlan>> queue;
- private BlockingQueue<String> cancelQueue;
- private BlockingQueue<String> resumeQueue;
+ private final JobStatusRetriever jobStatusRetriever;
+ private final DagStateStore dagStateStore;
+ private final DagStateStore failedDagStateStore;
+ private final BlockingQueue<Dag<JobExecutionPlan>> queue;
+ private final BlockingQueue<String> cancelQueue;
+ private final BlockingQueue<String> resumeQueue;
/**
* Constructor.
@@ -464,7 +467,7 @@ public class DagManager extends AbstractIdleService {
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
- () -> orchestrationDelay.get());
+ orchestrationDelay::get);
this.metricContext.register(orchestrationDelayMetric);
} else {
this.metricContext = null;
@@ -492,7 +495,7 @@ public class DagManager extends AbstractIdleService {
//Poll the queue for a new Dag to execute.
if (dag != null) {
if (dag.isEmpty()) {
- log.info("Empty dag; ignoring the dag");
+ log.warn("Empty dag; ignoring the dag");
}
//Initialize dag.
initialize(dag);
@@ -671,7 +674,7 @@ public class DagManager extends AbstractIdleService {
log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
- for (DagNode dagNode: nextSubmitted.get(dagId)) {
+ for (DagNode<JobExecutionPlan> dagNode: nextSubmitted.get(dagId)) {
addJobState(dagId, dagNode);
}
@@ -915,7 +918,7 @@ public class DagManager extends AbstractIdleService {
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
// Run this spec on selected executor
- SpecProducer producer = null;
+ SpecProducer<Spec> producer;
try {
checkQuota(dagNode);
producer = DagManagerUtils.getSpecProducer(dagNode);
@@ -926,14 +929,14 @@ public class DagManager extends AbstractIdleService {
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
- Future addSpecFuture = producer.addSpec(jobSpec);
+ Future<?> addSpecFuture = producer.addSpec(jobSpec);
dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
//Persist the dag
this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).inc();
- getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
+ getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
}
addSpecFuture.get();
@@ -960,49 +963,77 @@ public class DagManager extends AbstractIdleService {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
boolean proxyUserCheck = true;
+ int proxyQuotaIncrement;
+ Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
+ StringBuilder requesterMessage = new StringBuilder();
+
if (proxyUser != null) {
- proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+ proxyUserCheck = proxyQuotaIncrement < 0; // proxy user quota check failed
+ if (!proxyUserCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)));
+ }
}
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
boolean requesterCheck = true;
- String requesterMessage = null;
+
if (serializedRequesters != null) {
- for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
- requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
- if (!requesterCheck && requesterMessage == null) {
- requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
- + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+ List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
+ .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+ boolean thisRequesterCheck = userQuotaIncrement < 0; // user quota check failed
+ usersQuotaIncrement.add(requester);
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s, runningJobs=%d%n",
+ requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)));
}
}
}
// Throw errors for reach quota at the end to avoid inconsistent job counts
- if (!proxyUserCheck) {
- throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
- ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
- }
-
- if (!requesterCheck) {
- throw new IOException(requesterMessage);
+ if (!proxyUserCheck || !requesterCheck) {
+ // roll back the increased counts in this block
+ decrementQuotaUsage(proxyUserToJobCount, proxyUser);
+ decrementQuotaUsageForUsers(usersQuotaIncrement);
+ throw new IOException(requesterMessage.toString());
}
}
/**
* Increment quota by one for the given map and key.
- * @return true if quota is not reached for this user or user is whitelisted, false otherwise.
+ * @return a negative number if quota is already reached for this user
+ * a positive number if the quota is not reached for this user
+ * the absolute value of the number is the used quota before this increment request
+ * 0 if quota usage is not changed
*/
- private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+ private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
- int jobCount = quotaMap.getOrDefault(key, 0);
// Only increment job count for first attempt, since job is considered running between retries
- if (dagNode.getValue().getCurrentAttempts() == 1) {
- jobCount++;
- quotaMap.put(key, jobCount);
+ if (dagNode.getValue().getCurrentAttempts() != 1) {
+ return 0;
+ }
+
+ Integer currentCount;
+ do {
+ currentCount = quotaMap.get(key);
+ } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
+
+ if (currentCount == null) {
+ currentCount = 0;
}
- return jobCount <= getQuotaForUser(user);
+ if (currentCount >= getQuotaForUser(user)) {
+ return -currentCount; // increment must have crossed the quota
+ } else {
+ return currentCount;
+ }
}
private int getQuotaForUser(String user) {
@@ -1028,7 +1059,7 @@ public class DagManager extends AbstractIdleService {
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
- getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
+ getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
}
switch (jobStatus) {
@@ -1063,9 +1094,7 @@ public class DagManager extends AbstractIdleService {
String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
if (proxyUser != null) {
String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
- proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
- }
+ decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
}
String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
@@ -1073,9 +1102,7 @@ public class DagManager extends AbstractIdleService {
try {
for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
- if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
- requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
- }
+ decrementQuotaUsage(requesterToJobCount, requesterKey);
}
} catch (IOException e) {
log.error("Failed to release quota for requester list " + serializedRequesters, e);
@@ -1083,6 +1110,19 @@ public class DagManager extends AbstractIdleService {
}
}
+ private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
+ Integer currentCount;
+ do {
+ currentCount = quotaMap.get(user);
+ } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
+ }
+
+ private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
+ for (String requester : requestersToDecreaseCount) {
+ decrementQuotaUsage(DagManagerThread.requesterToJobCount, requester);
+ }
+ }
+
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
@@ -1151,7 +1191,7 @@ public class DagManager extends AbstractIdleService {
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
- private void cleanUp() throws IOException {
+ private void cleanUp() {
List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index e2bd0d1..565c021 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
@@ -154,7 +155,7 @@ public class DagManagerUtils {
return dagNode.getValue().getJobSpec().getConfig();
}
- static SpecProducer getSpecProducer(DagNode<JobExecutionPlan> dagNode)
+ static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan> dagNode)
throws ExecutionException, InterruptedException {
return dagNode.getValue().getSpecExecutor().getProducer().get();
}