You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:29 UTC

[38/50] [abbrv] helix git commit: Remove all legacy dependencies on "controller" as the controller nodes' name.

Remove all legacy dependencies on "controller" as the controller nodes' name.

The code should check the instance type to determine whether an instance is a controller, instead of relying on the instance name, because the name can be configured as any string.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5ffab62f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5ffab62f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5ffab62f

Branch: refs/heads/master
Commit: 5ffab62fc96007cb636a6da97be0e62ddb63cd91
Parents: 89089b4
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Nov 14 00:38:44 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:32:52 2018 -0800

----------------------------------------------------------------------
 .../DefaultSchedulerMessageHandlerFactory.java  |   4 +-
 .../messaging/DefaultMessagingService.java      |   4 +-
 .../handling/HelixStateTransitionHandler.java   |  21 ++-
 .../helix/messaging/handling/HelixTask.java     |  22 +--
 .../messaging/handling/HelixTaskExecutor.java   |  32 ++--
 .../java/org/apache/helix/model/Message.java    |   4 +-
 .../participant/HelixStateMachineEngine.java    |   2 +-
 .../org/apache/helix/util/StatusUpdateUtil.java | 149 ++++++++++++-------
 8 files changed, 141 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 51bb5c3..443df9f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -80,7 +80,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
           + _timeout + " Ms");
 
       _statusUpdateUtil.logError(_originalMessage, SchedulerAsyncCallback.class, "Task timeout",
-          _manager.getHelixDataAccessor());
+          _manager);
       addSummary(_resultSummaryMap, _originalMessage, _manager, true);
     }
 
@@ -94,7 +94,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
       if (this.isDone()) {
         _logger.info("Scheduler msg " + _originalMessage.getMsgId() + " completed");
         _statusUpdateUtil.logInfo(_originalMessage, SchedulerAsyncCallback.class,
-            "Scheduler task completed", _manager.getHelixDataAccessor());
+            "Scheduler task completed", _manager);
         addSummary(_resultSummaryMap, _originalMessage, _manager, false);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index e776dc2..fb84fd7 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -204,7 +204,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
     Message newMessage = new Message(message.getRecord(), id);
     newMessage.setMsgId(id);
     newMessage.setSrcName(_manager.getInstanceName());
-    newMessage.setTgtName("Controller");
+    newMessage.setTgtName(InstanceType.CONTROLLER.name());
     messages.add(newMessage);
     return messages;
   }
@@ -298,7 +298,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
 
       if (_manager.getInstanceType() == InstanceType.CONTROLLER
           || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
-        nopMsg.setTgtName("Controller");
+        nopMsg.setTgtName(InstanceType.CONTROLLER.name());
         accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index ece7ac7..9412dde 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -89,7 +89,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
               + Arrays.toString(Message.Attributes.values());
 
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
-          _manager.getHelixDataAccessor());
+          _manager);
       logger.error(errorMessage);
       throw new HelixException(errorMessage);
     }
@@ -119,7 +119,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
               + _message.getTgtName();
 
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage,
-          accessor);
+          _manager);
       logger.error(errorMessage);
       throw new HelixStateMismatchException(errorMessage);
     }
@@ -162,7 +162,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
                                  HelixStateTransitionHandler.class,
                                  e,
                                  "Error when removing " + CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current state.",
-                                 accessor);
+                                 _manager);
     }
   }
 
@@ -286,7 +286,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
           new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, e,
-          "Error when update current-state ", accessor);
+          "Error when update current-state ", _manager);
     }
   }
 
@@ -311,15 +311,14 @@ public class HelixStateTransitionHandler extends MessageHandler {
     synchronized (_stateModel) {
       HelixTaskResult taskResult = new HelixTaskResult();
       HelixManager manager = context.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
       _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
-          "Message handling task begin execute", accessor);
+          "Message handling task begin execute", manager);
       message.setExecuteStartTimeStamp(new Date().getTime());
 
       try {
         preHandleMessage();
-        invoke(accessor, context, taskResult, message);
+        invoke(manager, context, taskResult, message);
       } catch (HelixStateMismatchException e) {
         // Simply log error and return from here if State mismatch.
         // The current state of the state model is intact.
@@ -344,7 +343,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
           taskResult.setCancelled(true);
         } else {
           _statusUpdateUtil
-              .logError(message, HelixStateTransitionHandler.class, e, errorMessage, accessor);
+              .logError(message, HelixStateTransitionHandler.class, e, errorMessage, manager);
           taskResult.setSuccess(false);
           taskResult.setMessage(e.toString());
           taskResult.setException(e);
@@ -361,11 +360,11 @@ public class HelixStateTransitionHandler extends MessageHandler {
     }
   }
 
