You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2018/07/17 19:06:11 UTC

[3/4] helix git commit: Fix the issue that resource MBean may not be cleaned up when the resource is dropped.

Fix an issue where a resource's MBean may not be cleaned up when the resource is dropped.

If a resource is removed before it has been successfully created on any participant, the controller may leave its corresponding MBean behind.
This fix ensures that all resource MBeans that no longer correspond to any live resource are removed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3deeeaba
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3deeeaba
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3deeeaba

Branch: refs/heads/master
Commit: 3deeeabaa988bf40c0ba953209dd6b26df984552
Parents: 964b802
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Jul 12 15:20:19 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 17 11:53:26 2018 -0700

----------------------------------------------------------------------
 .../stages/ExternalViewComputeStage.java        | 71 ++++++++++-----
 .../monitoring/mbeans/ClusterStatusMonitor.java | 53 +++++++----
 .../mbeans/TestDropResourceMetricsReset.java    | 94 ++++++++++++++++----
 3 files changed, 160 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index bf7be01..d901327 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -19,21 +19,41 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import org.apache.helix.*;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.*;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class);
@@ -60,8 +80,11 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
 
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
 
     List<ExternalView> newExtViews = new ArrayList<>();
+    Set<String> monitoringResources = new HashSet<>();
 
     Map<String, ExternalView> curExtViews = cache.getExternalViews();
 
@@ -100,24 +123,19 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
         ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
-        ClusterStatusMonitor clusterStatusMonitor =
-            event.getAttribute(AttributeName.clusterStatusMonitor.name());
         if (clusterStatusMonitor != null) {
-          if (idealState != null && (resourceConfig == null || !resourceConfig
-              .isMonitoringDisabled())) {
-            if (!idealState.getStateModelDefRef()
-                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              StateModelDefinition stateModelDef =
-                  cache.getStateModelDef(idealState.getStateModelDefRef());
-              clusterStatusMonitor
-                  .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
-                      stateModelDef);
-              clusterStatusMonitor
-                  .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
-            }
-          } else {
-            // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true.
-            clusterStatusMonitor.unregisterResource(view.getResourceName());
+          if (idealState != null // has ideal state
+              && (resourceConfig == null || !resourceConfig.isMonitoringDisabled()) // monitoring not disabled
+              && !idealState.getStateModelDefRef() // and not a job resource
+              .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            StateModelDefinition stateModelDef =
+                cache.getStateModelDef(idealState.getStateModelDefRef());
+            clusterStatusMonitor
+                .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
+                    stateModelDef);
+            clusterStatusMonitor
+                .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
+            monitoringResources.add(resourceName);
           }
         }
       }
@@ -145,7 +163,12 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       }
     }
 
-    List<String> externalviewsToRemove = new ArrayList<>();
+    // Keep MBeans for existing resources and unregister MBeans for dropped resources
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.retainResourceMonitor(monitoringResources);
+    }
+
+    List<String> externalViewsToRemove = new ArrayList<>();
     // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
     // Are there any entity that will be interested in its change?
 
@@ -163,7 +186,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
           LogUtil
               .logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName);
           dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
-          externalviewsToRemove.add(resourceName);
+          externalViewsToRemove.add(resourceName);
         }
       } else {
         keys.add(keyBuilder.externalView(resourceName));
@@ -181,10 +204,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       if (!resourceMap.keySet().contains(resourceName)) {
         LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName);
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
-        externalviewsToRemove.add(resourceName);
+        externalViewsToRemove.add(resourceName);
       }
     }
-    cache.removeExternalViews(externalviewsToRemove);
+    cache.removeExternalViews(externalViewsToRemove);
   }
 
   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,

http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 52d06e3..e3655d8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,12 +19,22 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.task.*;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,8 +43,16 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
@@ -398,20 +416,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
-   * Indicate that a resource has been dropped, thus making it OK to drop its metrics
-   * @param resourceName the resource that has been dropped
+   * Clean up resource monitors, retaining only the monitors whose resources are in the input set.
+   * @param resourceNames the resources that still exist
    */
-  public void unregisterResource(String resourceName) {
-    if (_resourceMbeanMap.containsKey(resourceName)) {
-      synchronized (this) {
-        if (_resourceMbeanMap.containsKey(resourceName)) {
-          try {
-            unregisterResources(Arrays.asList(resourceName));
-          } catch (MalformedObjectNameException e) {
-            LOG.error("Could not unregister beans for " + resourceName, e);
-          }
-        }
-      }
+  public void retainResourceMonitor(Set<String> resourceNames) {
+    Set<String> resourcesToRemove = new HashSet<>();
+    synchronized (this) {
+      resourcesToRemove.addAll(_resourceMbeanMap.keySet());
+    }
+    resourcesToRemove.removeAll(resourceNames);
+
+    try {
+      unregisterResources(resourcesToRemove);
+    } catch (MalformedObjectNameException e) {
+      LOG.error(String.format("Could not unregister beans for the following resources: %s",
+          Joiner.on(',').join(resourcesToRemove)), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
index b815160..04e79e6 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
@@ -19,15 +19,6 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -35,23 +26,39 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class TestDropResourceMetricsReset extends ZkUnitTestBase {
-  private final CountDownLatch _registerLatch = new CountDownLatch(1);
-  private final CountDownLatch _unregisterLatch = new CountDownLatch(1);
+  private CountDownLatch _registerLatch;
+  private CountDownLatch _unregisterLatch;
+  private String _className = TestHelper.getTestClassName();
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _registerLatch = new CountDownLatch(1);
+    _unregisterLatch = new CountDownLatch(1);
+  }
 
   @Test
   public void testBasic() throws Exception {
     final int NUM_PARTICIPANTS = 4;
     final int NUM_PARTITIONS = 64;
     final int NUM_REPLICAS = 1;
-    final String RESOURCE_NAME = "TestDB0";
+    final String RESOURCE_NAME = "BasicDB0";
 
-    String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String clusterName = _className + "_" + methodName;
 
     ParticipantMonitorListener listener =
         new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
@@ -59,7 +66,7 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase {
     // Set up cluster
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
-        "TestDB", // resource name prefix
+        "BasicDB", // resource name prefix
         1, // resources
         NUM_PARTITIONS, // partitions per resource
         NUM_PARTICIPANTS, // number of nodes
@@ -97,7 +104,60 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase {
       participant.syncStop();
     }
     TestHelper.dropCluster(clusterName, _gZkClient);
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test (dependsOnMethods = "testBasic")
+  public void testDropWithNoCurrentState() throws Exception {
+    final int NUM_PARTICIPANTS = 1;
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+    final String RESOURCE_NAME = "TestDB0";
+
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+
+    ParticipantMonitorListener listener =
+        new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+        true); // do rebalance
+
+    // Start participants and controller
+    ClusterSetup setupTool = new ClusterSetup(_gZkClient);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    participant.syncStart();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // Verify that the bean was created
+    boolean noTimeout = _registerLatch.await(30000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(noTimeout);
+
+    // stop the participant, so the resource does not exist in any current states.
+    participant.syncStop();
+
+    // Drop the resource
+    setupTool.dropResourceFromCluster(clusterName, RESOURCE_NAME);
+
+    // Verify that the bean was removed
+    noTimeout = _unregisterLatch.await(30000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(noTimeout);
+
+    // Clean up
+    listener.disconnect();
+    controller.syncStop();
+
+    TestHelper.dropCluster(clusterName, _gZkClient);
   }
 
   private ObjectName getObjectName(String resourceName, String clusterName)