You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/25 23:12:55 UTC
[1/2] helix git commit: Fix concurrent modification for async purge job
Repository: helix
Updated Branches:
refs/heads/master 67ff66b48 -> d75d5fcdc
Fix concurrent modification for async purge job
For the async purge job, we need to take a snapshot of the WorkflowConfig map's values and loop through the jobs of each workflow in that snapshot.
Iterating over the live map directly ran into concurrent modification, because the map is shared between different pipelines.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4a8e7853
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4a8e7853
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4a8e7853
Branch: refs/heads/master
Commit: 4a8e78531191e99a8ee1de2b63a81d6a54c0961a
Parents: 67ff66b
Author: Junkai Xue <jx...@linkedin.com>
Authored: Fri Aug 17 17:11:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Oct 25 16:12:38 2018 -0700
----------------------------------------------------------------------
.../helix/common/caches/TaskDataCache.java | 3 ++-
.../stages/TaskGarbageCollectionStage.java | 23 +++++++++++++++-----
2 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4a8e7853/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 8892d2e..c706db3 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyType;
@@ -51,7 +52,7 @@ public class TaskDataCache extends AbstractDataCache {
private String _clusterName;
private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
- private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>();
+ private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
private Map<String, ZNRecord> _contextMap = new HashMap<>();
// The following fields have been added for quota-based task scheduling
private final AssignableInstanceManager _assignableInstanceManager = new AssignableInstanceManager();
http://git-wip-us.apache.org/repos/asf/helix/blob/4a8e7853/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index e533c68..247d053 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -1,5 +1,7 @@
package org.apache.helix.controller.stages;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
@@ -10,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
+ private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();
@Override
@@ -18,15 +21,23 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
}
@Override
- public void execute(ClusterEvent event) throws Exception {
+ public void execute(ClusterEvent event) {
ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
- for (WorkflowConfig workflowConfig : clusterDataCache.getWorkflowConfigMap().values()) {
+ Set<WorkflowConfig> existingWorkflows =
+ new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
+ for (WorkflowConfig workflowConfig : existingWorkflows) {
// clean up the expired jobs if it is a queue.
- if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
- TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
- clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
- _rebalanceScheduler);
+ if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig
+ .isJobQueue())) {
+ try {
+ TaskUtil.purgeExpiredJobs(workflowConfig.getWorkflowId(), workflowConfig,
+ clusterDataCache.getWorkflowContext(workflowConfig.getWorkflowId()), manager,
+ _rebalanceScheduler);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to purge job for workflow %s with reason %s",
+ workflowConfig.getWorkflowId(), e.toString()));
+ }
}
}
}
[2/2] helix git commit: Fix mappingCalculator NPE
Posted by jx...@apache.org.
Fix mappingCalculator NPE
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d75d5fcd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d75d5fcd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d75d5fcd
Branch: refs/heads/master
Commit: d75d5fcdc9736a0023e8b98a1e55d802fbf4212b
Parents: 4a8e785
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Aug 22 14:59:29 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Oct 25 16:12:44 2018 -0700
----------------------------------------------------------------------
.../apache/helix/controller/stages/task/TaskSchedulingStage.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d75d5fcd/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 86c6c32..cbb0160 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -134,6 +134,7 @@ public class TaskSchedulingStage extends AbstractBaseStage {
} else {
// Create dummy rebalancer for dropping existing current states
rebalancer = new SemiAutoRebalancer();
+ mappingCalculator = new SemiAutoRebalancer();
}
if (rebalancer instanceof TaskRebalancer) {