Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/10 23:09:28 UTC

[gobblin] branch master updated: [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc and fix a sync… (#3415)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bc22a55 [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (#3415)
5bc22a55 is described below

commit 5bc22a5502ddeb810a35b0fb996dd9dfc8c81121
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Wed Nov 10 15:09:20 2021 -0800

    [GOBBLIN-1564] codestyle changes, typo corrections, improved javadoc  and fix a sync… (#3415)
    
    * codestyle changes, typo corrections, improved javadoc  and fix a synchronization issue
    
    * address review comments
    
    * add review comments
    
    * address review comments
    
    * address review comments
    
    * fix bugsFixMain
---
 .../service/modules/orchestration/DagManager.java  | 170 +++++++++++++--------
 .../modules/orchestration/DagManagerUtils.java     |   3 +-
 2 files changed, 107 insertions(+), 66 deletions(-)
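
The "synchronization issue" in the subject is the handling of the shared per-user job-count maps
(proxyUserToJobCount, requesterToJobCount): the old code used plain getOrDefault/put and
containsKey/get/put sequences, so concurrent DagManagerThreads could lose updates. The patch
replaces those with compare-and-swap style retry loops on the ConcurrentHashMap, and the quota
check now returns a sign-encoded count so a failed check can be rolled back. Below is a minimal,
self-contained sketch of that pattern; the class, field and method names (and the quota value)
are illustrative only, not the ones used in DagManager:

    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class QuotaCounterSketch {
      private final Map<String, Integer> jobCounts = new ConcurrentHashMap<>();
      private final int quota = 3; // arbitrary quota for the sketch

      // Atomically increments the count for key; returns the count before the
      // increment, negated if the quota was already reached.
      int incrementAndCheck(String key) {
        Integer current;
        do {
          current = jobCounts.get(key);
          // putIfAbsent covers the first increment; replace succeeds only if no other
          // thread changed the value since it was read, otherwise the loop retries.
        } while (current == null
            ? jobCounts.putIfAbsent(key, 1) != null
            : !jobCounts.replace(key, current, current + 1));
        int before = (current == null) ? 0 : current;
        return before >= quota ? -before : before;
      }

      // Atomically decrements the count for key, never going below zero.
      void decrement(String key) {
        Integer current;
        do {
          current = jobCounts.get(key);
        } while (current != null && current > 0 && !jobCounts.replace(key, current, current - 1));
      }

      // Caller pattern mirroring checkQuota(): roll back the increment before failing.
      void checkQuota(String user) throws IOException {
        int result = incrementAndCheck(user);
        if (result < 0) {
          decrement(user); // keep the shared counts consistent before throwing
          throw new IOException(String.format(
              "Quota exceeded for user %s: quota=%d, runningJobs=%d", user, quota, Math.abs(result)));
        }
      }
    }

The loop only retries when another thread changed the count between the read and the replace, so
no synchronized block is needed and readers never block.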

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index eb7b19f..4b304e5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -38,12 +38,14 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
@@ -111,7 +113,6 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
  * The {@link DagManager} is active only in the leader mode. To ensure that no {@link Dag} is lost on a restart or
  * leadership change, each {@link Dag} managed by a {@link DagManager} is checkpointed to a persistent location. On start up or leadership change,
  * the {@link DagManager} loads all the checkpointed {@link Dag}s and adds them to the {@link BlockingQueue}.
- * Current implementation supports only FileSystem-based checkpointing of the Dag statuses.
  */
 @Alpha
 @Slf4j
@@ -121,7 +122,6 @@ public class DagManager extends AbstractIdleService {
 
   public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
 
-  private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
   private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
   public static final Integer DEFAULT_NUM_THREADS = 3;
   private static final Integer TERMINATION_TIMEOUT = 30;
@@ -166,15 +166,14 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
-  private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
-  private BlockingQueue<String>[] cancelQueue;
-  private BlockingQueue<String>[] resumeQueue;
+  private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
+  private final BlockingQueue<String>[] cancelQueue;
+  private final BlockingQueue<String>[] resumeQueue;
   DagManagerThread[] dagManagerThreads;
 
-  private ScheduledExecutorService scheduledExecutorPool;
-  private boolean instrumentationEnabled;
+  private final ScheduledExecutorService scheduledExecutorPool;
+  private final boolean instrumentationEnabled;
   private DagStateStore dagStateStore;
-  private DagStateStore failedDagStateStore;
   private Map<URI, TopologySpec> topologySpecMap;
 
   @Getter
@@ -194,9 +193,9 @@ public class DagManager extends AbstractIdleService {
   public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
-    this.queue = initializeDagQueue(this.numThreads);
-    this.cancelQueue = initializeDagQueue(this.numThreads);
-    this.resumeQueue = initializeDagQueue(this.numThreads);
+    this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) initializeDagQueue(this.numThreads);
+    this.cancelQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
+    this.resumeQueue = (BlockingQueue<String>[]) initializeDagQueue(this.numThreads);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
@@ -209,10 +208,12 @@ public class DagManager extends AbstractIdleService {
     }
 
     this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
-    this.perUserQuota = new HashMap<>();
+
+    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
     for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
-      this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+      mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
     }
+    this.perUserQuota = mapBuilder.build();
 
     this.jobStatusRetriever = jobStatusRetriever;
 
@@ -222,7 +223,7 @@ public class DagManager extends AbstractIdleService {
 
   DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
     try {
-      Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
+      Class<?> dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
       return (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config, topologySpecMap);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException(e);
@@ -230,8 +231,8 @@ public class DagManager extends AbstractIdleService {
   }
 
   // Initializes and returns an array of Queue of size numThreads
-  private static LinkedBlockingDeque[] initializeDagQueue(int numThreads) {
-    LinkedBlockingDeque[] queue = new LinkedBlockingDeque[numThreads];
+  private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
+    LinkedBlockingDeque<?>[] queue = new LinkedBlockingDeque[numThreads];
 
     for (int i=0; i< numThreads; i++) {
       queue[i] = new LinkedBlockingDeque<>();
@@ -244,8 +245,8 @@ public class DagManager extends AbstractIdleService {
     this(config, jobStatusRetriever, true);
   }
 
-  /** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at
-   * fixed intervals. The service also loads any {@link Dag}s
+  /** Do nothing on service startup. Scheduling of {@link DagManagerThread}s and loading of any {@link Dag}s are done
+   * during leadership change.
    */
   @Override
   protected void startUp() {
@@ -270,7 +271,7 @@ public class DagManager extends AbstractIdleService {
     // Flow cancellation request has to be forwarded to the same DagManagerThread where the
     // flow create request was forwarded. This is because Azkaban Exec Id is stored in the DagNode of the
     // specific DagManagerThread queue
-    if (!this.queue[queueId].offer(dag)) {
+    if (!this.runQueue[queueId].offer(dag)) {
       throw new IOException("Could not add dag " + DagManagerUtils.generateDagId(dag) + " to queue");
     }
     if (setStatus) {
@@ -290,7 +291,7 @@ public class DagManager extends AbstractIdleService {
   }
 
   /**
-   * Method to submit a {@link URI} for cancellation requsts to the {@link DagManager}.
+   * Method to submit a {@link URI} for cancellation requests to the {@link DagManager}.
    * The {@link DagManager} adds the dag to the {@link BlockingQueue} to be picked up by one of the {@link DagManagerThread}s.
    */
   synchronized public void stopDag(URI uri) throws IOException {
@@ -358,8 +359,10 @@ public class DagManager extends AbstractIdleService {
         log.info("Scheduling {} DagManager threads", numThreads);
         //Initializing state store for persisting Dags.
         this.dagStateStore = createDagStateStore(config, topologySpecMap);
-        this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
-        Set<String> failedDagIds = Collections.synchronizedSet(this.failedDagStateStore.getDagIds());
+        DagStateStore failedDagStateStore =
+            createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+                topologySpecMap);
+        Set<String> failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());
 
         ContextAwareMeter allSuccessfulMeter = null;
         ContextAwareMeter allFailedMeter = null;
@@ -375,7 +378,7 @@ public class DagManager extends AbstractIdleService {
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              queue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
+              runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
               allSuccessfulMeter, allFailedMeter);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
@@ -415,7 +418,7 @@ public class DagManager extends AbstractIdleService {
     private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
     private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
-    private Set<String> failedDagIds;
+    private final Set<String> failedDagIds;
     private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
@@ -434,12 +437,12 @@ public class DagManager extends AbstractIdleService {
     private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
     private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
 
-    private JobStatusRetriever jobStatusRetriever;
-    private DagStateStore dagStateStore;
-    private DagStateStore failedDagStateStore;
-    private BlockingQueue<Dag<JobExecutionPlan>> queue;
-    private BlockingQueue<String> cancelQueue;
-    private BlockingQueue<String> resumeQueue;
+    private final JobStatusRetriever jobStatusRetriever;
+    private final DagStateStore dagStateStore;
+    private final DagStateStore failedDagStateStore;
+    private final BlockingQueue<Dag<JobExecutionPlan>> queue;
+    private final BlockingQueue<String> cancelQueue;
+    private final BlockingQueue<String> resumeQueue;
 
     /**
      * Constructor.
@@ -464,7 +467,7 @@ public class DagManager extends AbstractIdleService {
         this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
         this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
         ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
-            () -> orchestrationDelay.get());
+            orchestrationDelay::get);
         this.metricContext.register(orchestrationDelayMetric);
       } else {
         this.metricContext = null;
@@ -492,7 +495,7 @@ public class DagManager extends AbstractIdleService {
           //Poll the queue for a new Dag to execute.
           if (dag != null) {
             if (dag.isEmpty()) {
-              log.info("Empty dag; ignoring the dag");
+              log.warn("Empty dag; ignoring the dag");
             }
             //Initialize dag.
             initialize(dag);
@@ -671,7 +674,7 @@ public class DagManager extends AbstractIdleService {
       log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
       //Determine the next set of jobs to run and submit them for execution
       Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
-      for (DagNode dagNode: nextSubmitted.get(dagId)) {
+      for (DagNode<JobExecutionPlan> dagNode: nextSubmitted.get(dagId)) {
         addJobState(dagId, dagNode);
       }
 
@@ -915,7 +918,7 @@ public class DagManager extends AbstractIdleService {
       String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
 
       // Run this spec on selected executor
-      SpecProducer producer = null;
+      SpecProducer<Spec> producer;
       try {
         checkQuota(dagNode);
         producer = DagManagerUtils.getSpecProducer(dagNode);
@@ -926,14 +929,14 @@ public class DagManager extends AbstractIdleService {
         // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
         // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
         // blocks (by calling Future#get()) until the submission is completed.
-        Future addSpecFuture = producer.addSpec(jobSpec);
+        Future<?> addSpecFuture = producer.addSpec(jobSpec);
         dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
         //Persist the dag
         this.dagStateStore.writeCheckpoint(this.dags.get(DagManagerUtils.generateDagId(dagNode)));
 
         if (this.metricContext != null) {
           getRunningJobsCounter(dagNode).inc();
-          getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.inc());
+          getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
         }
 
         addSpecFuture.get();
@@ -960,49 +963,77 @@ public class DagManager extends AbstractIdleService {
       String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
       String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
       boolean proxyUserCheck = true;
+      int proxyQuotaIncrement;
+      Set<String> usersQuotaIncrement = new HashSet<>(); // holds the requesters whose running-job count was incremented for this job
+      StringBuilder requesterMessage = new StringBuilder();
+
       if (proxyUser != null) {
-        proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
+        proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
+        proxyUserCheck = proxyQuotaIncrement >= 0;  // proxy user quota check passed
+        if (!proxyUserCheck) {
+          requesterMessage.append(String.format(
+              "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
+              proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)));
+        }
       }
 
       String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
       boolean requesterCheck = true;
-      String requesterMessage = null;
+
       if (serializedRequesters != null) {
-        for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
-          requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
-          if (!requesterCheck && requesterMessage == null) {
-            requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
-                + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+        List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
+            .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+        for (String requester : uniqueRequesters) {
+          int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
+          boolean thisRequesterCheck = userQuotaIncrement >= 0;  // user quota check passed
+          usersQuotaIncrement.add(requester);
+          requesterCheck = requesterCheck && thisRequesterCheck;
+          if (!thisRequesterCheck) {
+            requesterMessage.append(String.format(
+                "Quota exceeded for requester %s on executor %s : quota=%s, runningJobs=%d%n",
+                requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)));
           }
         }
       }
 
       // Throw the quota-exceeded errors at the end to avoid inconsistent job counts
-      if (!proxyUserCheck) {
-        throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
-            ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
-      }
-
-      if (!requesterCheck) {
-        throw new IOException(requesterMessage);
+      if (!proxyUserCheck || !requesterCheck) {
+        // roll back the increased counts in this block
+        if (proxyUser != null) { decrementQuotaUsage(proxyUserToJobCount, proxyUser); }
+        decrementQuotaUsageForUsers(usersQuotaIncrement);
+        throw new IOException(requesterMessage.toString());
       }
     }
 
     /**
      * Increment the job count by one for the given map and key.
-     * @return true if quota is not reached for this user or user is whitelisted, false otherwise.
+     * @return a negative number if the quota is already reached for this user,
+     *         otherwise a non-negative number; in both cases the absolute value
+     *         is this user's running-job count before this increment request.
+     *         Returns 0 without changing the count if the job is on a retry attempt.
      */
-    private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+    private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
       String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
-      int jobCount = quotaMap.getOrDefault(key, 0);
 
       // Only increment job count for first attempt, since job is considered running between retries
-      if (dagNode.getValue().getCurrentAttempts() == 1) {
-        jobCount++;
-        quotaMap.put(key, jobCount);
+      if (dagNode.getValue().getCurrentAttempts() != 1) {
+        return 0;
+      }
+
+      Integer currentCount;
+      do {
+        currentCount = quotaMap.get(key);
+      } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
+
+      if (currentCount == null) {
+        currentCount = 0;
       }
 
-      return jobCount <= getQuotaForUser(user);
+      if (currentCount >= getQuotaForUser(user)) {
+        return -currentCount; // increment must have crossed the quota
+      } else {
+        return currentCount;
+      }
     }
 
     private int getQuotaForUser(String user) {
@@ -1028,7 +1059,7 @@ public class DagManager extends AbstractIdleService {
 
       if (this.metricContext != null) {
         getRunningJobsCounter(dagNode).dec();
-        getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
+        getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
       }
 
       switch (jobStatus) {
@@ -1063,9 +1094,7 @@ public class DagManager extends AbstractIdleService {
       String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
       if (proxyUser != null) {
         String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
-        if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
-          proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
-        }
+        decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
       }
 
       String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
@@ -1073,9 +1102,7 @@ public class DagManager extends AbstractIdleService {
         try {
           for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
             String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
-            if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
-              requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
-            }
+            decrementQuotaUsage(requesterToJobCount, requesterKey);
           }
         } catch (IOException e) {
           log.error("Failed to release quota for requester list " + serializedRequesters, e);
@@ -1083,6 +1110,19 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
+    private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
+      Integer currentCount;
+      do {
+        currentCount = quotaMap.get(user);
+      } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
+    }
+
+    private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
+      for (String requester : requestersToDecreaseCount) {
+       decrementQuotaUsage(DagManagerThread.requesterToJobCount, requester);
+      }
+    }
+
     private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
       this.jobToDag.remove(dagNode);
       this.dagToJobs.get(dagId).remove(dagNode);
@@ -1151,7 +1191,7 @@ public class DagManager extends AbstractIdleService {
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
-    private void cleanUp() throws IOException {
+    private void cleanUp() {
       List<String> dagIdstoClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index e2bd0d1..565c021 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
@@ -154,7 +155,7 @@ public class DagManagerUtils {
     return dagNode.getValue().getJobSpec().getConfig();
   }
 
-  static SpecProducer getSpecProducer(DagNode<JobExecutionPlan> dagNode)
+  static SpecProducer<Spec> getSpecProducer(DagNode<JobExecutionPlan> dagNode)
       throws ExecutionException, InterruptedException {
     return dagNode.getValue().getSpecExecutor().getProducer().get();
   }
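
A side note on the Future<?> change above (where producer.addSpec(jobSpec) is called): as the
in-line comment in the patch says, SpecProducer implementations submit the job asynchronously and
hand back a Future, and the DagManagerThread then blocks on Future#get() so that any submission
failure surfaces immediately. A small standalone illustration of that submit-then-block pattern,
using a plain ExecutorService as a stand-in for the real SpecProducer (names are illustrative only):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class BlockingSubmitSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Submission happens asynchronously and returns a Future immediately.
        Future<?> addSpecFuture = executor.submit(() -> {
          // stand-in for handing the JobSpec to the underlying executor
          System.out.println("job submitted");
        });
        try {
          // Block until the submission completes so failures are caught here,
          // mirroring the addSpecFuture.get() call in the patch above.
          addSpecFuture.get();
        } catch (ExecutionException e) {
          System.err.println("job submission failed: " + e.getCause());
        } finally {
          executor.shutdown();
        }
      }
    }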