You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/29 17:50:39 UTC

[1/5] helix git commit: [HELIX-762] TASK: Change LOG mode from info to debug

Repository: helix
Updated Branches:
  refs/heads/master d75d5fcdc -> 5033785c2


[HELIX-762] TASK: Change LOG mode from info to debug

In production, some users were observed running thousands of tasks. Because AssignableInstance logs a line for each task assigned or released, the volume of logs generated was excessive and overly verbose.
Changelist:
1. Change the log level from INFO (and, for several messages, WARN) to DEBUG in AssignableInstance and AssignableInstanceManager


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e7b960c2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e7b960c2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e7b960c2

Branch: refs/heads/master
Commit: e7b960c22896c08337292d20f674f20a7f1391d0
Parents: d75d5fc
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Oct 26 18:32:16 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Fri Oct 26 18:33:25 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AssignableInstanceManager.java   | 16 +++----
 .../helix/task/assigner/AssignableInstance.java | 44 ++++++++++----------
 2 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e7b960c2/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index abe5f1c..2693005 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -80,7 +80,7 @@ public class AssignableInstanceManager {
       AssignableInstance assignableInstance =
           new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
       _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
-      LOG.info("AssignableInstance created for instance: {}", instanceName);
+      LOG.debug("AssignableInstance created for instance: {}", instanceName);
     }
 
     // Update task profiles by traversing all TaskContexts