-  private void invoke(HelixDataAccessor accessor, NotificationContext context,
+  private void invoke(HelixManager manager, NotificationContext context,
       HelixTaskResult taskResult, Message message) throws IllegalAccessException,
       InvocationTargetException, InterruptedException, HelixRollbackException {
     _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
-        "Message handling invoking", accessor);
+        "Message handling invoking", manager);
 
     // by default, we invoke state transition function in state model
     Method methodToInvoke = null;
@@ -417,7 +416,7 @@ public class HelixStateTransitionHandler extends MessageHandler {
       taskResult.setSuccess(false);
 
       _statusUpdateUtil
-          .logError(message, HelixStateTransitionHandler.class, errorMessage, accessor);
+          .logError(message, HelixStateTransitionHandler.class, errorMessage, manager);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2543d81..85665c1 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -79,7 +79,7 @@ public class HelixTask implements MessageTask {
     logger.info("handling task: " + getTaskId() + " begin, at: " + start);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute",
-        accessor);
+        _manager);
     _message.setExecuteStartTimeStamp(new Date().getTime());
 
     // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
@@ -99,7 +99,7 @@ public class HelixTask implements MessageTask {
       taskResult.setInterrupted(true);
 
       _statusUpdateUtil.logError(_message, HelixTask.class, e,
-          "State transition interrupted, timeout:" + _isTimeout, accessor);
+          "State transition interrupted, timeout:" + _isTimeout, _manager);
       logger.info("Message " + _message.getMsgId() + " is interrupted");
     } catch (Exception e) {
       taskResult = new HelixTaskResult();
@@ -110,7 +110,7 @@ public class HelixTask implements MessageTask {
           "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
               + " type: " + _message.getMsgType();
       logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
+      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
     }
 
     // cancel timeout task
@@ -120,7 +120,7 @@ public class HelixTask implements MessageTask {
     try {
       if (taskResult.isSuccess()) {
         _statusUpdateUtil
-            .logInfo(_message, _handler.getClass(), "Message handling task completed successfully", accessor);
+            .logInfo(_message, _handler.getClass(), "Message handling task completed successfully", _manager);
         logger.info("Message " + _message.getMsgId() + " completed.");
         _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
       } else {
@@ -134,7 +134,7 @@ public class HelixTask implements MessageTask {
             logger.info("Message timeout, retry count: " + retryCount + " msgId:"
                 + _message.getMsgId());
             _statusUpdateUtil.logInfo(_message, _handler.getClass(),
-                "Message handling task timeout, retryCount:" + retryCount, accessor);
+                "Message handling task timeout, retryCount:" + retryCount, _manager);
             // Notify the handler that timeout happens, and the number of retries left
             // In case timeout happens (time out and also interrupted)
             // we should retry the execution of the message by re-schedule it in
@@ -151,7 +151,7 @@ public class HelixTask implements MessageTask {
           type = null;
           _statusUpdateUtil
               .logInfo(_message, _handler.getClass(), "Cancellation completed successfully",
-                  accessor);
+                  _manager);
           _executor.getParticipantMonitor().reportProcessedMessage(
               _message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         } else {// logging for errors
@@ -160,7 +160,7 @@ public class HelixTask implements MessageTask {
               "Message execution failed. msgId: " + getTaskId() + ", errorMsg: "
                   + taskResult.getMessage();
           logger.error(errorMsg);
-          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
+          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, _manager);
           _executor.getParticipantMonitor().reportProcessedMessage(
               _message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
         }
@@ -185,7 +185,7 @@ public class HelixTask implements MessageTask {
       String errorMessage =
           "Exception after executing a message, msgId: " + _message.getMsgId() + e;
       logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
+      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, _manager);
     } finally {
       long end = System.currentTimeMillis();
       logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
@@ -206,7 +206,7 @@ public class HelixTask implements MessageTask {
   private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
     Builder keyBuilder = accessor.keyBuilder();
     PropertyKey msgKey;
-    if (message.getTgtName().equalsIgnoreCase("controller")) {
+    if (message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name())) {
       msgKey = keyBuilder.controllerMessage(message.getMsgId());
     } else {
       msgKey = keyBuilder.message(_manager.getInstanceName(), message.getMsgId());
@@ -259,7 +259,7 @@ public class HelixTask implements MessageTask {
     if (_message.getCorrelationId() != null
         && !message.getMsgType().equals(MessageType.TASK_REPLY.name())) {
       logger.info("Sending reply for message " + message.getCorrelationId());
-      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", _manager);
 
       taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSuccess());
       taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
@@ -280,7 +280,7 @@ public class HelixTask implements MessageTask {
         accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), replyMessage);
       }
       _statusUpdateUtil.logInfo(message, HelixTask.class,
-          "1 msg replied to " + replyMessage.getTgtName(), accessor);
+          "1 msg replied to " + replyMessage.getTgtName(), _manager);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index c0be583..064b6fd 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -398,10 +398,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     String taskId = task.getTaskId();
     Message message = task.getMessage();
     NotificationContext notificationContext = task.getNotificationContext();
+    HelixManager manager = notificationContext.getManager();
 
     try {
       // Check to see if dedicate thread pool for handling state transition messages is configured or provided.
-      updateStateTransitionMessageThreadPool(message, notificationContext.getManager());
+      updateStateTransitionMessageThreadPool(message, manager);
 
       LOG.info("Scheduling message: " + taskId);
       // System.out.println("sched msg: " + message.getPartitionName() + "-"
@@ -409,8 +410,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       // + message.getToState());
 
       _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-          "Message handling task scheduled", notificationContext.getManager()
-              .getHelixDataAccessor());
+          "Message handling task scheduled", manager);
 
       // this sync guarantees that ExecutorService.submit() task and put taskInfo into map are
       // sync'ed
@@ -441,23 +441,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           } else {
             LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
           }
-
           _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
 
           LOG.info("Message: " + taskId + " handling task scheduled");
-
           return true;
         } else {
           _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
-              "Message handling task already sheduled for " + taskId, notificationContext
-                  .getManager().getHelixDataAccessor());
+              "Message handling task already sheduled for " + taskId, manager);
         }
       }
     } catch (Exception e) {
       LOG.error("Error while executing task. " + message, e);
-
       _statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error while executing task "
-          + e, notificationContext.getManager().getHelixDataAccessor());
+          + e, manager);
     }
     return false;
   }
