You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/31 20:50:48 UTC

[1/2] helix git commit: Customize the pipeline thread name with cluster name and pipeline type.

Repository: helix
Updated Branches:
  refs/heads/master 566d4f166 -> 1103fecb6


Customize the pipeline thread name with cluster name and pipeline type.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96593708
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96593708
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96593708

Branch: refs/heads/master
Commit: 9659370849116b8a3f5d1853c7695274942ce211
Parents: 566d4f1
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Oct 2 14:43:25 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Oct 31 13:50:33 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 30 +++++++++++++-------
 1 file changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96593708/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index dd409e5..eb75286 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -356,7 +356,7 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
-      String clusterName) {
+      final String clusterName) {
     _paused = false;
     _registry = registry;
     _taskRegistry = taskRegistry;
@@ -366,7 +366,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _asyncTasksThreadPool =
         Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
           @Override public Thread newThread(Runnable r) {
-            return new Thread(r, "GerenricHelixController-async_task_thread");
+            return new Thread(r, "HelixController-async_tasks-" + _clusterName);
           }
         });
 
@@ -378,8 +378,9 @@ public class GenericHelixController implements IdealStateChangeListener,
     _cache = new ClusterDataCache(clusterName);
     _taskCache = new ClusterDataCache(clusterName);
 
-    _eventThread = new ClusterEventProcessor(_cache, _eventQueue);
-    _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
+    _eventThread = new ClusterEventProcessor(_cache, _eventQueue, "default-" + clusterName);
+    _taskEventThread =
+        new ClusterEventProcessor(_taskCache, _taskEventQueue, "task-" + clusterName);
 
     _forceRebalanceTimer = new Timer();
     _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
@@ -979,33 +980,40 @@ public class GenericHelixController implements IdealStateChangeListener,
   private class ClusterEventProcessor extends Thread {
     private final ClusterDataCache _cache;
     private final ClusterEventBlockingQueue _eventBlockingQueue;
+    private final String _processorName;
 
     public ClusterEventProcessor(ClusterDataCache cache,
-        ClusterEventBlockingQueue eventBlockingQueue) {
-      super("GenericHelixController-event_process");
+        ClusterEventBlockingQueue eventBlockingQueue, String processorName) {
+      super("HelixController-pipeline-" + processorName);
       _cache = cache;
       _eventBlockingQueue = eventBlockingQueue;
+      _processorName = processorName;
     }
 
     @Override
     public void run() {
-      logger.info("START ClusterEventProcessor thread  for cluster " + _clusterName);
+      logger.info(
+          "START ClusterEventProcessor thread  for cluster " + _clusterName + ", processor name: "
+              + _processorName);
       while (!isInterrupted()) {
         try {
           handleEvent(_eventBlockingQueue.take(), _cache);
         } catch (InterruptedException e) {
-          logger.warn("ClusterEventProcessor interrupted", e);
+          logger.warn("ClusterEventProcessor interrupted " + _processorName, e);
           interrupt();
         } catch (ZkInterruptedException e) {
-          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+          logger
+              .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e);
           interrupt();
         } catch (ThreadDeath death) {
+          logger.error("ClusterEventProcessor caught a ThreadDeath  " + _processorName, death);
           throw death;
         } catch (Throwable t) {
-          logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+          logger.error("ClusterEventProcessor failed while running the controller pipeline "
+              + _processorName, t);
         }
       }
-      logger.info("END ClusterEventProcessor thread");
+      logger.info("END ClusterEventProcessor thread " + _processorName);
     }
   }
 


[2/2] helix git commit: Improve message handling properly

Posted by jx...@apache.org.
Improve message handling properly

The client side needs some improvement to help stabilize message handling. Currently, when processing a message, Helix marks the message as read before the task has actually been scheduled.

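In this case, if scheduling fails after the message has already been marked as read, nothing will take care of the message and it will hang there forever. With this change, a message whose task fails to schedule is removed from the task and future map and from ZK.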


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1103fecb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1103fecb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1103fecb

Branch: refs/heads/master
Commit: 1103fecb67def5e610a7b22636ba4ac25e23777b
Parents: 9659370
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Sep 17 11:51:31 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Oct 31 13:50:37 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTaskExecutor.java  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1103fecb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 5e2082c..3ae90d3 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -952,22 +952,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
+      // Remove message if schedule tasks are failed.
       for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
         MessageHandler handler = handlerEntry.getValue();
         NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
 
       for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
         MessageHandler handler = nonStateTransitionHandlers.get(i);
         NotificationContext context = nonStateTransitionContexts.get(i);
         Message msg = handler._message;
-        scheduleTask(
-            new HelixTask(msg, context, handler, this)
-        );
+        if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
+          removeMessageFromTaskAndFutureMap(msg);
+          removeMessageFromZK(accessor, msg, instanceName);
+        }
       }
     }
   }