Posted to commits@helix.apache.org by ji...@apache.org on 2021/02/17 18:14:26 UTC

[helix] branch master updated: Add metrics for continuous resource/task rebalance failure count (#1649)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7543509  Add metrics for continuous resource/task rebalance failure count (#1649)
7543509 is described below

commit 75435094d1aebab8a087000b9ffe499173250b19
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Feb 17 10:14:15 2021 -0800

    Add metrics for continuous resource/task rebalance failure count (#1649)
    
    Add metrics for continuous resource/task rebalance failure count.
---
 .../helix/controller/GenericHelixController.java   | 34 +++++++++++---
 .../monitoring/mbeans/ClusterStatusMonitor.java    | 22 +++++++++
 .../mbeans/ClusterStatusMonitorMBean.java          | 13 +++++-
 .../controller/TestClusterMaintenanceMode.java     | 53 +++++++++++++++++++++-
 4 files changed, 113 insertions(+), 9 deletions(-)
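
The two new counters surface as attributes on the per-cluster ClusterStatus MBean, next to the existing RebalanceFailureCounter. A minimal sketch of reading them over standard JMX, assuming an in-process MBean server and a hypothetical cluster name "MyCluster" (the object-name format follows the getMbeanName() helper in the test diff below):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ContinuousRebalanceFailureReader {
  public static void main(String[] args) throws Exception {
    // Assumes the Helix controller runs in this JVM; for a remote controller,
    // connect through a JMXConnector instead of the platform MBean server.
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();

    // Domain "ClusterStatus" with key "cluster=<clusterName>", mirroring the
    // getMbeanName() helper in the test diff below. "MyCluster" is a placeholder.
    ObjectName clusterStatus = new ObjectName("ClusterStatus:cluster=MyCluster");

    long resourceFailures =
        (Long) server.getAttribute(clusterStatus, "ContinuousResourceRebalanceFailureCount");
    long taskFailures =
        (Long) server.getAttribute(clusterStatus, "ContinuousTaskRebalanceFailureCount");

    // Both gauges reset to 0 as soon as the corresponding pipeline succeeds again.
    System.out.println("resource=" + resourceFailures + ", task=" + taskFailures);
  }
}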

diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 11ca5a3..8ba9913 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -165,7 +165,9 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
 
   private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;
 
-  private long _continousRebalanceFailureCount = 0;
+  private long _continuousRebalanceFailureCount = 0;
+  private long _continuousResourceRebalanceFailureCount = 0;
+  private long _continuousTaskRebalanceFailureCount = 0;
 
   /**
    * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
@@ -800,6 +802,7 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
         event.getEventType(), event.getEventId(), eventSessionId.orElse("NOT_PRESENT"));
 
     long startTime = System.currentTimeMillis();
+    boolean helixMetaDataAccessRebalanceFail = false;
     boolean rebalanceFail = false;
     for (Pipeline pipeline : pipelines) {
       event.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType());
@@ -810,17 +813,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
         logger.error(
             "Exception while executing {} pipeline: {} for cluster {}. Will not continue to next pipeline",
             dataProvider.getPipelineName(), _clusterName, Arrays.toString(e.getStackTrace()));
-
         if (e instanceof HelixMetaDataAccessException) {
-          rebalanceFail = true;
+          helixMetaDataAccessRebalanceFail = true;
           // If pipeline failed due to read/write fails to zookeeper, retry the pipeline.
           dataProvider.requireFullRefresh();
           logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);
 
           // only push a retry event when there is no pending event in the corresponding event queue.
           if (isEventQueueEmpty(isTaskFrameworkPipeline)) {
-            _continousRebalanceFailureCount ++;
-            long delay = getRetryDelay(_continousRebalanceFailureCount);
+            _continuousRebalanceFailureCount ++;
+            long delay = getRetryDelay(_continuousRebalanceFailureCount);
             if (delay == 0) {
               forceRebalance(manager, ClusterEventType.RetryRebalance);
             } else {
@@ -832,11 +834,16 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
           }
         }
         _clusterStatusMonitor.reportRebalanceFailure();
+        updateContinuousRebalancedFailureCount(isTaskFrameworkPipeline, false /*resetToZero*/);
+        rebalanceFail = true;
         break;
       }
     }
+    if (!helixMetaDataAccessRebalanceFail) {
+      _continuousRebalanceFailureCount = 0;
+    }
     if (!rebalanceFail) {
-      _continousRebalanceFailureCount = 0;
+      updateContinuousRebalancedFailureCount(isTaskFrameworkPipeline, true /*resetToZero*/);
     }
 
     _lastPipelineEndTimestamp = System.currentTimeMillis();
@@ -883,6 +890,21 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     resetClusterStatusMonitor();
   }
 
+  private void updateContinuousRebalancedFailureCount(boolean isTaskFrameworkPipeline,
+      boolean resetToZero) {
+    if (isTaskFrameworkPipeline) {
+      _continuousTaskRebalanceFailureCount =
+          resetToZero ? 0 : _continuousTaskRebalanceFailureCount + 1;
+      _clusterStatusMonitor
+          .reportContinuousTaskRebalanceFailureCount(_continuousTaskRebalanceFailureCount);
+    } else {
+      _continuousResourceRebalanceFailureCount =
+          resetToZero ? 0 : _continuousResourceRebalanceFailureCount + 1;
+      _clusterStatusMonitor
+          .reportContinuousResourceRebalanceFailureCount(_continuousResourceRebalanceFailureCount);
+    }
+  }
+
   /**
    * get the delay on next retry rebalance due to zk read failure, We use a simple exponential
    * backoff to make the delay between [10ms, 1000ms]
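
The trailing context above describes getRetryDelay as a simple exponential backoff bounded to [10ms, 1000ms]; the implementation itself is outside this diff. A hypothetical sketch of such a bounded backoff, where a returned delay of 0 (for the first few consecutive failures) corresponds to the immediate forceRebalance branch in the hunk above; the threshold and exact curve are illustrative assumptions, not the actual getRetryDelay code:

final class RetryBackoffSketch {
  // Illustrative assumption: the first few consecutive failures retry immediately.
  private static final long IMMEDIATE_RETRY_LIMIT = 5;

  // Exponential backoff starting at 10ms, doubling per additional failure,
  // capped at 1000ms, matching the [10ms, 1000ms] range in the javadoc above.
  static long retryDelayMs(long continuousFailureCount) {
    if (continuousFailureCount <= IMMEDIATE_RETRY_LIMIT) {
      return 0L; // delay == 0 means the caller force-rebalances right away
    }
    long delay = (long) (10 * Math.pow(2, continuousFailureCount - IMMEDIATE_RETRY_LIMIT - 1));
    return Math.min(delay, 1000L);
  }
}
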
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 2242b12..c2211a0 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -86,6 +86,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private AtomicLong _totalPastDueMsgSize = new AtomicLong(0L);
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
+  private AtomicLong _continuousResourceRebalanceFailureCount = new AtomicLong(0L);
+  private AtomicLong _continuousTaskRebalanceFailureCount = new AtomicLong(0L);
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMonitorMap =
       new ConcurrentHashMap<>();
@@ -641,6 +643,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       _totalPastDueMsgSize.set(0L);
       _totalMsgQueueSize.set(0L);
       _rebalanceFailureCount.set(0L);
+      _continuousResourceRebalanceFailureCount.set(0L);
+      _continuousTaskRebalanceFailureCount.set(0L);
     } catch (Exception e) {
       LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
     }
@@ -1014,12 +1018,30 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _rebalanceFailureCount.incrementAndGet();
   }
 
+  public void reportContinuousResourceRebalanceFailureCount(long newValue) {
+    _continuousResourceRebalanceFailureCount.set(newValue);
+  }
+
+  public void reportContinuousTaskRebalanceFailureCount(long newValue) {
+    _continuousTaskRebalanceFailureCount.set(newValue);
+  }
+
   @Override
   public long getRebalanceFailureCounter() {
     return _rebalanceFailureCount.get();
   }
 
   @Override
+  public long getContinuousResourceRebalanceFailureCount() {
+    return _continuousResourceRebalanceFailureCount.get();
+  }
+
+  @Override
+  public long getContinuousTaskRebalanceFailureCount() {
+    return _continuousTaskRebalanceFailureCount.get();
+  }
+
+  @Override
   public long getTotalResourceGauge() {
     return _resourceMonitorMap.size();
   }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 71b7a71..de2d7ac 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -83,12 +83,21 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
   long getPaused();
 
   /**
-   * The number of failures during rebalance pipeline.
-   * @return
+   * @return The number of failures during rebalance pipeline.
    */
   long getRebalanceFailureCounter();
 
   /**
+   * @return The number of continuous resource rebalance failures
+   */
+  long getContinuousResourceRebalanceFailureCount();
+
+  /**
+   * @return The number of continuous task rebalance failures
+   */
+  long getContinuousTaskRebalanceFailureCount();
+
+  /**
    * @return number of all resources in this cluster
    */
   long getTotalResourceGauge();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
index 87aae31..201729e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
@@ -22,12 +22,15 @@ package org.apache.helix.integration.controller;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TaskTestBase;
@@ -37,12 +40,17 @@ import org.apache.helix.model.ControllerHistory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+
+
 public class TestClusterMaintenanceMode extends TaskTestBase {
+  private static final long TIMEOUT = 180 * 1000L;
   private MockParticipantManager _newInstance;
   private String newResourceAddedDuringMaintenanceMode =
       String.format("%s_%s", WorkflowGenerator.DEFAULT_TGT_DB, 1);
@@ -260,7 +268,7 @@ public class TestClusterMaintenanceMode extends TaskTestBase {
    * @throws InterruptedException
    */
   @Test(dependsOnMethods = "testManualEnablingOverridesAutoEnabling")
-  public void testMaxPartitionLimit() throws InterruptedException {
+  public void testMaxPartitionLimit() throws Exception {
     // Manually exit maintenance mode
     _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
         null);
@@ -325,6 +333,49 @@ public class TestClusterMaintenanceMode extends TaskTestBase {
         MaintenanceSignal.TriggeringEntity.CONTROLLER);
     Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
         MaintenanceSignal.AutoTriggerReason.MAX_PARTITION_PER_INSTANCE_EXCEEDED);
+
+    // Check if failed rebalance counter is updated
+    boolean result = TestHelper.verify(() -> {
+      try {
+        Long value =
+            (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureCounter");
+        return value != null && (value > 0);
+      } catch (Exception e) {
+        return false;
+      }
+    }, TIMEOUT);
+    Assert.assertTrue(result);
+
+    // Check failed continuous task rebalance counter is not updated
+    result = TestHelper.verify(() -> {
+      try {
+        Long value = (Long) _server
+            .getAttribute(getMbeanName(CLUSTER_NAME), "ContinuousTaskRebalanceFailureCount");
+        return value != null && (value == 0);
+      } catch (Exception e) {
+        return false;
+      }
+    }, TIMEOUT);
+    Assert.assertTrue(result);
+
+    // Check if failed continuous resource rebalance counter is updated
+    result = TestHelper.verify(() -> {
+      try {
+        Long value = (Long) _server
+            .getAttribute(getMbeanName(CLUSTER_NAME), "ContinuousResourceRebalanceFailureCount");
+        return value != null && (value > 0);
+      } catch (Exception e) {
+        return false;
+      }
+    }, TIMEOUT);
+    Assert.assertTrue(result);
+  }
+
+
+  private ObjectName getMbeanName(String clusterName) throws MalformedObjectNameException {
+    String clusterBeanName = String.format("%s=%s", CLUSTER_DN_KEY, clusterName);
+    return new ObjectName(
+        String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
   }
 
   /**