@@ -480,25 +476,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         Future<HelixTaskResult> future = taskInfo.getFuture();
         removeMessageFromTaskAndFutureMap(message);
         _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId,
-            notificationContext.getManager().getHelixDataAccessor());
+            notificationContext.getManager());
 
         // If the thread is still running it will be interrupted if cancel(true)
         // is called. So state transition callbacks should implement logic to
         // return if it is interrupted.
         if (future.cancel(true)) {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
-              notificationContext.getManager().getHelixDataAccessor());
+              notificationContext.getManager());
           _taskMap.remove(taskId);
           return true;
         } else {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
               "fail to cancel task: " + taskId,
-              notificationContext.getManager().getHelixDataAccessor());
+              notificationContext.getManager());
         }
       } else {
         _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
             "fail to cancel task: " + taskId + ", future not found",
-            notificationContext.getManager().getHelixDataAccessor());
+            notificationContext.getManager());
       }
     }
     return false;
@@ -807,7 +803,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
                 + message.getMsgId();
         LOG.warn(warningMessage);
         reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED);
-        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, accessor);
+        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, manager);
 
         // Proactively send a session sync message from participant to controller
         // upon session mismatch after a new session is established
@@ -873,7 +869,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         String error =
             "Failed to create message handler for " + message.getMsgId() + ", exception: " + e;
 
-        _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, accessor);
+        _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, error, manager);
 
         message.setMsgState(MessageState.UNPROCESSABLE);
         removeMessageFromZK(accessor, message, instanceName);
@@ -882,7 +878,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
-      markReadMessage(message, changeContext, accessor);
+      markReadMessage(message, changeContext, manager);
       readMsgs.add(message);
 
       // batch creation of all current state meta data
@@ -1003,12 +999,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   }
 
   private void markReadMessage(Message message, NotificationContext context,
-      HelixDataAccessor accessor) {
+      HelixManager manager) {
     message.setMsgState(MessageState.READ);
     message.setReadTimeStamp(new Date().getTime());
     message.setExecuteSessionId(context.getManager().getSessionId());
 
-    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", accessor);
+    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", manager);
   }
 
   public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d987c54..9f0054c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -644,7 +644,7 @@ public class Message extends HelixProperty {
     replyMessage.setMsgState(MessageState.NEW);
     replyMessage.setSrcName(instanceName);
     if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER) {
-      replyMessage.setTgtName("Controller");
+      replyMessage.setTgtName(InstanceType.CONTROLLER.name());
     } else {
       replyMessage.setTgtName(srcMessage.getMsgSrc());
     }
