Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/07/01 01:50:57 UTC

[GitHub] [helix] alirezazamani commented on a change in pull request #1076: Recover Workflow Garbage Collection Logic

alirezazamani commented on a change in pull request #1076:
URL: https://github.com/apache/helix/pull/1076#discussion_r448071506



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
##########
@@ -23,34 +29,87 @@ public AsyncWorkerType getAsyncWorkerType() {
   }
 
   @Override
-  public void execute(ClusterEvent event) {
-    WorkflowControllerDataProvider dataProvider =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+  public void process(ClusterEvent event) throws Exception {
+    // Compute, on the main pipeline thread, which jobs need to be purged and
+    // which workflows need to be garbage collected. Doing this synchronously
+    // avoids race conditions, since the cache can be modified concurrently;
+    // only the resulting deletions are handed off to the async worker.
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-
-    if (dataProvider == null || manager == null) {
+    if (manager == null) {
       LOG.warn(
-          "ResourceControllerDataProvider or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          "HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
           event.getEventId(), event.getEventType(), event.getClusterName());
       return;
     }
 
-    Set<WorkflowConfig> existingWorkflows =
-        new HashSet<>(dataProvider.getWorkflowConfigMap().values());
-    for (WorkflowConfig workflowConfig : existingWorkflows) {
-      // clean up the expired jobs if it is a queue.
+    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
+    Set<String> workflowsToBeDeleted = new HashSet<>();
+    WorkflowControllerDataProvider dataProvider =
+        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    for (Map.Entry<String, ZNRecord> entry : dataProvider.getContexts().entrySet()) {
+      WorkflowConfig workflowConfig = dataProvider.getWorkflowConfig(entry.getKey());
       if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
           .isJobQueue())) {
-        try {
-          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-              dataProvider.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-              _rebalanceScheduler);
-        } catch (Exception e) {
-          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
-              workflowConfig.getWorkflowId(), e.toString()));
+        WorkflowContext workflowContext = dataProvider.getWorkflowContext(entry.getKey());

Review comment:
      Ok, makes sense.
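
For context, here is a minimal sketch of the compute-then-dispatch pattern the
diff comment describes. It assumes a single async worker; the Cache and Purger
interfaces and every other name in it are hypothetical stand-ins for
WorkflowControllerDataProvider and the Helix async worker pool, not actual
Helix APIs.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: Cache and Purger are hypothetical stand-ins, not Helix types.
public class GcStageSketch {
  /** Stand-in for the controller's mutable cache. */
  interface Cache {
    Set<String> workflows();
    Set<String> expiredJobs(String workflow);
    boolean isTerminated(String workflow);
  }

  /** Stand-in for the code that performs deletes against the metadata store. */
  interface Purger {
    void purgeJobs(Map<String, Set<String>> expiredJobsByWorkflow);
    void deleteWorkflows(Set<String> workflows);
  }

  private final ExecutorService asyncWorker = Executors.newSingleThreadExecutor();

  public void process(Cache cache, Purger purger) {
    // Phase 1, on the pipeline thread: read the cache exactly once and copy
    // the results into collections owned by this stage. Nothing after this
    // loop touches the cache, so a concurrent cache refresh cannot race with
    // the garbage collection.
    Map<String, Set<String>> expiredJobsMap = new HashMap<>();
    Set<String> workflowsToBeDeleted = new HashSet<>();
    for (String workflow : cache.workflows()) {
      Set<String> expired = cache.expiredJobs(workflow);
      if (!expired.isEmpty()) {
        expiredJobsMap.put(workflow, new HashSet<>(expired));
      }
      if (cache.isTerminated(workflow)) {
        workflowsToBeDeleted.add(workflow);
      }
    }

    // Phase 2, asynchronously: the slow metadata-store deletes run off the
    // pipeline thread and operate only on the snapshots captured above.
    asyncWorker.submit(() -> {
      purger.purgeJobs(expiredJobsMap);
      purger.deleteWorkflows(workflowsToBeDeleted);
    });
  }
}

The point of the snapshot is that the async task never shares mutable state
with the pipeline: even if the controller refreshes the cache while the
deletes are still running, the worker keeps operating on the copies it was
handed.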



