You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/12/15 23:57:32 UTC
(gobblin) branch master updated: [GOBBLIN-1968] Temporal commit step integration (#3829)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 18fba9e5c [GOBBLIN-1968] Temporal commit step integration (#3829)
18fba9e5c is described below
commit 18fba9e5c3d70f21a7b1dbe2b65ee83c23bfa4e6
Author: William Lo <lo...@gmail.com>
AuthorDate: Fri Dec 15 18:57:27 2023 -0500
[GOBBLIN-1968] Temporal commit step integration (#3829)
Add commit step to Gobblin temporal workflow for job publish
---
.../org/apache/gobblin/runtime/JobContext.java | 2 +-
.../apache/gobblin/runtime/SafeDatasetCommit.java | 6 +-
.../gobblin/runtime/TaskStateCollectorService.java | 108 +++++++------
.../temporal/ddm/activity/CommitActivity.java | 36 +++++
.../ddm/activity/impl/CommitActivityImpl.java | 175 +++++++++++++++++++++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 2 +-
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 6 +-
.../temporal/ddm/workflow/CommitStepWorkflow.java | 37 +++++
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 53 +++++++
.../impl/ProcessWorkUnitsWorkflowImpl.java | 26 ++-
10 files changed, 396 insertions(+), 55 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 89d1dc41b..c2bdc592d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -543,7 +543,7 @@ public class JobContext implements Closeable {
* Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
- private static boolean shouldCommitDataInJob(State state) {
+ public static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index a2885c168..e07083ff1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -63,7 +63,7 @@ import org.apache.gobblin.source.extractor.JobCommitPolicy;
*/
@RequiredArgsConstructor
@Slf4j
-final class SafeDatasetCommit implements Callable<Void> {
+public final class SafeDatasetCommit implements Callable<Void> {
private static final Object GLOBAL_LOCK = new Object();
@@ -319,8 +319,8 @@ final class SafeDatasetCommit implements Callable<Void> {
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
- log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
- + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
+ log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
+ taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
return;
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 431646385..b7eb436fa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Callable;
@@ -29,10 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
@@ -68,8 +66,6 @@ import org.apache.gobblin.util.ParallelRunner;
@Slf4j
public class TaskStateCollectorService extends AbstractScheduledService {
- private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
-
private final JobState jobState;
private final EventBus eventBus;
@@ -145,7 +141,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
}
} else {
- optionalTaskCollectorHandler = Optional.absent();
+ optionalTaskCollectorHandler = Optional.empty();
}
isJobProceedOnCollectorServiceFailure =
@@ -166,13 +162,13 @@ public class TaskStateCollectorService extends AbstractScheduledService {
@Override
protected void startUp() throws Exception {
- LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
+ log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
super.startUp();
}
@Override
protected void shutDown() throws Exception {
- LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
+ log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
try {
runOneIteration();
} finally {
@@ -193,42 +189,14 @@ public class TaskStateCollectorService extends AbstractScheduledService {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
- List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
- && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
- }});
- if (taskStateNames == null || taskStateNames.size() == 0) {
- LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+ final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+ if (!taskStateQueue.isPresent()) {
return;
}
-
- final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
- try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
- for (final String taskStateName : taskStateNames) {
- LOGGER.debug("Found output task state file " + taskStateName);
- // Deserialize the TaskState and delete the file
- stateSerDeRunner.submitCallable(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
- taskStateQueue.add(taskState);
- taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
- return null;
- }
- }, "Deserialize state for " + taskStateName);
- }
- } catch (IOException ioe) {
- LOGGER.warn("Could not read all task state files.");
- }
-
- LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
-
// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
- for (TaskState taskState : taskStateQueue) {
+ for (TaskState taskState : taskStateQueue.get()) {
consumeTaskIssues(taskState);
taskState.setJobState(this.jobState);
this.jobState.addTaskState(taskState);
@@ -241,14 +209,14 @@ public class TaskStateCollectorService extends AbstractScheduledService {
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
- LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+ log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");
try {
- optionalTaskCollectorHandler.get().handle(taskStateQueue);
+ optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
} catch (Throwable t) {
if (isJobProceedOnCollectorServiceFailure) {
log.error("Failed to commit dataset while job proceeds", t);
- SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+ SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
} else {
throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
}
@@ -256,7 +224,53 @@ public class TaskStateCollectorService extends AbstractScheduledService {
}
// Notify the listeners for the completion of the tasks
- this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+ this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
+ }
+
+ /**
+ * Reads the given {@link StateStore} and deserializes all task states found under the provided table name, deleting each task state file once it has been read.
+ * Task state files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers).
+ * @param taskStateStore
+ * @param taskStateTableName
+ * @param numDeserializerThreads
+ * @return an {@link Optional} holding the queue of deserialized {@link TaskState}s, or an empty {@link Optional} if no task state files are found under the provided table name
+ * @throws IOException
+ */
+ public static Optional<Queue<TaskState>> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, String taskStateTableName,
+ int numDeserializerThreads) throws IOException {
+ List<String> taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return input != null
+ && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+ && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+ }});
+
+ if (taskStateNames == null || taskStateNames.isEmpty()) {
+ log.warn("No output task state files found in " + taskStateTableName);
+ return Optional.empty();
+ }
+
+ final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
+ try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
+ for (final String taskStateName : taskStateNames) {
+ log.debug("Found output task state file " + taskStateName);
+ // Deserialize the TaskState and delete the file
+ stateSerDeRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0);
+ taskStateQueue.add(taskState);
+ taskStateStore.delete(taskStateTableName, taskStateName);
+ return null;
+ }
+ }, "Deserialize state for " + taskStateName);
+ }
+ } catch (IOException ioe) {
+ log.error("Could not read all task state files due to", ioe);
+ }
+ log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName));
+ return Optional.of(taskStateQueue);
}
/**
@@ -267,7 +281,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
private void reportJobProgress(TaskState taskState) {
String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
if (stringSize == null) {
- LOGGER.warn("Expected to report job progress but work unit byte size property null");
+ log.warn("Expected to report job progress but work unit byte size property null");
return;
}
@@ -275,7 +289,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
// If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
- LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+ log.warn("Expected to report job progress but total bytes to copy property null");
return;
}
this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
@@ -287,7 +301,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
this.workUnitsCompletedSoFar += 1;
if (this.totalNumWorkUnits == 0) {
- LOGGER.warn("Expected to report job progress but work units are not countable");
+ log.warn("Expected to report job progress but work units are not countable");
return;
}
newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +321,7 @@ public class TaskStateCollectorService extends AbstractScheduledService {
Map<String, String> progress = new HashMap<>();
progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
- LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
+ log.info("Sending copy progress event with percentage " + percentageToReport + "%");
new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
new file mode 100644
index 000000000..1f29a7abb
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/** Activity for reading and committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl},
+ * using a {@link WUProcessingSpec} to determine the location of the output task states */
+@ActivityInterface
+public interface CommitActivity {
+ /**
+ * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ * @param workSpec
+ * @return number of workunits committed
+ */
+ @ActivityMethod
+ int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
new file mode 100644
index 000000000..8874bccd7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ Optional<Queue<TaskState>> taskStateQueueOpt =
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
+ if (!taskStateQueueOpt.isPresent()) {
+ log.error("No task states found at " + jobOutputPath);
+ return 0;
+ }
+ Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
+ commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job <jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ private void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ if (Strings.isNullOrEmpty(jobState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE))) {
+ log.warn("No data publisher is configured for this job. This can lead to non-atomic commit behavior.");
+ }
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ // TODO: Rewrite executorUtils to use java util optional
+ ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
+ String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return A map of dataset urns to dataset task states.
+ */
+ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
+ String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+ if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+ JobState.DatasetState datasetState = new JobState.DatasetState();
+ datasetState.setDatasetUrn(datasetUrn);
+ datasetStatesByUrns.put(datasetUrn, datasetState);
+ }
+ return datasetUrn;
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 95425a643..88838ca6a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.ddm.launcher;
+import io.temporal.client.WorkflowOptions;
import java.net.URI;
import java.util.List;
import java.util.Properties;
@@ -25,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import com.typesafe.config.ConfigFactory;
-import io.temporal.client.WorkflowOptions;
import org.apache.hadoop.fs.Path;
import org.apache.gobblin.metrics.Tag;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 9af6995b5..d425bbc4b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,7 +24,9 @@ import io.temporal.client.WorkflowClient;
import io.temporal.worker.WorkerOptions;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
@@ -40,12 +42,12 @@ public class WorkFulfillmentWorker extends AbstractTemporalWorker {
@Override
protected Class<?>[] getWorkflowImplClasses() {
- return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class };
+ return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class };
}
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new ProcessWorkUnitImpl() };
+ return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
}
@Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
new file mode 100644
index 000000000..f6f497027
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/**
+ * Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
+@WorkflowInterface
+public interface CommitStepWorkflow {
+
+ /**
+ * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ * @return number of workunits committed
+ */
+ @WorkflowMethod
+ int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
new file mode 100644
index 000000000..2b674ec19
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Workflow;
+
+import java.time.Duration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+
+
+@Slf4j
+public class CommitStepWorkflowImpl implements CommitStepWorkflow {
+
+ private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofSeconds(999))
+ .setRetryOptions(ACTIVITY_RETRY_OPTS)
+ .build();
+
+ private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
+
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ return activityStub.commit(workSpec);
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index eafc62409..141f22057 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,12 +23,14 @@ import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -36,17 +38,30 @@ import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+@Slf4j
public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+ public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow";
@Override
public int process(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
- return processingWorkflow.performWorkload(
+ int workunitsProcessed = processingWorkflow.performWorkload(
WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
);
+ if (workunitsProcessed > 0) {
+ CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+ int result = commitWorkflow.commit(workSpec);
+ if (result == 0) {
+ log.warn("No work units committed at the job level. They could be committed at a task level.");
+ }
+ return result;
+ } else {
+ log.error("No work units processed, so no commit attempted.");
+ return 0;
+ }
}
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec workSpec) {
@@ -61,4 +76,13 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
// TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!?
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}
+
+ protected CommitStepWorkflow createCommitStepWorkflow() {
+ ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+ .build();
+
+ return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
+ }
}