@@ -849,7 +849,7 @@ public class Message extends HelixProperty {
    * @return true if this is a controller message, false otherwise
    */
   public boolean isControlerMsg() {
-    return getTgtName().equalsIgnoreCase("controller");
+    return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index cd424f8..3a13180 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -127,7 +127,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
 
         if (_manager.getInstanceType() == InstanceType.CONTROLLER
             || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
-          nopMsg.setTgtName("Controller");
+          nopMsg.setTgtName(InstanceType.CONTROLLER.name());
           accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
         }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
index 3eb20ef..90e0d24 100644
--- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -32,13 +32,17 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.helix.*;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StatusUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -310,6 +314,18 @@ public class StatusUpdateUtil {
     return String.format("%4s %26s ", level.toString(), time) + recordId;
   }
 
+  @Deprecated
+  public void logMessageStatusUpdateRecord(Message message, Level level, Class classInfo,
+      String additionalInfo, HelixDataAccessor accessor) {
+    try {
+      ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
+      publishStatusUpdateRecord(record, message, level, accessor,
+          message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name()));
+    } catch (Exception e) {
+      _logger.error("Exception while logging status update", e);
+    }
+  }
+
   /**
    * Create a statusupdate that is related to a cluster manager message, then record it to
    * the zookeeper store.
@@ -321,14 +337,16 @@ public class StatusUpdateUtil {
    *          class info about the class that reports the status update
    * @param additionalInfo
    *          info the additional debug information
-   * @param accessor
-   *          the zookeeper data accessor that writes the status update to zookeeper
+   * @param manager
+   *          the HelixManager that writes the status update to zookeeper
    */
   public void logMessageStatusUpdateRecord(Message message, Level level, Class classInfo,
-      String additionalInfo, HelixDataAccessor accessor) {
+      String additionalInfo, HelixManager manager) {
     try {
       ZNRecord record = createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
-      publishStatusUpdateRecord(record, message, level, accessor);
+      publishStatusUpdateRecord(record, message, level, manager.getHelixDataAccessor(),
+          manager.getInstanceType().equals(InstanceType.CONTROLLER) || manager.getInstanceType()
+              .equals(InstanceType.CONTROLLER_PARTICIPANT));
     } catch (Exception e) {
       _logger.error("Exception while logging status update", e);
     }
@@ -338,39 +356,67 @@ public class StatusUpdateUtil {
     RebalanceResourceFailure,
   }
 
-  public void logError(ErrorType errorType, Class classInfo, String additionalInfo,
-      HelixManager helixManager) {
+  public void logError(ErrorType errorType, Class classInfo, String additionalInfo, HelixManager helixManager) {
     if (helixManager != null) {
-      logError(errorType, "ErrorInfo", helixManager.getInstanceName(), helixManager.getSessionId(),
-          additionalInfo, classInfo, helixManager.getHelixDataAccessor());
+      logError(errorType, "ErrorInfo", helixManager.getInstanceName(), helixManager.getSessionId(), additionalInfo,
+          classInfo, helixManager.getHelixDataAccessor(),
+          helixManager.getInstanceType().equals(InstanceType.CONTROLLER) || helixManager.getInstanceType()
+              .equals(InstanceType.CONTROLLER_PARTICIPANT));
     } else {
       _logger.error("Exception while logging error. HelixManager is null.");
     }
   }
 
   private void logError(ErrorType errorType, String updateKey, String instanceName,
-      String sessionId, String additionalInfo, Class classInfo, HelixDataAccessor accessor) {
+      String sessionId, String additionalInfo, Class classInfo, HelixDataAccessor accessor,
+      boolean isController) {
     try {
       ZNRecord record = createEmptyStatusUpdateRecord(sessionId + "__" + instanceName);
 
-      Map<String, String> contentMap = new TreeMap<String, String>();
+      Map<String, String> contentMap = new TreeMap<>();
       contentMap.put("AdditionalInfo", additionalInfo);
       contentMap.put("Class", classInfo.toString());
       contentMap.put("SessionId", sessionId);
 
       record.setMapField(generateMapFieldId(Level.HELIX_ERROR, updateKey), contentMap);
 
-      publishErrorRecord(record, instanceName, errorType.name(), updateKey, sessionId, accessor);
+      publishErrorRecord(record, instanceName, errorType.name(), updateKey, sessionId, accessor,
+          isController);
     } catch (Exception e) {
       _logger.error("Exception while logging error", e);
     }
   }
 
+  public void logError(Message message, Class classInfo, String additionalInfo, HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, manager);
+  }
+
+  public void logError(Message message, Class classInfo, Exception e, String additionalInfo,
+      HelixManager manager) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    e.printStackTrace(pw);
+    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
+        additionalInfo + sw.toString(), manager);
+  }
+
+  public void logInfo(Message message, Class classInfo, String additionalInfo,
+      HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, manager);
+  }
+
+  public void logWarning(Message message, Class classInfo, String additionalInfo,
+      HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, manager);
+  }
+
+  @Deprecated
   public void logError(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo, accessor);
   }
 
