You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/12/15 23:57:32 UTC

(gobblin) branch master updated: [GOBBLIN-1968] Temporal commit step integration (#3829)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 18fba9e5c [GOBBLIN-1968] Temporal commit step integration (#3829)
18fba9e5c is described below

commit 18fba9e5c3d70f21a7b1dbe2b65ee83c23bfa4e6
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Dec 15 18:57:27 2023 -0500

    [GOBBLIN-1968] Temporal commit step integration (#3829)
    
    Add commit step to Gobblin temporal workflow for job publish
---
 .../org/apache/gobblin/runtime/JobContext.java     |   2 +-
 .../apache/gobblin/runtime/SafeDatasetCommit.java  |   6 +-
 .../gobblin/runtime/TaskStateCollectorService.java | 108 +++++++------
 .../temporal/ddm/activity/CommitActivity.java      |  36 +++++
 .../ddm/activity/impl/CommitActivityImpl.java      | 175 +++++++++++++++++++++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |   2 +-
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   6 +-
 .../temporal/ddm/workflow/CommitStepWorkflow.java  |  37 +++++
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |  53 +++++++
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  26 ++-
 10 files changed, 396 insertions(+), 55 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 89d1dc41b..c2bdc592d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -543,7 +543,7 @@ public class JobContext implements Closeable {
    * Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
-  private static boolean shouldCommitDataInJob(State state) {
+  public static boolean shouldCommitDataInJob(State state) {
     boolean jobCommitPolicyIsFull =
         JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
     boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index a2885c168..e07083ff1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -63,7 +63,7 @@ import org.apache.gobblin.source.extractor.JobCommitPolicy;
  */
 @RequiredArgsConstructor
 @Slf4j
-final class SafeDatasetCommit implements Callable<Void> {
+public final class SafeDatasetCommit implements Callable<Void> {
 
   private static final Object GLOBAL_LOCK = new Object();
 
@@ -319,8 +319,8 @@ final class SafeDatasetCommit implements Callable<Void> {
         datasetState.setState(JobState.RunningState.FAILED);
         datasetState.incrementJobFailures();
         Optional<String> taskStateException = taskState.getTaskFailureException();
-        log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
-            + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
+        log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
+            taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
         return;
       }
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 431646385..b7eb436fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.Callable;
@@ -29,10 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Queues;
@@ -68,8 +66,6 @@ import org.apache.gobblin.util.ParallelRunner;
 @Slf4j
 public class TaskStateCollectorService extends AbstractScheduledService {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
-
   private final JobState jobState;
 
   private final EventBus eventBus;
@@ -145,7 +141,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
         throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
       }
     } else {
-      optionalTaskCollectorHandler = Optional.absent();
+      optionalTaskCollectorHandler = Optional.empty();
     }
 
     isJobProceedOnCollectorServiceFailure =
@@ -166,13 +162,13 @@ public class TaskStateCollectorService extends AbstractScheduledService {
 
   @Override
   protected void startUp() throws Exception {
-    LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
     super.startUp();
   }
 
   @Override
   protected void shutDown() throws Exception {
-    LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
     try {
       runOneIteration();
     } finally {
@@ -193,42 +189,14 @@ public class TaskStateCollectorService extends AbstractScheduledService {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-        && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
 
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+    final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+    if (!taskStateQueue.isPresent()) {
       return;
     }
-
-    final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
-    try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
-      for (final String taskStateName : taskStateNames) {
-        LOGGER.debug("Found output task state file " + taskStateName);
-        // Deserialize the TaskState and delete the file
-        stateSerDeRunner.submitCallable(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
-            taskStateQueue.add(taskState);
-            taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
-            return null;
-          }
-        }, "Deserialize state for " + taskStateName);
-      }
-    } catch (IOException ioe) {
-      LOGGER.warn("Could not read all task state files.");
-    }
-
-    LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
-
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
-    for (TaskState taskState : taskStateQueue) {
+    for (TaskState taskState : taskStateQueue.get()) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
@@ -241,14 +209,14 @@ public class TaskStateCollectorService extends AbstractScheduledService {
     // Finish any additional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
     if (optionalTaskCollectorHandler.isPresent()) {
-      LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+      log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");
 
       try {
-        optionalTaskCollectorHandler.get().handle(taskStateQueue);
+        optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
       } catch (Throwable t) {
         if (isJobProceedOnCollectorServiceFailure) {
           log.error("Failed to commit dataset while job proceeds", t);
-          SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+          SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
         } else {
           throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
         }
@@ -256,7 +224,53 @@ public class TaskStateCollectorService extends AbstractScheduledService {
     }
 
     // Notify the listeners for the completion of the tasks
-    this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+    this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
+  }
+
+  /**
+   * Reads in a {@link StateStore} and deserializes all task states found in the provided table name.
+   * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers)
+   * @param taskStateStore store the task states are read from; each task state read is also deleted from the store
+   * @param taskStateTableName name passed to the store to list, read, and delete the task state tables
+   * @param numDeserializerThreads thread count handed to the {@link ParallelRunner} that performs the reads
+   * @return the task states read, or {@link Optional#empty()} if no matching task state tables are found in the store
+   * @throws IOException if raised outside the try block below; an IOException raised inside it is logged, not rethrown
+   */
+  public static Optional<Queue<TaskState>> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, String taskStateTableName,
+      int numDeserializerThreads) throws IOException {
+    List<String> taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate<String>() {
+      @Override
+      public boolean apply(String input) {
+        return input != null
+            && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+            && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+      }});
+
+    if (taskStateNames == null || taskStateNames.isEmpty()) {
+      log.warn("No output task state files found in " + taskStateTableName);
+      return Optional.empty();
+    }
+    // Concurrent queue, since the callables submitted to the ParallelRunner below are what add to it.
+    final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+    try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
+      for (final String taskStateName : taskStateNames) {
+        log.debug("Found output task state file " + taskStateName);
+        // Deserialize the TaskState and delete the file
+        stateSerDeRunner.submitCallable(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0);
+            taskStateQueue.add(taskState);
+            taskStateStore.delete(taskStateTableName, taskStateName);
+            return null;
+          }
+        }, "Deserialize state for " + taskStateName);
+      }
+    } catch (IOException ioe) {
+      log.error("Could not read all task state files due to", ioe);
+    }
+    log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName));
+    return Optional.of(taskStateQueue);
   }
 
   /**
@@ -267,7 +281,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
   private void reportJobProgress(TaskState taskState) {
     String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
     if (stringSize == null) {
-      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      log.warn("Expected to report job progress but work unit byte size property null");
       return;
     }
 
@@ -275,7 +289,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
 
     // If progress reporting is enabled, value should be present
     if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
-      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      log.warn("Expected to report job progress but total bytes to copy property null");
       return;
     }
     this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
@@ -287,7 +301,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
       this.workUnitsCompletedSoFar += 1;
 
       if (this.totalNumWorkUnits == 0) {
-        LOGGER.warn("Expected to report job progress but work units are not countable");
+        log.warn("Expected to report job progress but work units are not countable");
         return;
       }
       newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +321,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
       Map<String, String> progress = new HashMap<>();
       progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
 
-      LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
+      log.info("Sending copy progress event with percentage " + percentageToReport + "%");
       new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
     }
   }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
new file mode 100644
index 000000000..1f29a7abb
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/** Activity for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl},
+ * using a {@link WUProcessingSpec} to determine the location of the output task states */
+@ActivityInterface
+public interface CommitActivity {
+  /**
+   * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+   * @param workSpec spec from which the location of the output task states is determined
+   * @return number of workunits committed
+   */
+  @ActivityMethod
+  int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
new file mode 100644
index 000000000..8874bccd7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  /** Thread count passed to {@link TaskStateCollectorService#deserializeTaskStatesFromFolder}. */
+  static final int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  /** Thread count given to the {@link IteratorExecutor} that runs the per-dataset commits. */
+  static final int DEFAULT_NUM_COMMIT_THREADS = 1;
+
+  /**
+   * Loads the job state for {@code workSpec} and commits the task states stored for the job, grouped by dataset urn.
+   * @param workSpec spec used to load the file system, job state, and task state store
+   * @return number of task states read and committed, or 0 when none were found
+   * @throws ApplicationFailure non-retryable, wrapping (as an {@link IOException}) any exception raised here
+   */
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+      // TODO: Make the number of deserialization threads configurable
+      Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(
+          taskStateStore, jobOutputPath.getName(), DEFAULT_NUM_DESERIALIZATION_THREADS);
+      if (!taskStateQueueOpt.isPresent()) {
+        log.error("No task states found at " + jobOutputPath);
+        return 0;
+      }
+      Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
+      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      // The failure type is the plain class name; Class#toString() would prepend "class " to it.
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+          IOException.class.getName(), new IOException(e), null);
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store, running one {@link SafeDatasetCommit} per dataset urn.
+   * @param jobState job-level state consulted for the commit policy, publisher type, and job name
+   * @param taskStates task states to group by dataset urn and commit
+   * @param jobContext context handed to each {@link SafeDatasetCommit}
+   * @throws IOException if any dataset commit fails or the committing thread is interrupted
+   */
+  private void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+    if (!shouldCommitDataInJob) {
+      if (Strings.isNullOrEmpty(jobState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE))) {
+        log.warn("No data publisher is configured for this job. This can lead to non-atomic commit behavior.");
+      }
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+      }
+
+      List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), DEFAULT_NUM_COMMIT_THREADS, // TODO: make the commit thread count configurable
+          // TODO: Rewrite executorUtils to use java util optional
+          ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
+        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
+        throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+      }
+    } catch (InterruptedException exc) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag before translating the exception
+      throw new IOException(exc);
+    }
+  }
+
+  /**
+   * Organize task states by dataset urns.
+   * @param taskStates task states, each grouped under its {@link ConfigurationKeys#DATASET_URN_KEY} prop (or the default urn)
+   * @return A map of dataset urns to dataset task states.
+   */
+  public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+    //TODO: handle skipped tasks?
+    for (TaskState taskState : taskStates) {
+      String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+      JobState.DatasetState datasetState = datasetStatesByUrns.computeIfAbsent(datasetUrn, CommitActivityImpl::newDatasetState);
+      datasetState.incrementTaskCount();
+      datasetState.addTaskState(taskState);
+    }
+    return datasetStatesByUrns;
+  }
+
+  /** Creates a new {@link JobState.DatasetState} whose dataset urn is set to {@code datasetUrn}. */
+  private static JobState.DatasetState newDatasetState(String datasetUrn) {
+    JobState.DatasetState datasetState = new JobState.DatasetState();
+    datasetState.setDatasetUrn(datasetUrn);
+    return datasetState;
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 95425a643..88838ca6a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
+import io.temporal.client.WorkflowOptions;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
@@ -25,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 
 import com.typesafe.config.ConfigFactory;
-import io.temporal.client.WorkflowOptions;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.gobblin.metrics.Tag;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 9af6995b5..d425bbc4b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,7 +24,9 @@ import io.temporal.client.WorkflowClient;
 import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
 
@@ -40,12 +42,12 @@ public class WorkFulfillmentWorker extends AbstractTemporalWorker {
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class };
+        return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class };
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new ProcessWorkUnitImpl() };
+        return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
     }
 
     @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
