You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/25 23:12:55 UTC

[1/2] helix git commit: Fix concurrent modification for async purge job

Repository: helix
Updated Branches:
  refs/heads/master 67ff66b48 -> d75d5fcdc


Fix concurrent modification for async purge job

For the async purge job, we need to take a snapshot of the WorkflowConfig list and then loop through all the jobs inside that snapshot.
Iterating the map directly can cause a concurrent modification, because this map is shared between different pipelines.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4a8e7853
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4a8e7853
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4a8e7853

Branch: refs/heads/master
Commit: 4a8e78531191e99a8ee1de2b63a81d6a54c0961a
Parents: 67ff66b
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Aug 17 17:11:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Oct 25 16:12:38 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      |  3 ++-
 .../stages/TaskGarbageCollectionStage.java      | 23 +++++++++++++++-----
 2 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/4a8e7853/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 8892d2e..c706db3 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyType;
@@ -51,7 +52,7 @@ public class TaskDataCache extends AbstractDataCache {
 
   private String _clusterName;
   private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
-  private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
+  private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
   private Map<String, ZNRecord> _contextMap = new HashMap<>();
   // The following fields have been added for quota-based task scheduling
   private final AssignableInstanceManager _assignableInstanceManager = new AssignableInstanceManager();

http://git-wip-us.apache.org/repos/asf/helix/blob/4a8e7853/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index e533c68..247d053 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,5 +1,7 @@
 package org.apache.helix.controller.stages;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.helix.HelixManager;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -10,6 +12,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
+  private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
   private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
 
   @Override
@@ -18,15 +21,23 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   }
 
   @Override
-  public void execute(ClusterEvent event) throws Exception {
+  public void execute(ClusterEvent event) {
     ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
-    for (WorkflowConfig workflowConfig : clusterDataCache.getWorkflowConfigMap().values()) {
+    Set<WorkflowConfig> existingWorkflows =
+        new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
+    for (WorkflowConfig workflowConfig : existingWorkflows) {
       // clean up the expired jobs if it is a queue.
-      if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
-        TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
-            clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
-            _rebalanceScheduler);
+      if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
+          .isJobQueue())) {
+        try {
+          TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+              clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+              _rebalanceScheduler);
+        } catch (Exception e) {
+          LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+              workflowConfig.getWorkflowId(), e.toString()));
+        }
       }
     }
   }


[2/2] helix git commit: Fix mappingCalculator NPE

Posted by jx...@apache.org.
Fix mappingCalculator NPE


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d75d5fcd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d75d5fcd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d75d5fcd

Branch: refs/heads/master
Commit: d75d5fcdc9736a0023e8b98a1e55d802fbf4212b
Parents: 4a8e785
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Aug 22 14:59:29 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Oct 25 16:12:44 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/controller/stages/task/TaskSchedulingStage.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d75d5fcd/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 86c6c32..cbb0160 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -134,6 +134,7 @@ public class TaskSchedulingStage extends AbstractBaseStage {
     } else {
       // Create dummy rebalancer for dropping existing current states
       rebalancer = new SemiAutoRebalancer();
+      mappingCalculator = new SemiAutoRebalancer();
     }
 
     if (rebalancer instanceof TaskRebalancer) {