You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/20 03:34:00 UTC
git commit: [HELIX-132: current-state and external-view are not
cleaned up when a resource has been removed, rb=21666]
Repository: helix
Updated Branches:
refs/heads/helix-0.6.2-release 21296c95e -> 87806c1c7
[HELIX-132: current-state and external-view are not cleaned up when a resource has been removed, rb=21666]
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/87806c1c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/87806c1c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/87806c1c
Branch: refs/heads/helix-0.6.2-release
Commit: 87806c1c7fa2809de6874cf8c52c44b24a8c6da6
Parents: 21296c9
Author: zzhang <zz...@apache.org>
Authored: Mon May 19 18:33:38 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Mon May 19 18:33:38 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/GroupCommit.java | 40 +++---
.../stages/ExternalViewComputeStage.java | 12 +-
.../stages/ResourceComputationStage.java | 4 +
.../helix/manager/zk/ZKHelixDataAccessor.java | 2 +-
.../org/apache/helix/integration/TestDrop.java | 125 +++++++++++++++++--
5 files changed, 138 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/GroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
index 4d1bf68..f813aa9 100644
--- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java
@@ -53,10 +53,6 @@ public class GroupCommit {
private final Queue[] _queues = new Queue[100];
- // potential memory leak if we add resource and remove resource
- // TODO: move the cache logic to data accessor
- // private final Map<String, ZNRecord> _cache = new ConcurrentHashMap<String, ZNRecord>();
-
/**
* Set up a group committer and its associated queues
*/
@@ -81,6 +77,11 @@ public class GroupCommit {
*/
public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
ZNRecord record) {
+ return commit(accessor, options, key, record, false);
+ }
+
+ public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
+ ZNRecord record, boolean removeIfEmpty) {
Queue queue = getQueue(key);
Entry entry = new Entry(key, record);
@@ -98,7 +99,6 @@ public class GroupCommit {
processed.add(first);
String mergedKey = first._key;
- // ZNRecord merged = _cache.get(mergedKey);
ZNRecord merged = null;
try {
@@ -113,24 +113,7 @@ public class GroupCommit {
* value in ZK; use it as initial value if exists
*/
if (merged == null) {
- // ZNRecord valueOnZk = null;
- // try
- // {
- // valueOnZk = accessor.get(mergedKey, null, 0);
- // }
- // catch(Exception e)
- // {
- // LOG.info(e);
- // }
- // if(valueOnZk != null)
- // {
- // merged = valueOnZk;
- // merged.merge(first._record);
- // }
- // else // Zk path has null data. use the first record as initial record.
- {
merged = new ZNRecord(first._record);
- }
} else {
merged.merge(first._record);
}
@@ -145,9 +128,11 @@ public class GroupCommit {
it.remove();
}
// System.out.println("size:"+ processed.size());
- accessor.set(mergedKey, merged, options);
- // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
- // _cache.put(mergedKey, merged);
+ if (removeIfEmpty && merged.getMapFields().isEmpty()) {
+ accessor.remove(mergedKey, options);
+ } else {
+ accessor.set(mergedKey, merged, options);
+ }
} finally {
queue._running.set(null);
for (Entry e : processed) {
@@ -162,7 +147,10 @@ public class GroupCommit {
try {
entry.wait(10);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.error("Interrupted while committing change, key: " + key + ", record: " + record, e);
+
+ // Restore interrupt status
+ Thread.currentThread().interrupt();
return false;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 7ef4584..f427f3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
@@ -49,12 +48,12 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.log4j.Logger;
public class ExternalViewComputeStage extends AbstractBaseStage {
- private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
+ private static Logger LOG = Logger.getLogger(ExternalViewComputeStage.class);
@Override
public void process(ClusterEvent event) throws Exception {
long startTime = System.currentTimeMillis();
- log.info("START ExternalViewComputeStage.process()");
+ LOG.info("START ExternalViewComputeStage.process()");
HelixManager manager = event.getAttribute("helixmanager");
Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
@@ -145,12 +144,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
// remove dead external-views
for (String resourceName : curExtViews.keySet()) {
if (!resourceMap.keySet().contains(resourceName)) {
+ LOG.info("Remove externalView for resource: " + resourceName);
dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
}
}
long endTime = System.currentTimeMillis();
- log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
+ LOG.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime) + " ms");
}
private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,
@@ -172,7 +172,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
for (String taskState : ev.getStateMap(taskPartitionName).values()) {
if (taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString())
|| taskState.equalsIgnoreCase("COMPLETED")) {
- log.info(taskPartitionName + " finished as " + taskState);
+ LOG.info(taskPartitionName + " finished as " + taskState);
finishedTasks.getListFields().put(taskPartitionName, emptyList);
finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
@@ -182,7 +182,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
taskQueueIdealState.getRecord().getMapField(taskPartitionName)
.get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
if (controllerMsgId != null) {
- log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
+ LOG.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
if (!controllerMsgUpdates.containsKey(controllerMsgId)) {
controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 51f0ec1..54e2255 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -85,6 +85,10 @@ public class ResourceComputationStage extends AbstractBaseStage {
String resourceName = currentState.getResourceName();
Map<String, String> resourceStateMap = currentState.getPartitionStateMap();
+ if (resourceStateMap.keySet().isEmpty()) {
+ // don't include empty current state for dropped resource
+ continue;
+ }
// don't overwrite ideal state settings
if (!resourceMap.containsKey(resourceName)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index 94e8feb..d0c4024 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -152,7 +152,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeL
boolean success = false;
switch (type) {
case CURRENTSTATES:
- success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
+ success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
break;
default:
if (type.usePropertyTransferServer()) {
http://git-wip-us.apache.org/repos/asf/helix/blob/87806c1c/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index 5b54fad..f73ef29 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -25,15 +25,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyKey;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
@@ -44,6 +48,85 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestDrop extends ZkIntegrationTestBase {
+
+ /**
+ * Assert that the external view and every participant's current state for a resource are removed
+ * @param clusterName cluster to inspect
+ * @param db resource name whose external view and current states are checked
+ * @param participants participants whose current-state znodes (for their current session) are checked
+ */
+ private void assertEmptyCSandEV(String clusterName, String db, MockParticipantManager[] participants) {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Assert.assertNull(accessor.getProperty(keyBuilder.externalView(db)), "externalView of " + db + " should be removed");
+
+ for (MockParticipantManager participant : participants) {
+ String instanceName = participant.getInstanceName();
+ String sessionId = participant.getSessionId();
+ Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, db)), "currentState of " + db + " on " + instanceName + " should be removed");
+ }
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ // start controller
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Drop TestDB0
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.dropResource(clusterName, "TestDB0");
+
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
+ // clean up: stop controller and participants so their ZK sessions don't leak into later tests
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
@Test
public void testDropErrorPartitionAutoIS() throws Exception {
// Logger.getRootLogger().setLevel(Level.INFO);
@@ -107,6 +184,8 @@ public class TestDrop extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
+ assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
// clean up
controller.syncStop();
for (int i = 0; i < n; i++) {
@@ -181,13 +260,39 @@ public class TestDrop extends ZkIntegrationTestBase {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig("localhost_12918"));
List<String> disabledPartitions = config.getDisabledPartitions();
// System.out.println("disabledPartitions: " + disabledPartitions);
Assert.assertEquals(disabledPartitions.size(), 1, "TestDB0_4 should be disabled");
Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
+ // ExternalView should have TestDB0_4->localhost_12918->ERROR
+ ExternalView ev = accessor.getProperty(keyBuilder.externalView("TestDB0"));
+ Set<String> partitions = ev.getPartitionSet();
+ Assert.assertEquals(partitions.size(), 1, "Should have TestDB0_4->localhost_12918->ERROR");
+ String errPartition = partitions.iterator().next();
+ Assert.assertEquals(errPartition, "TestDB0_4");
+ Map<String, String> stateMap = ev.getStateMap(errPartition);
+ Assert.assertEquals(stateMap.size(), 1);
+ Assert.assertEquals(stateMap.keySet().iterator().next(), "localhost_12918");
+ Assert.assertEquals(stateMap.get("localhost_12918"), HelixDefinedState.ERROR.name());
+
+ // localhost_12918 should have TestDB0_4 in ERROR state
+ CurrentState cs = accessor.getProperty(keyBuilder.currentState(participants[0].getInstanceName(),
+ participants[0].getSessionId(), "TestDB0"));
+ Map<String, String> partitionStateMap = cs.getPartitionStateMap();
+ Assert.assertEquals(partitionStateMap.size(), 1);
+ Assert.assertEquals(partitionStateMap.keySet().iterator().next(), "TestDB0_4");
+ Assert.assertEquals(partitionStateMap.get("TestDB0_4"), HelixDefinedState.ERROR.name());
+
+ // all other participants should have cleaned up empty current state
+ for (int i = 1; i < n; i++) {
+ String instanceName = participants[i].getInstanceName();
+ String sessionId = participants[i].getSessionId();
+ Assert.assertNull(accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, "TestDB0")));
+ }
+
// clean up
controller.syncStop();
for (int i = 0; i < n; i++) {
@@ -230,8 +335,8 @@ public class TestDrop extends ZkIntegrationTestBase {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
- Builder keyBuiler = accessor.keyBuilder();
- accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.idealStates("TestDB0"), isBuilder.build());
// start controller
ClusterControllerManager controller =
@@ -273,6 +378,8 @@ public class TestDrop extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result, "Should be empty exeternal-view");
+ assertEmptyCSandEV(clusterName, "TestDB0", participants);
+
// clean up
controller.syncStop();
for (int i = 0; i < n; i++) {
@@ -335,7 +442,7 @@ public class TestDrop extends ZkIntegrationTestBase {
Assert.assertTrue(result);
// drop schemata resource group
- System.out.println("Dropping schemata resource group...");
+ // System.out.println("Dropping schemata resource group...");
command = "--zkSvr " + ZK_ADDR + " --dropResource " + clusterName + " schemata";
ClusterSetup.processCommandLineArgs(command.split("\\s+"));
result =
@@ -343,13 +450,7 @@ public class TestDrop extends ZkIntegrationTestBase {
clusterName));
Assert.assertTrue(result);
- // make sure schemata external view is empty
- ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
- ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
- Builder keyBuilder = accessor.keyBuilder();
- ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
- Assert.assertEquals(extView.getPartitionSet().size(), 0,
- "schemata externalView should be empty but was \"" + extView + "\"");
+ assertEmptyCSandEV(clusterName, "schemata", participants);
// clean up
controller.syncStop();