You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/16 20:35:12 UTC
[3/3] git commit: [HELIX-472] Errors should be cleaned up less frequently
[HELIX-472] Errors should be cleaned up less frequently
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/371e6576
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/371e6576
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/371e6576
Branch: refs/heads/helix-0.6.x
Commit: 371e65769774d4e116b97c0b88d52e2be545c8ec
Parents: abf4e49
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 15 11:33:43 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jul 16 11:34:37 2014 -0700
----------------------------------------------------------------------
.../apache/helix/manager/zk/ZKHelixManager.java | 9 +-
.../helix/monitoring/ZKPathDataDumpTask.java | 31 +++---
.../helix/integration/TestSchedulerMessage.java | 2 +-
.../monitoring/TestZKPathDataDumpTask.java | 99 ++++++++++++++++++--
4 files changed, 119 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/371e6576/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 8d28dbd..f95f6ee 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -139,13 +139,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
public void start() {
long initialDelay = 0;
long period = 15 * 60 * 1000;
- int timeThresholdNoChange = 15 * 60 * 1000;
+ long timeThresholdNoChangeForStatusUpdates = 15 * 60 * 1000; // 15 minutes
+ long timeThresholdNoChangeForErrors = 24 * 60 * 60 * 1000; // 1 day
+ int maximumNumberOfLeafNodesAllowed = 10000;
if (_timer == null) {
LOG.info("Start StatusDumpTask");
_timer = new Timer("StatusDumpTimerTask", true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange),
- initialDelay, period);
+ _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
+ timeThresholdNoChangeForStatusUpdates, timeThresholdNoChangeForErrors,
+ maximumNumberOfLeafNodesAllowed), initialDelay, period);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/371e6576/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index a0190d2..0a91256 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -25,9 +25,9 @@ import java.util.TimerTask;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
@@ -38,17 +38,24 @@ import com.google.common.collect.Lists;
public class ZKPathDataDumpTask extends TimerTask {
static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class);
- private final int _thresholdNoChangeInMs;
+ private final long _thresholdNoChangeMsForStatusUpdates;
+ private final long _thresholdNoChangeMsForErrors;
+ private final int _maxLeafCount;
private final HelixManager _manager;
private final ZNRecordSerializer _jsonSerializer;
- public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) {
+ public ZKPathDataDumpTask(HelixManager manager, long thresholdNoChangeMsForStatusUpdates,
+ long thresholdNoChangeMsForErrors, int maxLeafCount) {
LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName()
- + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+ + ", thresholdNoChangeMsForStatusUpdates: " + thresholdNoChangeMsForStatusUpdates
+ + ", thresholdNoChangeMsForErrors: " + thresholdNoChangeMsForErrors + ", maxLeafCount: "
+ + maxLeafCount);
_manager = manager;
_jsonSerializer = new ZNRecordSerializer();
- _thresholdNoChangeInMs = thresholdNoChangeInMs;
+ _thresholdNoChangeMsForStatusUpdates = thresholdNoChangeMsForStatusUpdates;
+ _thresholdNoChangeMsForErrors = thresholdNoChangeMsForErrors;
+ _maxLeafCount = maxLeafCount;
}
@Override
@@ -70,25 +77,26 @@ public class ZKPathDataDumpTask extends TimerTask {
String statusUpdatePath =
HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
PropertyType.STATUSUPDATES);
- dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs);
+ dump(baseAccessor, statusUpdatePath, _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
// dump participant errors
String errorPath =
HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
PropertyType.ERRORS);
- dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
+ dump(baseAccessor, errorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
}
// dump controller status updates
String controllerStatusUpdatePath =
HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
PropertyType.STATUSUPDATES_CONTROLLER);
- dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs);
+ dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeMsForStatusUpdates,
+ _maxLeafCount);
// dump controller errors
String controllerErrorPath =
HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
PropertyType.ERRORS_CONTROLLER);
- dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs);
+ dump(baseAccessor, controllerErrorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
}
/**
@@ -122,7 +130,8 @@ public class ZKPathDataDumpTask extends TimerTask {
return leafPaths;
}
- void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, int threshold) {
+ void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, long threshold,
+ int maxLeafCount) {
List<String> leafPaths = scanPath(accessor, ancestorPath);
if (leafPaths.isEmpty()) {
return;
@@ -133,7 +142,7 @@ public class ZKPathDataDumpTask extends TimerTask {
long now = System.currentTimeMillis();
for (int i = 0; i < stats.length; i++) {
Stat stat = stats[i];
- if ((now - stat.getMtime()) > threshold) {
+ if ((stats.length > maxLeafCount) || ((now - stat.getMtime()) > threshold)) {
dumpPaths.add(leafPaths.get(i));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/371e6576/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index cb6e186..70713f3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -393,7 +393,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
}
}
Thread.sleep(3000);
- ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0);
+ ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE);
dumpTask.run();
subPaths = _gZkClient.getChildren(controllerStatusPath);
http://git-wip-us.apache.org/repos/asf/helix/blob/371e6576/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
index a3d8ae3..d073dd2 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring;
* under the License.
*/
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Date;
import org.apache.helix.BaseDataAccessor;
@@ -30,14 +33,11 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.StatusUpdate;
import org.apache.helix.model.Error;
+import org.apache.helix.model.StatusUpdate;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class TestZKPathDataDumpTask extends ZkUnitTestBase {
@Test
@@ -67,8 +67,9 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
when(manager.getHelixDataAccessor()).thenReturn(accessor);
when(manager.getClusterName()).thenReturn(clusterName);
- // run dump task without statusUpdates and errors, should not remove any existing statusUpdate/error paths
- ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0);
+ // run dump task without statusUpdates and errors, should not remove any existing
+ // statusUpdate/error paths
+ ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE);
task.run();
PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
@@ -88,7 +89,8 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
// add controller status updates and errors
controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
- accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord("controllerStatusUpdate")));
+ accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+ "controllerStatusUpdate")));
controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
@@ -110,4 +112,87 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
+
+ @Test
+ public void testCapacityReached() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 1;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 2, // partitions per resource
+ n, // number of nodes
+ 1, // replicas
+ "MasterSlave", true); // do rebalance
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+ HelixManager manager = mock(HelixManager.class);
+ when(manager.getHelixDataAccessor()).thenReturn(accessor);
+ when(manager.getClusterName()).thenReturn(clusterName);
+
+ // run dump task without statusUpdates and errors, should not remove any existing
+ // statusUpdate/error paths
+ ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, Long.MAX_VALUE, Long.MAX_VALUE, 1);
+ task.run();
+ PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+ Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ PropertyKey controllerErrorKey = keyBuilder.controllerTaskErrors();
+ Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ PropertyKey statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+ Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ PropertyKey errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+ Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0));
+
+ // add participant status updates and errors
+ statusUpdateKey =
+ keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+ accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+ errorKey =
+ keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+ accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+
+ // add controller status updates and errors (one of each, should not trigger anything)
+ controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
+ accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+ "controllerStatusUpdate")));
+ controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
+ accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+ // run dump task, should not remove anything because the threshold is not exceeded
+ task.run();
+ Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0));
+
+ // add a second set of all status updates and errors
+ statusUpdateKey =
+ keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_1");
+ accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+ errorKey =
+ keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_1");
+ accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+ controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB1");
+ accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+ "controllerStatusUpdate")));
+ controllerErrorKey = keyBuilder.controllerTaskError("TestDB1_error");
+ accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+ // run dump task, should remove everything since capacities are exceeded
+ task.run();
+ Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+ Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0));
+ }
}