You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this format.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/20 03:34:00 UTC

git commit: [HELIX-132: current-state and external-view are not cleaned up when a resource has been removed, rb=21666]

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 21296c95e -> 87806c1c7


[HELIX-132: current-state and external-view are not cleaned up when a resource has been removed, rb=21666]


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/87806c1c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/87806c1c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/87806c1c

Branch: refs/heads/helix-0.6.2-release
Commit: 87806c1c7fa2809de6874cf8c52c44b24a8c6da6
Parents: 21296c9
Author: zzhang <zz...@apache.org>
Authored: Mon May 19 18:33:38 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Mon May 19 18:33:38 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/GroupCommit.java |  40 +++---
 .../stages/ExternalViewComputeStage.java        |  12 +-
 .../stages/ResourceComputationStage.java        |   4 +
 .../helix/manager/zk/ZKHelixDataAccessor.java   |   2 +-
 .../org/apache/helix/integration/TestDrop.java  | 125 +++++++++++++++++--
 5 files changed, 138 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index 4d1bf68..f813aa9 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -53,10 +53,6 @@ public class GroupCommit {
 
   private final Queue[] _queues = new Queue[100];
 
-  // potential memory leak if we add resource and remove resource
-  // TODO: move the cache logic to data accessor
-  // private final Map<String, ZNRecord> _cache = new ConcurrentHashMap<String, ZNRecord>();
-
   /**
    * Set up a group committer and its associated queues
    */
@@ -81,6 +77,11 @@ public class GroupCommit {
    */
   public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
       ZNRecord record) {
+    return commit(accessor, options, key, record, false); // removeIfEmpty=false: always set, as before
+  }
+
+  public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
+      ZNRecord record, boolean removeIfEmpty) {
     Queue queue = getQueue(key);
     Entry entry = new Entry(key, record);
 
@@ -98,7 +99,6 @@ public class GroupCommit {
           processed.add(first);
 
           String mergedKey = first._key;
-          // ZNRecord merged = _cache.get(mergedKey);
           ZNRecord merged = null;
 
           try {
@@ -113,24 +113,7 @@ public class GroupCommit {
            * value in ZK; use it as initial value if exists
            */
           if (merged == null) {
-            // ZNRecord valueOnZk = null;
-            // try
-            // {
-            // valueOnZk = accessor.get(mergedKey, null, 0);
-            // }
-            // catch(Exception e)
-            // {
-            // LOG.info(e);
-            // }
-            // if(valueOnZk != null)
-            // {
-            // merged = valueOnZk;
-            // merged.merge(first._record);
-            // }
-            // else // Zk path has null data. use the first record as initial record.
-            {
               merged = new ZNRecord(first._record);
-            }
           } else {
             merged.merge(first._record);
           }
@@ -145,9 +128,11 @@ public class GroupCommit {
             it.remove();
           }
           // System.out.println("size:"+ processed.size());
-          accessor.set(mergedKey, merged, options);
-          // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
-          // _cache.put(mergedKey, merged);
+          if (removeIfEmpty && merged.getMapFields().isEmpty()) { // NOTE(review): removeIfEmpty is this caller's flag, but 'first' may be another caller's entry
+            accessor.remove(mergedKey, options); // no map fields left: delete rather than write an empty record
+          } else {
+            accessor.set(mergedKey, merged, options);
+          }
         } finally {
           queue._running.set(null);
           for (Entry e : processed) {
@@ -162,7 +147,10 @@ public class GroupCommit {
           try {
             entry.wait(10);
           } catch (InterruptedException e) {
-            e.printStackTrace();
+            LOG.error("Interrupted while committing change, key: " + key + ", record: " + record, e);
+
+            // Restore interrupt status
+            Thread.currentThread().interrupt();
             return false;
           }
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 7ef4584..f427f3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
@@ -49,12 +48,12 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class ExternalViewComputeStage extends AbstractBaseStage {
-  private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+  private static final Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     long startTime = System.currentTimeMillis();
-    log.info("START ExternalViewComputeStage.process()");
+    LOG.info("START ExternalViewComputeStage.process()");
 
     HelixManager manager = event.getAttribute("helixmanager");
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
@@ -145,12 +144,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     // remove dead external-views
     for (String resourceName : curExtViews.keySet()) {
       if (!resourceMap.keySet().contains(resourceName)) {
+        LOG.info("Remove externalView for resource: " + resourceName);
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
       }
     }
 
     long endTime = System.currentTimeMillis();
-    log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+    LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
   }
 
   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
@@ -172,7 +172,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       for (String taskState : ev.getStateMap(taskPartitionName).values()) {
         if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
             || taskState.equalsIgnoreCase("COMPLETED")) {
-          log.info(taskPartitionName + " finished as " + taskState);
+          LOG.info(taskPartitionName + " finished as " + taskState);
           finishedTasks.getListFields().put(taskPartitionName, emptyList);
           finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
 
@@ -182,7 +182,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
                 taskQueueIdealState.getRecord().getMapField(taskPartitionName)
                     .get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
             if (controllerMsgId != null) {
-              log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+              LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
               if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
               }

http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 51f0ec1..54e2255 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -85,6 +85,10 @@ public class ResourceComputationStage extends AbstractBaseStage {
 
           String resourceName = currentState.getResourceName();
           Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
+          if (resourceStateMap.isEmpty()) {
+            // skip current states with no partitions (e.g. left over after a resource is dropped)
+            continue;
+          }
 
           // don't overwrite ideal state settings
           if (!resourceMap.containsKey(resourceName)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 94e8feb..d0c4024 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -152,7 +152,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
     boolean success = false;
     switch (type) {
     case CURRENTSTATES:
-      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
+      success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true); // removeIfEmpty: delete the current-state node once the merged record has no map fields
       break;
     default:
       if (type.usePropertyTransferServer()) {

http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index 5b54fad..f73ef29 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -25,15 +25,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
@@ -44,6 +48,79 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestDrop extends ZkIntegrationTestBase {
+
+  /**
+   * Assert that the external view of db and every participant's current state for db do not exist
+   * @param clusterName the real cluster name; both checks are assertNull, so a wrong name likely passes vacuously
+   * @param db resource name
+   * @param participants participants whose current-state node for db (in their current session) is checked
+   */
+  private void assertEmptyCSandEV(String clusterName, String db, MockParticipantManager[] participants) {
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.externalView(db)));
+
+    for (MockParticipantManager participant : participants) {
+      String instanceName = participant.getInstanceName();
+      String sessionId = participant.getSessionId();
+      Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, db)));
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    final int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        n, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // Drop TestDB0
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
   @Test
   public void testDropErrorPartitionAutoIS() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -107,6 +184,8 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
+    assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -181,13 +260,39 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig("localhost_12918"));
     List<String> disabledPartitions = config.getDisabledPartitions();
     // System.out.println("disabledPartitions: " + disabledPartitions);
     Assert.assertEquals(disabledPartitions.size(), 1, "TestDB0_4 should be disabled");
     Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
 
+    // ExternalView should have TestDB0_4->localhost_12918->ERROR
+    ExternalView ev = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+    Set<String> partitions = ev.getPartitionSet();
+    Assert.assertEquals(partitions.size(), 1, "Should have TestDB0_4->localhost_12918->ERROR");
+    String errPartition = partitions.iterator().next();
+    Assert.assertEquals(errPartition, "TestDB0_4");
+    Map<String, String> stateMap = ev.getStateMap(errPartition);
+    Assert.assertEquals(stateMap.size(), 1);
+    Assert.assertEquals(stateMap.keySet().iterator().next(), "localhost_12918");
+    Assert.assertEquals(stateMap.get("localhost_12918"), HelixDefinedState.ERROR.name());
+
+    // localhost_12918 should have TestDB0_4 in ERROR state
+    CurrentState cs = accessor.getProperty(keyBuilder.currentState(participants[0].getInstanceName(),
+        participants[0].getSessionId(), "TestDB0"));
+    Map<String, String> partitionStateMap = cs.getPartitionStateMap();
+    Assert.assertEquals(partitionStateMap.size(), 1);
+    Assert.assertEquals(partitionStateMap.keySet().iterator().next(), "TestDB0_4");
+    Assert.assertEquals(partitionStateMap.get("TestDB0_4"), HelixDefinedState.ERROR.name());
+
+    // all other participants should have cleaned up empty current state
+    for (int i = 1; i < n; i++) {
+      String instanceName = participants[i].getInstanceName();
+      String sessionId = participants[i].getSessionId();
+      Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, "TestDB0")));
+    }
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -230,8 +335,8 @@ public class TestDrop extends ZkIntegrationTestBase {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
-    Builder keyBuiler = accessor.keyBuilder();
-    accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.idealStates("TestDB0"), isBuilder.build());
 
     // start controller
     ClusterControllerManager controller =
@@ -273,6 +378,8 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result, "Should be empty exeternal-view");
 
+    assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < n; i++) {
@@ -335,7 +442,7 @@ public class TestDrop extends ZkIntegrationTestBase {
     Assert.assertTrue(result);
 
     // drop schemata resource group
-    System.out.println("Dropping schemata resource group...");
+    // System.out.println("Dropping schemata resource group...");
     command = "--zkSvr " + ZK_ADDR + " --dropResource " + clusterName + " schemata";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
     result =
@@ -343,13 +450,7 @@ public class TestDrop extends ZkIntegrationTestBase {
             clusterName));
     Assert.assertTrue(result);
 
-    // make sure schemata external view is empty
-    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
-    Builder keyBuilder = accessor.keyBuilder();
-    ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
-    Assert.assertEquals(extView.getPartitionSet().size(), 0,
-        "schemata externalView should be empty but was \"" + extView + "\"");
+    assertEmptyCSandEV(clusterName, "schemata", participants);
 
     // clean up
     controller.syncStop();