+  @Deprecated
   public void logError(Message message, Class classInfo, Exception e, String additionalInfo,
       HelixDataAccessor accessor) {
     StringWriter sw = new StringWriter();
@@ -380,16 +426,42 @@ public class StatusUpdateUtil {
         additionalInfo + sw.toString(), accessor);
   }
 
+  @Deprecated
   public void logInfo(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, accessor);
   }
 
+  @Deprecated
   public void logWarning(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, accessor);
   }
 
+  private String getStatusUpdateKey(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getPartitionName();
+    }
+    return message.getMsgId();
+  }
+
+  /**
+   * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
+   */
+  String getStatusUpdateSubPath(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getResourceName();
+    }
+    return message.getMsgType();
+  }
+
+  String getStatusUpdateRecordName(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getTgtSessionId() + "__" + message.getResourceName();
+    }
+    return message.getMsgId();
+  }
+
   /**
    * Write a status update record to zookeeper to the zookeeper store.
    * @param record
@@ -400,9 +472,11 @@ public class StatusUpdateUtil {
    *          the error level of the message update
    * @param accessor
    *          the zookeeper data accessor that writes the status update to zookeeper
+   * @param isController
+   *          whether the update is for a controller instance
    */
   void publishStatusUpdateRecord(ZNRecord record, Message message, Level level,
-      HelixDataAccessor accessor) {
+      HelixDataAccessor accessor, boolean isController) {
     String instanceName = message.getTgtName();
     String statusUpdateSubPath = getStatusUpdateSubPath(message);
     String statusUpdateKey = getStatusUpdateKey(message);
@@ -416,11 +490,10 @@ public class StatusUpdateUtil {
 
     Builder keyBuilder = accessor.keyBuilder();
     if (!_recordedMessages.containsKey(message.getMsgId())) {
-      // TODO instanceName of a controller might be any string
-      if (instanceName.equalsIgnoreCase("Controller")) {
-        accessor.updateProperty(
-            keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey),
-            new StatusUpdate(createMessageLogRecord(message)));
+      if (isController) {
+        accessor
+            .updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey),
+                new StatusUpdate(createMessageLogRecord(message)));
 
       } else {
 
@@ -442,7 +515,7 @@ public class StatusUpdateUtil {
       _recordedMessages.put(message.getMsgId(), message.getMsgId());
     }
 
-    if (instanceName.equalsIgnoreCase("Controller")) {
+    if (isController) {
       accessor.updateProperty(
           keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey), new StatusUpdate(
               record));
@@ -462,35 +535,10 @@ public class StatusUpdateUtil {
     // If the error level is ERROR, also write the record to "ERROR" ZNode
     if (Level.HELIX_ERROR == level) {
       publishErrorRecord(record, instanceName, statusUpdateSubPath, statusUpdateKey, sessionId,
-          accessor);
+          accessor, isController);
     }
   }
 
-  private String getStatusUpdateKey(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getPartitionName();
-    }
-    return message.getMsgId();
-  }
-
-  /**
-   * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
-   */
-  String getStatusUpdateSubPath(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getResourceName();
-    } else {
-      return message.getMsgType();
-    }
-  }
-
-  String getStatusUpdateRecordName(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getTgtSessionId() + "__" + message.getResourceName();
-    }
-    return message.getMsgId();
-  }
-
   /**
    * Write an error record to zookeeper to the zookeeper store.
    * @param record
@@ -505,12 +553,13 @@ public class StatusUpdateUtil {
    *          the session id
    * @param accessor
    *          the zookeeper data accessor that writes the status update to zookeeper
+   * @param isController
+   *          whether the error record is for a controller instance
    */
   void publishErrorRecord(ZNRecord record, String instanceName, String updateSubPath,
-      String updateKey, String sessionId, HelixDataAccessor accessor) {
+      String updateKey, String sessionId, HelixDataAccessor accessor, boolean isController) {
     Builder keyBuilder = accessor.keyBuilder();
-    // TODO remove the hard code: "controller"
-    if (instanceName.toLowerCase().startsWith("controller")) {
+    if (isController) {
       // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
       // ../{sessionId}/{subPath}
       accessor.setProperty(keyBuilder.controllerTaskError(updateSubPath), new Error(record));