@@ -89,7 +89,7 @@ public class AssignableInstanceManager {
       JobConfig jobConfig = jobConfigMap.get(jobName);
       JobContext jobContext = taskDataCache.getJobContext(jobName);
       if (jobConfig == null || jobContext == null) {
-        LOG.warn(
+        LOG.debug(
             "JobConfig or JobContext for this job is null. Skipping this job! Job name: {}, JobConfig: {}, JobContext: {}",
             jobName, jobConfig, jobContext);
         continue; // Ignore this job if either the config or context is null
@@ -115,7 +115,7 @@ public class AssignableInstanceManager {
             taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
           }
           if (assignedInstance == null) {
-            LOG.warn(
+            LOG.debug(
                 "This task's TaskContext does not have an assigned instance! Task will be ignored. "
                     + "Job: {}, TaskId: {}, TaskIndex: {}",
                 jobContext.getName(), taskId, taskIndex);
@@ -128,11 +128,11 @@ public class AssignableInstanceManager {
                 assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
             if (taskAssignResult.isSuccessful()) {
               _taskAssignResultMap.put(taskId, taskAssignResult);
-              LOG.info("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
+              LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
                   assignedInstance);
             }
           } else {
-            LOG.warn(
+            LOG.debug(
                 "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
                     + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
                     + "up for this task. Job: {}, TaskId: {}, TaskIndex: {}, Instance: {}",
@@ -179,7 +179,7 @@ public class AssignableInstanceManager {
         AssignableInstance assignableInstance =
             new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
         _assignableInstanceMap.put(instanceName, assignableInstance);
-        LOG.info("AssignableInstance created for instance: {} during updateAssignableInstances",
+        LOG.debug("AssignableInstance created for instance: {} during updateAssignableInstances",
             instanceName);
       }
       // Remove because we've confirmed that this AssignableInstance is a LiveInstance as well
@@ -196,14 +196,14 @@ public class AssignableInstanceManager {
           if (_taskAssignResultMap.get(taskToRemove).getAssignableInstance().getInstanceName()
               .equals(instanceToBeRemoved.getInstanceName())) {
             _taskAssignResultMap.remove(taskToRemove); // TODO: Hunter: Move this if necessary
-            LOG.info(
+            LOG.debug(
                 "TaskAssignResult removed because its assigned instance is no longer live. TaskID: {}, instance: {}",
                 taskToRemove, instanceToBeRemoved.getInstanceName());
           }
         }
       }
       _assignableInstanceMap.remove(instanceToBeRemoved.getInstanceName());
-      LOG.info(
+      LOG.debug(
           "Non-live AssignableInstance removed for instance: {} during updateAssignableInstances",
           instanceToBeRemoved.getInstanceName());
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/e7b960c2/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index a1f2fd4..be38ab4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -113,18 +113,18 @@ public class AssignableInstance {
       resourceCapacity = new HashMap<>();
       resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
           Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
-      logger.info("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
+      logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
 
     if (typeQuotaRatio == null) {
       typeQuotaRatio = new HashMap<>();
       typeQuotaRatio.put(DEFAULT_QUOTA_TYPE, Integer.toString(1));
-      logger.info("No quota type ratio provided in LiveInstance {}, assuming default ratio: {}",
+      logger.debug("No quota type ratio provided in LiveInstance {}, assuming default ratio: {}",
           _instanceConfig.getInstanceName(), typeQuotaRatio);
     }
 
-    logger.info(
+    logger.debug(
         "Updating capacity for AssignableInstance {}. Resource Capacity: {}; Type Quota Ratio: {}",
         _instanceConfig.getInstanceName(), resourceCapacity, typeQuotaRatio);
 
@@ -135,7 +135,7 @@ public class AssignableInstance {
         int capacity = Integer.valueOf(resEntry.getValue());
 
         if (!_totalCapacity.containsKey(resourceType)) {
-          logger.info("Adding InstanceResourceType {}", resourceType);
+          logger.debug("Adding InstanceResourceType {}", resourceType);
           _usedCapacity.put(resourceType, new HashMap<String, Integer>());
         }
         tempTotalCapacity.put(resourceType, new HashMap<String, Integer>());
@@ -162,7 +162,7 @@ public class AssignableInstance {
 
           // Add quota for new quota type
           if (!_usedCapacity.get(resourceType).containsKey(quotaType)) {
-            logger.info("Adding QuotaType {} for resource {}", quotaType, resourceType);
+            logger.debug("Adding QuotaType {} for resource {}", quotaType, resourceType);
             _usedCapacity.get(resourceType).put(quotaType, 0);
           }
         }
@@ -177,13 +177,13 @@ public class AssignableInstance {
       // Purge used capacity for resource deleted
       _usedCapacity.keySet().retainAll(resourceCapacity.keySet());
 
-      logger.info(
+      logger.debug(
           "Finished updating capacity for AssignableInstance {}. Current capacity {}. Current usage: {}",
           _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity);
     } catch (Exception e) {
       // TODO: properly escalate error
       logger.error(
-          "Failed to update capacity for Assignableinstance {}, still using current capacity {}. Current usage: {}",
+          "Failed to update capacity for AssignableInstance {}, still using current capacity {}. Current usage: {}",
           _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity, e);
     }
   }
@@ -196,14 +196,14 @@ public class AssignableInstance {
    */
   public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
       LiveInstance liveInstance) {
-    logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
+    logger.debug("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
     boolean refreshCapacity = false;
     if (clusterConfig != null && clusterConfig.getTaskQuotaRatioMap() != null) {
       if (!clusterConfig.getTaskQuotaRatioMap().equals(_clusterConfig.getTaskQuotaRatioMap())) {
         refreshCapacity = true;
       }
       _clusterConfig = clusterConfig;
-      logger.info("Updated cluster config");
+      logger.debug("Updated cluster config");
     }
 
     if (liveInstance != null) {
@@ -217,7 +217,7 @@ public class AssignableInstance {
           refreshCapacity = true;
         }
         _liveInstance = liveInstance;
-        logger.info("Updated live instance");
+        logger.debug("Updated live instance");
       }
     }
 
@@ -228,7 +228,7 @@ public class AssignableInstance {
             _instanceConfig.getInstanceName(), instanceConfig.getInstanceName());
       } else {
         _instanceConfig = instanceConfig;
-        logger.info("Updated instance config");
+        logger.debug("Updated instance config");
       }
     }
 
@@ -236,7 +236,7 @@ public class AssignableInstance {
       refreshTotalCapacity();
     }
 
-    logger.info("Updated configs for AssignableInstance {}", _instanceConfig.getInstanceName());
+    logger.debug("Updated configs for AssignableInstance {}", _instanceConfig.getInstanceName());
   }
 
   /**
@@ -256,7 +256,7 @@ public class AssignableInstance {
     }
 
     if (_currentAssignments.contains(task.getId())) {
-      logger.warn(
+      logger.debug(
           "Task: {} of quotaType: {} is already assigned to this instance. Instance name: {}",
           task.getId(), quotaType, getInstanceName());
 
@@ -272,7 +272,7 @@ public class AssignableInstance {
     // Fail when no such resource type
     if (!_totalCapacity.containsKey(resourceType)) {
 
-      logger.warn(
+      logger.debug(
           "AssignableInstance does not support the given resourceType: {}. Task: {}, quotaType: {}, Instance name: {}",
           resourceType, task.getId(), quotaType, getInstanceName());
 
@@ -288,7 +288,7 @@ public class AssignableInstance {
     }
     if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
 
-      logger.warn(
+      logger.debug(
           "AssignableInstance does not support the given quotaType: {}. Task: {}, quotaType: {}, Instance name: {}. Task will be assigned as DEFAULT type.",
           quotaType, task.getId(), quotaType, getInstanceName());
       quotaType = DEFAULT_QUOTA_TYPE;
@@ -301,8 +301,8 @@ public class AssignableInstance {
     // Fail with insufficient quota
     if (capacity <= usage) {
 
-      logger.warn(
-          "AssignableInstance does not have enough capacity for quotaType: {}. Task: {}, quotaType: {}, Instance name: {}. Current capacity: {} capacity needed to schedule: {}",
+      logger.debug(
+          "AssignableInstance does not have enough capacity for quotaType: {}. Task: {}, quotaType: {}, Instance name: {}. Total capacity: {} Current usage: {}",
           quotaType, task.getId(), quotaType, getInstanceName(), capacity, usage);
 
       return new TaskAssignResult(task, quotaType, this, false, 0,
@@ -366,11 +366,11 @@ public class AssignableInstance {
       }
     } else {
       // resourceType is not found. Leave a warning log and will not touch quota
-      logger.warn(
+      logger.debug(
           "Task's requested resource type is not supported. TaskConfig: %s; UsedCapacity: %s; ResourceType: %s",
           result.getTaskConfig(), _usedCapacity, resourceType);
     }
-    logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(),
+    logger.debug("Assigned task {} to instance {}", result.getTaskConfig().getId(),
         _instanceConfig.getInstanceName());
   }
 
@@ -385,12 +385,12 @@ public class AssignableInstance {
    */
   public synchronized void release(TaskConfig taskConfig, String quotaType) {
     if (!_currentAssignments.contains(taskConfig.getId())) {
-      logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
+      logger.debug("Task {} is not assigned on instance {}", taskConfig.getId(),
           _instanceConfig.getInstanceName());
       return;
     }
     if (quotaType == null) {
-      logger.warn("Task {}'s quotaType is null. Trying to release as DEFAULT type.",
+      logger.debug("Task {}'s quotaType is null. Trying to release as DEFAULT type.",
           taskConfig.getId());
       quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
     }
@@ -414,7 +414,7 @@ public class AssignableInstance {
     // If the resource type is not found, we just remove from currentAssignments since no adjustment
     // can be made
     _currentAssignments.remove(taskConfig.getId());
-    logger.info("Released task {} from instance {}", taskConfig.getId(),
+    logger.debug("Released task {} from instance {}", taskConfig.getId(),
         _instanceConfig.getInstanceName());
   }
 


[4/5] helix git commit: [HELIX-765] TASK: Build quota profile from scratch every rebalance

Posted by jx...@apache.org.
[HELIX-765] TASK: Build quota profile from scratch every rebalance

It has been reported that some instances show a full quota even though no tasks exist in their CURRENTSTATEs. The root cause is not yet clear, so ClusterDataCache now triggers a rebuild of all AssignableInstances on every rebalance; this ensures there are no situations that look like a thread leak. Optimizations will be implemented if necessary.
Changelist:
1. Make AssignableInstanceManager build all AssignableInstances from scratch every rebalance


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/930a4b7a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/930a4b7a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/930a4b7a

Branch: refs/heads/master
Commit: 930a4b7ae7eb63be0a751a593ba630ae55fb2cfb
Parents: d33d9ef
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Oct 26 19:06:42 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Fri Oct 26 19:06:42 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 29 ++++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/930a4b7a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 67b59a8..93aea4f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -209,7 +209,6 @@ public class ClusterDataCache extends AbstractDataCache {
     }
 
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
-    _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
     _resourceConfigMap = new HashMap<>(_resourceConfigCacheMap);
 
@@ -230,17 +229,23 @@ public class ClusterDataCache extends AbstractDataCache {
       // Refresh AssignableInstanceManager
       AssignableInstanceManager assignableInstanceManager =
           _taskDataCache.getAssignableInstanceManager();
-      if (_existsClusterConfigChange) {
-        // Update both flags since buildAssignableInstances includes updateAssignableInstances
-        _existsClusterConfigChange = false;
-        _existsInstanceChange = false;
-        assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
-            _liveInstanceMap, _instanceConfigMap);
-      } else if (_existsInstanceChange) {
-        _existsInstanceChange = false;
-        assignableInstanceManager.updateAssignableInstances(_clusterConfig, _liveInstanceMap,
-            _instanceConfigMap);
-      }
+      // Build from scratch every time
+      assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
+          _liveInstanceMap, _instanceConfigMap);
+      /**
+       * TODO: Consider this for optimization after sufficient testing
+       * if (_existsClusterConfigChange) {
+       * // Update both flags since buildAssignableInstances includes updateAssignableInstances
+       * _existsClusterConfigChange = false;
+       * _existsInstanceChange = false;
+       * assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
+       * _liveInstanceMap, _instanceConfigMap);
+       * } else if (_existsInstanceChange) {
+       * _existsInstanceChange = false;
+       * assignableInstanceManager.updateAssignableInstances(_clusterConfig, _liveInstanceMap,
+       * _instanceConfigMap);
+       * }
+       **/
     }
 
     _instanceMessagesCache.refresh(accessor, _liveInstanceMap);


[2/5] helix git commit: [HELIX-763] Task:Ignore tasks whose workflow and job are inactive

Posted by jx...@apache.org.
[HELIX-763] Task:Ignore tasks whose workflow and job are inactive

Manual testing revealed tasks in INIT and RUNNING states that were occupying thread capacity even though their parent job or workflow was in an inactive state (terminal or stopped). This happened when the capacities were rebuilt from scratch, and it could have caused a thread leak.
Changelist:
1. Add a check in buildAssignableInstances() so that it ignores workflows and jobs in inactive states (that is, states in which their tasks cannot be occupying a thread on Participants)


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e492d9f6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e492d9f6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e492d9f6

Branch: refs/heads/master
Commit: e492d9f663d8edad0f344208cc8affc6828708a3
Parents: e7b960c
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Oct 26 18:49:52 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Fri Oct 26 18:49:52 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AssignableInstanceManager.java   | 45 +++++++++++++++++++-
 1 file changed, 43 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e492d9f6/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 2693005..1c1ed69 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -21,7 +21,6 @@ package org.apache.helix.task;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -94,6 +93,24 @@ public class AssignableInstanceManager {
             jobName, jobConfig, jobContext);
         continue; // Ignore this job if either the config or context is null
       }
+
+      // First, check that the workflow and job are in valid states. This is important because
+      // sometimes aborted jobs do not get a proper update of their task states, meaning there could
+      // be INIT and RUNNING tasks we want to ignore
+      String workflowName = jobConfig.getWorkflow();
+      WorkflowConfig workflowConfig = taskDataCache.getWorkflowConfig(workflowName);
+      WorkflowContext workflowContext = taskDataCache.getWorkflowContext(workflowName);
+      if (workflowConfig == null || workflowContext == null) {
+        // There is no workflow config or context - meaning no tasks are currently scheduled and
+        // invalid, so skip this job
+        continue;
+      }
+      TaskState workflowState = workflowContext.getWorkflowState();
+      TaskState jobState = workflowContext.getJobState(jobName);
+      if (isResourceTerminalOrStopped(workflowState) || isResourceTerminalOrStopped(jobState)) {
+        continue;
+      }
+
       String quotaType = jobConfig.getJobType();
       if (quotaType == null) {
         quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
@@ -236,4 +253,28 @@ public class AssignableInstanceManager {
   public Map<String, TaskAssignResult> getTaskAssignResultMap() {
     return _taskAssignResultMap;
   }
-}
\ No newline at end of file
+
+  /**
+   * Returns true if a workflow or job in the given state cannot have any running tasks, i.e. it is
+   * terminal (ABORTED/FAILED/COMPLETED/TIMED_OUT), STOPPED, NOT_STARTED, or the state is null.
+   * @param state the workflow or job state to check; may be null
+   * @return true if the resource should be treated as inactive; false otherwise
+   */
+  private boolean isResourceTerminalOrStopped(TaskState state) {
+    if (state == null) {
+      // If the state is null, it cannot have currently-running tasks either, so consider it
+      // inactive
+      return true;
+    }
+    switch (state) {
+      case ABORTED:
+      case FAILED:
+      case STOPPED:
+      case COMPLETED:
+      case TIMED_OUT:
+      case NOT_STARTED: // not yet started: treated here as having no running tasks
+        return true;
+    }
+    return false; // every other state is treated as possibly having running tasks
+  }
+}


[5/5] helix git commit: [HELIX-766] TASK: Add logging functionality in AssignableInstanceManager

Posted by jx...@apache.org.
[HELIX-766] TASK: Add logging functionality in AssignableInstanceManager

To debug task-related inquiries and issues, it would be very helpful to have a log recording the current quota capacity of all AssignableInstances. This is for cases where a job's tasks are not getting assigned, so that we can quickly rule out bugs in quota-based scheduling.
Changelist:
    1. Add a method that logs the current quota profile in JSON format, with an option flag to display only instances that have at least one quota type at full capacity
    2. Add info logs in AssignableInstanceManager


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5033785c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5033785c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5033785c

Branch: refs/heads/master
Commit: 5033785c231af363953367f65f77513911b753f5
Parents: 930a4b7
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Oct 26 19:08:02 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Fri Oct 26 19:08:02 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     |  4 +-
 .../helix/task/AssignableInstanceManager.java   | 55 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5033785c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 93aea4f..da22c5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -233,7 +233,8 @@ public class ClusterDataCache extends AbstractDataCache {
       assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
           _liveInstanceMap, _instanceConfigMap);
       /**
-       * TODO: Consider this for optimization after sufficient testing
+       * TODO: (Hunter) Consider this for optimization after fixing the problem of quotas not being
+       * properly released for targeted tasks
        * if (_existsClusterConfigChange) {
        * // Update both flags since buildAssignableInstances includes updateAssignableInstances
        * _existsClusterConfigChange = false;
@@ -246,6 +247,7 @@ public class ClusterDataCache extends AbstractDataCache {
        * _instanceConfigMap);
        * }
        **/
+      assignableInstanceManager.logQuotaProfileJSON(false);
     }
 
     _instanceMessagesCache.refresh(accessor, _liveInstanceMap);

http://git-wip-us.apache.org/repos/asf/helix/blob/5033785c/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index 1c1ed69..7ab3f0d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -31,6 +31,9 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.assigner.AssignableInstance;
 import org.apache.helix.task.assigner.TaskAssignResult;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,8 +148,8 @@ public class AssignableInstanceManager {
                 assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
             if (taskAssignResult.isSuccessful()) {
               _taskAssignResultMap.put(taskId, taskAssignResult);
-              LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
-                  assignedInstance);
+              LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}",
+                  taskId, assignedInstance);
             }
           } else {
             LOG.debug(
@@ -158,6 +161,8 @@ public class AssignableInstanceManager {
         }
       }
     }
+    LOG.info(
+        "AssignableInstanceManager built AssignableInstances from scratch based on contexts in TaskDataCache due to Controller switch or ClusterConfig change.");
   }
 
   /**
@@ -224,6 +229,8 @@ public class AssignableInstanceManager {
           "Non-live AssignableInstance removed for instance: {} during updateAssignableInstances",
           instanceToBeRemoved.getInstanceName());
     }
+    LOG.info(
+        "AssignableInstanceManager updated AssignableInstances due to LiveInstance/InstanceConfig change.");
   }
 
   /**
@@ -277,4 +284,48 @@ public class AssignableInstanceManager {
     }
     return false;
   }
+
+  /**
+   * Logs (at INFO) a JSON quota profile: {instance: {resourceType: {quotaType: "used/total"}}}.
+   * TODO: Make this with an associated event ID if this becomes a performance bottleneck
+   * @param onlyDisplayIfFull if true, only instances with at least one quota type whose usage has
+   * reached its total capacity are included, to avoid cluttering up the log
+   */
+  public void logQuotaProfileJSON(boolean onlyDisplayIfFull) {
+    // Root JSON node keyed by instance name; serialized into the log message below
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode instanceNode = mapper.createObjectNode();
+
+    // NOTE(review): the usedCapacity lookup below throws NPE if it lacks a key present in totals
+    for (Map.Entry<String, AssignableInstance> instanceEntry : _assignableInstanceMap.entrySet()) {
+      AssignableInstance assignableInstance = instanceEntry.getValue();
+      boolean capacityFull = false;
+      JsonNode resourceTypeNode = mapper.createObjectNode();
+      for (Map.Entry<String, Map<String, Integer>> capacityEntry : assignableInstance
+          .getTotalCapacity().entrySet()) {
+        String resourceType = capacityEntry.getKey();
+        Map<String, Integer> quotaTypeMap = capacityEntry.getValue();
+        JsonNode quotaTypeNode = mapper.createObjectNode();
+        for (Map.Entry<String, Integer> typeEntry : quotaTypeMap.entrySet()) {
+          String quotaType = typeEntry.getKey();
+          int totalCapacity = typeEntry.getValue();
+          int usedCapacity = assignableInstance.getUsedCapacity().get(resourceType).get(quotaType);
+          if (!capacityFull) { // latch: once any quota type is full, the instance counts as full
+            capacityFull = totalCapacity <= usedCapacity;
+          }
+          String capacityString = String.format("%d/%d", usedCapacity, totalCapacity);
+          ((ObjectNode) quotaTypeNode).put(quotaType, capacityString);
+        }
+        ((ObjectNode) resourceTypeNode).put(resourceType, quotaTypeNode);
+      }
+      // When onlyDisplayIfFull is set, skip instances that have no quota type at full capacity
+      if (onlyDisplayIfFull && !capacityFull) {
+        continue;
+      }
+      ((ObjectNode) instanceNode).put(instanceEntry.getKey(), resourceTypeNode);
+    }
+    if (instanceNode.size() > 0) { // log nothing when no instance was included
+      LOG.info("Current quota capacity: {}", instanceNode.toString());
+    }
+  }
 }


[3/5] helix git commit: [HELIX-764] TASK: Fix LiveInstanceCurrentState change flag

Posted by jx...@apache.org.
[HELIX-764] TASK: Fix LiveInstanceCurrentState change flag

Previously, existsLiveInstanceOrCurrentStateChange was reset in ClusterDataCache whenever its getter was called. This was problematic when there were multiple jobs or workflows: only the first caller of the getter would get the correct flag value, and subsequent callers would get false because the flag had already been reset. This RB fixes the bug by resetting the flag at the beginning of the refresh() call in ClusterDataCache, so that all callers during that pipeline get the same, correct value.
Changelist:
1. Change the getter so that it does not reset the flag; instead, reset the flag at the beginning of refresh()


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d33d9efe
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d33d9efe
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d33d9efe

Branch: refs/heads/master
Commit: d33d9efea25fe9d2bbbb9e84a4ce7614b544ef2d
Parents: e492d9f
Author: Hunter Lee <hu...@linkedin.com>
Authored: Fri Oct 26 19:03:47 2018 -0700
Committer: Hunter Lee <hu...@linkedin.com>
Committed: Fri Oct 26 19:03:47 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 22 ++++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d33d9efe/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index f960603..67b59a8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -151,6 +151,9 @@ public class ClusterDataCache extends AbstractDataCache {
     long startTime = System.currentTimeMillis();
     Builder keyBuilder = accessor.keyBuilder();
 
+    // Reset the LiveInstance/CurrentState change flag
+    _existsLiveInstanceOrCurrentStateChange = false;
+
     if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) {
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, false);
       clearCachedResourceAssignments();
@@ -935,14 +938,12 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   /**
-   * Returns whether there has been LiveInstance change. Once called, it will be set to false. To be
-   * used for task-assigning.
+   * Returns whether there has been LiveInstance or CurrentState change. To be used for
+   * task-assigning in AbstractTaskDispatcher.
    * @return
    */
   public boolean getExistsLiveInstanceOrCurrentStateChange() {
-    boolean change = _existsLiveInstanceOrCurrentStateChange;
-    _existsLiveInstanceOrCurrentStateChange = false;
-    return change;
+    return _existsLiveInstanceOrCurrentStateChange;
   }
 
   private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor accessor) {
@@ -960,17 +961,16 @@ public class ClusterDataCache extends AbstractDataCache {
 
     for (String resourceConfig : _resourceConfigMap.keySet()) {
       cachedKeys.add(keyBuilder.resourceConfig(resourceConfig));
-      cachedResourceConfigMap
-          .put(keyBuilder.resourceConfig(resourceConfig), _resourceConfigMap.get(resourceConfig));
+      cachedResourceConfigMap.put(keyBuilder.resourceConfig(resourceConfig),
+          _resourceConfigMap.get(resourceConfig));
     }
     cachedKeys.retainAll(currentResourceConfigKeys);
 
     Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys);
     reloadKeys.removeAll(cachedKeys);
 
-    Map<PropertyKey, ResourceConfig> updatedMap =
-        refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys),
-            cachedResourceConfigMap);
+    Map<PropertyKey, ResourceConfig> updatedMap = refreshProperties(accessor,
+        new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), cachedResourceConfigMap);
     for (ResourceConfig resourceConfig : updatedMap.values()) {
       refreshedResourceConfigs.put(resourceConfig.getResourceName(), resourceConfig);
     }
@@ -1000,4 +1000,4 @@ public class ClusterDataCache extends AbstractDataCache {
 
     return sb.toString();
   }
-}
\ No newline at end of file
+}