You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/28 21:18:53 UTC

helix git commit: [HELIX-706] process tev and persist assignment asynchronously

Repository: helix
Updated Branches:
  refs/heads/master 6e047915d -> 7a2b9693d


[HELIX-706] process tev and persist assignment asynchronously


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7a2b9693
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7a2b9693
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7a2b9693

Branch: refs/heads/master
Commit: 7a2b9693d49d578ac9121a944f1b469c2f2316d9
Parents: 6e04791
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Jun 26 16:05:50 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Thu Jun 28 14:18:24 2018 -0700

----------------------------------------------------------------------
 .../helix/common/DedupEventProcessor.java       |  3 +
 .../controller/GenericHelixController.java      | 64 +++++++++++++++++---
 .../controller/pipeline/AbstractBaseStage.java  | 23 ++++++-
 .../controller/stages/AsyncWorkerType.java      | 32 ++++++++++
 .../helix/controller/stages/AttributeName.java  |  3 +-
 .../controller/stages/ClusterDataCache.java     |  2 +-
 .../stages/PersistAssignmentStage.java          | 26 +++++++-
 .../stages/TargetExteralViewCalcStage.java      | 53 +++++++++++++---
 8 files changed, 182 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
index 7f3525b..942b021 100644
--- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
@@ -58,6 +58,9 @@ public abstract class DedupEventProcessor<T, E> extends Thread {
   protected abstract void handleEvent(E event);
 
   public void queueEvent(T eventType, E event) {
+    if (isInterrupted()) {
+      return; // drop the event: this processor thread has already been interrupted
+    }
     _eventQueue.put(eventType, event);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6e2df23..0e0817d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,8 +19,18 @@ package org.apache.helix.controller;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -30,23 +40,26 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.api.listeners.*;
 import org.apache.helix.common.ClusterEventBlockingQueue;
+import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.*;
-import org.apache.helix.model.*;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.helix.HelixConstants.ChangeType;
+import static org.apache.helix.HelixConstants.*;
 
 /**
  * Cluster Controllers main goal is to keep the cluster state as close as possible to Ideal State.
@@ -90,6 +103,8 @@ public class GenericHelixController implements IdealStateChangeListener,
   private final ClusterEventBlockingQueue _taskEventQueue;
   private final ClusterEventProcessor _taskEventThread;
 
+  private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
+
   private long _continousRebalanceFailureCount = 0;
 
   /**
@@ -290,6 +305,9 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     _eventQueue = new ClusterEventBlockingQueue();
     _taskEventQueue = new ClusterEventBlockingQueue();
+
+    _asyncFIFOWorkerPool = new HashMap<>();
+
     _cache = new ClusterDataCache(clusterName);
     _taskCache = new ClusterDataCache(clusterName);
 
@@ -298,12 +316,36 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     _forceRebalanceTimer = new Timer();
 
+    initializeAsyncFIFOWorkers();
     initPipelines(_eventThread, _cache, false);
     initPipelines(_taskEventThread, _taskCache, true);
 
     _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
   }
 
+  private void initializeAsyncFIFOWorkers() {
+    // Start one DedupEventProcessor thread per AsyncWorkerType; its handleEvent runs the Runnable.
+    for (AsyncWorkerType workerType : AsyncWorkerType.values()) {
+      DedupEventProcessor<String, Runnable> processor =
+          new DedupEventProcessor<String, Runnable>(_clusterName, workerType.name()) {
+            @Override
+            protected void handleEvent(Runnable task) {
+              task.run(); // TODO: retry when queue is empty and task.run() failed?
+            }
+          };
+      processor.start();
+      _asyncFIFOWorkerPool.put(workerType, processor);
+      logger.info("Started async worker {}", processor.getName());
+    }
+  }
+
+  private void shutdownAsyncFIFOWorkers() { // call shutdown() on every worker in the pool
+    for (DedupEventProcessor<String, Runnable> processor : _asyncFIFOWorkerPool.values()) {
+      processor.shutdown();
+      logger.info("Shutdown async worker {}", processor.getName());
+    }
+  }
+
   private boolean isEventQueueEmpty(boolean taskQueue) {
     if (taskQueue) {
       return _taskEventQueue.isEmpty();
@@ -642,6 +684,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     ClusterEvent event = new ClusterEvent(_clusterName, eventType);
     event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
     for (Map.Entry<String, Object> attr : eventAttributes.entrySet()) {
       event.addAttribute(attr.getKey(), attr.getValue());
     }
@@ -780,6 +823,9 @@ public class GenericHelixController implements IdealStateChangeListener,
       logger.warn("Timeout when terminating async tasks. Some async tasks are still executing.");
     }
 
+    // shutdown async workers
+    shutdownAsyncFIFOWorkers();
+
     enableClusterStatusMonitor(false);
 
     // TODO controller shouldn't be used in anyway after shutdown.

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
index df37010..d12833f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -18,9 +18,13 @@ package org.apache.helix.controller.pipeline;
  * specific language governing permissions and limitations
  * under the License.
  */
+
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.stages.AsyncWorkerType;
+import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 
 public class AbstractBaseStage implements Stage {
@@ -62,4 +66,21 @@ public class AbstractBaseStage implements Stage {
       service.submit(task);
     }
   }
+
+  /** Returns the worker of the given type from the event's async worker pool, or null. */
+  protected DedupEventProcessor<String, Runnable> getAsyncWorkerFromClusterEvent(ClusterEvent event,
+      AsyncWorkerType worker) {
+    Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool =
+        event.getAttribute(AttributeName.AsyncFIFOWorkerPool.name());
+    if (workerPool == null) {
+      return null; // no pool attached to this event; callers then run the stage synchronously
+    }
+    // Map.get already returns null for an absent key, so no separate containsKey probe is needed.
+    return workerPool.get(worker);
+  }
+
+  protected String getAsyncTaskDedupType(boolean isTaskPipeline) {
+    String pipeline = isTaskPipeline ? "TASK" : "RESOURCE"; // key: "<pipeline>::<stage class>"
+    return String.format("%s::%s", pipeline, getClass().getSimpleName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
new file mode 100644
index 0000000..995705f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
@@ -0,0 +1,32 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Some pipeline stages, e.g. TargetExternalViewCalc and PersistAssignment, can choose to
+ * submit their work to a dedicated worker so that it is done asynchronously.
+ *
+ * Each constant identifies one such async worker that the corresponding stage can look up.
+ */
+
+public enum AsyncWorkerType {
+  TargetExternalViewCalcWorker, // looked up by TargetExteralViewCalcStage
+  PersistAssignmentWorker // looked up by PersistAssignmentStage
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index f7a6da2..b98dc9e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -35,5 +35,6 @@ public enum AttributeName {
   clusterStatusMonitor,
   changeContext,
   instanceName,
-  eventData
+  eventData,
+  AsyncFIFOWorkerPool
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 5f87317..16b8633 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -154,7 +154,7 @@ public class ClusterDataCache {
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
       _updateInstanceOfflineTime = true;
       LOG.info("Refresh LiveInstances for cluster " + _clusterName + ", took " + (
-          System.currentTimeMillis() - startTime) + " ms");
+          System.currentTimeMillis() - start) + " ms");
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 8d188ec..7463f24 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -24,13 +24,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
 import java.util.Set;
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -48,7 +48,29 @@ import org.slf4j.LoggerFactory;
 public class PersistAssignmentStage extends AbstractBaseStage {
   private static final Logger LOG = LoggerFactory.getLogger(PersistAssignmentStage.class);
 
-  @Override public void process(ClusterEvent event) throws Exception {
+  @Override
+  public void process(final ClusterEvent event) throws Exception {
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    DedupEventProcessor<String, Runnable> asyncWorker =
+        getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.PersistAssignmentWorker);
+    String pipelineType = cache.isTaskCache() ? "TASK" : "RESOURCE";
+    // Without an async worker, persist inline; otherwise hand the work to the worker's queue.
+    if (asyncWorker == null) {
+      LOG.info("Starting PersistAssignmentStage synchronously for cluster {}, {} pipeline",
+          cache.getClusterName(), pipelineType);
+      doPersistAssignment(event);
+      return;
+    }
+    LOG.info("Sending PersistAssignmentStage task for cluster {}, {} pipeline to worker",
+        cache.getClusterName(), pipelineType);
+    asyncWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() {
+      @Override public void run() {
+        doPersistAssignment(event);
+      }
+    });
+  }
+
+  private void doPersistAssignment(final ClusterEvent event) {
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     ClusterConfig clusterConfig = cache.getClusterConfig();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7a2b9693/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
index b7e4ebd..95b3988 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
@@ -23,12 +23,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.model.ClusterConfig;
@@ -44,24 +43,40 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
   private static final Logger LOG = LoggerFactory.getLogger(TargetExteralViewCalcStage.class);
 
   @Override
-  public void process(ClusterEvent event) throws Exception {
+  public void process(final ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    DedupEventProcessor<String, Runnable> tevWorker =
+        getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.TargetExternalViewCalcWorker);
+    // A null tevWorker means no async worker is attached; the view is then computed inline.
     ClusterConfig clusterConfig = cache.getClusterConfig();
 
     if (cache.isTaskCache() || !clusterConfig.isTargetExternalViewEnabled()) {
       return;
     }
 
+    // The task pipeline returned above, so only the RESOURCE pipeline reaches this point.
+    if (tevWorker == null) {
+      LOG.info("Generating target external view synchronously for cluster {}, {} pipeline",
+          cache.getClusterName(), "RESOURCE");
+      doCalculateTargetExternalView(event);
+    } else {
+      // We have an async worker so update external view asynchronously
+      LOG.info("Sending target external view generating task for cluster {}, {} pipeline to worker",
+          cache.getClusterName(), "RESOURCE");
+      tevWorker.queueEvent(getAsyncTaskDedupType(false), new Runnable() {
+        @Override
+        public void run() {
+          doCalculateTargetExternalView(event);
+        }
+      });
+    }
+  }
+
+  private void doCalculateTargetExternalView(final ClusterEvent event) {
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
 
-    if (!accessor.getBaseDataAccessor()
-        .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) {
-      accessor.getBaseDataAccessor()
-          .create(accessor.keyBuilder().targetExternalViews().getPath(), null,
-              AccessOption.PERSISTENT);
-    }
-
     BestPossibleStateOutput bestPossibleAssignments =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
 
@@ -69,6 +84,17 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
+    long startTimeStamp = System.currentTimeMillis();
+    LOG.info("START: computing target external view for cluster {}, {} pipeline",
+        cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
+
+    if (!accessor.getBaseDataAccessor()
+        .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) {
+      accessor.getBaseDataAccessor()
+          .create(accessor.keyBuilder().targetExternalViews().getPath(), null,
+              AccessOption.PERSISTENT);
+    }
+
     List<PropertyKey> keys = new ArrayList<>();
     List<ExternalView> targetExternalViews = new ArrayList<>();
 
@@ -115,7 +141,13 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
         }
       }
     }
+    // TODO (HELIX-964): remove TEV when idealstate is removed
     accessor.setChildren(keys, targetExternalViews);
+
+    long endTimeStamp = System.currentTimeMillis();
+    LOG.info("END: computing target external view for cluster {}, {} pipeline. Took: {} ms",
+        cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE",
+        endTimeStamp - startTimeStamp);
   }
 
   private Map<String, Map<String, String>> convertToMapFields(
@@ -126,4 +158,5 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
     }
     return mapFields;
   }
+
 }