new file mode 100644
index 000000000..f6f497027
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/**
+ * Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
+@WorkflowInterface
+public interface CommitStepWorkflow {
+    /**
+     * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+     * @param workSpec spec identifying the work whose output is to be committed
+     * @return number of workunits committed
+     */
+    @WorkflowMethod
+    int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
new file mode 100644
index 000000000..2b674ec19
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Workflow;
+
+import java.time.Duration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+
+
+@Slf4j
+public class CommitStepWorkflowImpl implements CommitStepWorkflow {
+  // Workflow implementation that hands the commit step to CommitActivity through the stub configured below.
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+  // Activity options: 999 second start-to-close timeout, combined with the retry options defined above.
+  private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofSeconds(999))
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+  // Activity stub created with ACTIVITY_OPTS.
+  private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
+  /** Delegates to {@link CommitActivity#commit} on the activity stub and returns its result. */
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    return activityStub.commit(workSpec);
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index eafc62409..141f22057 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,12 +23,14 @@ import com.typesafe.config.ConfigFactory;
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -36,17 +38,30 @@ import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 
 
+@Slf4j
 public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+  public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
 
   @Override
   public int process(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
-    return processingWorkflow.performWorkload(
+    int workunitsProcessed = processingWorkflow.performWorkload(
         WorkflowAddr.ROOT, workload, 0,
         workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
     );
+    if (workunitsProcessed > 0) {
+      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+      int result = commitWorkflow.commit(workSpec);
+      if (result == 0) {
+        log.warn("No work units committed at the job level. They could be committed at a task level.");
+      }
+      return result;
+    } else {
+      log.error("No work units processed, so no commit attempted.");
+      return 0;
+    }
   }
 
   protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec) {
@@ -61,4 +76,13 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!?
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
   }
+
+  protected CommitStepWorkflow createCommitStepWorkflow() {
+    // Child stub for the commit step: parent-close policy ABANDON; ID from qualifyNamePerExec (empty config if no WorkerConfig).
+    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .build();
+    return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
+  }
 }