You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:56 UTC
[21/38] helix git commit: Persist participant's offline timestamp in
ParticipantHistory.
Persist participant's offline timestamp in ParticipantHistory.
This is to persist the timestamp when a participant is going offline.
1) If a participant goes offline gracefully (by calling disconnect()), the participant will write a timestamp to its history record.
2) If a participant goes offline without calling disconnect() (e.g., GC, machine crash), the controller will try to set the timestamp in its pipeline triggered by liveInstanceChanges.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/46705c59
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/46705c59
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/46705c59
Branch: refs/heads/helix-0.6.x
Commit: 46705c598e054cec54c4d4cd6fb309b3d1a25475
Parents: 9e8ec45
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Sep 1 18:21:36 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:50:48 2017 -0800
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 46 +++++++++++++++
.../helix/manager/zk/ParticipantManager.java | 23 +++++---
.../apache/helix/manager/zk/ZKHelixAdmin.java | 8 +--
.../apache/helix/manager/zk/ZKHelixManager.java | 17 ++++--
.../apache/helix/model/ParticipantHistory.java | 42 +++++++++++++-
.../integration/TestNodeOfflineTimeStamp.java | 59 ++++++++++++++++++++
.../TestRebalancerPersistAssignments.java | 38 ++++++++++++-
.../integration/ZkStandAloneCMTestBase.java | 11 +++-
8 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dbc12d4..2cd8ec0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,6 +40,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
@@ -62,6 +64,7 @@ public class ClusterDataCache {
Map<String, StateModelDefinition> _stateModelDefMap;
Map<String, InstanceConfig> _instanceConfigMap;
Map<String, InstanceConfig> _instanceConfigCacheMap;
+ Map<String, Long> _instanceOfflineTimeMap;
Map<String, ResourceConfig> _resourceConfigMap;
Map<String, ResourceConfig> _resourceConfigCacheMap;
Map<String, ClusterConstraints> _constraintMap;
@@ -74,6 +77,8 @@ public class ClusterDataCache {
boolean _init = true;
+ boolean _updateInstanceOfflineTime = true;
+
private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
/**
@@ -102,6 +107,10 @@ public class ClusterDataCache {
_stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
_constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
+ if (_init || _updateInstanceOfflineTime) {
+ updateOfflineInstanceHistory(accessor);
+ }
+
if (LOG.isTraceEnabled()) {
for (LiveInstance instance : _liveInstanceMap.values()) {
LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
@@ -236,6 +245,42 @@ public class ClusterDataCache {
}
/**
+ * Returns the last offline time cached for the given instance in _instanceOfflineTimeMap.
+ * Returns NULL if the cache has not been built yet or holds no entry for the instance. The
+ * cache is only filled for instances that have a config but no live-instance entry, so an
+ * instance that is currently ONLINE has no entry.
+ *
+ * @param instanceName name of the instance to look up
+ * @return the cached value of ParticipantHistory.getLastOfflineTime() for the instance, or
+ * NULL if nothing is cached for it
+ */
+ public Long getInstanceOfflineTime(String instanceName) {
+ if (_instanceOfflineTimeMap != null) {
+ return _instanceOfflineTimeMap.get(instanceName);
+ }
+ return null;
+ }
+
+ private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { // rebuilds _instanceOfflineTimeMap from the ParticipantHistory of every offline instance
+ List<String> offlineNodes = new ArrayList<String>(_instanceConfigMap.keySet());
+ offlineNodes.removeAll(_liveInstanceMap.keySet()); // configured instances minus live instances = offline instances
+ _instanceOfflineTimeMap = new HashMap<String, Long>();
+
+ for(String instance : offlineNodes) {
+ Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey propertyKey = keyBuilder.participantHistory(instance);
+ ParticipantHistory history = accessor.getProperty(propertyKey);
+ if (history == null) {
+ history = new ParticipantHistory(instance); // nothing stored yet: start from an empty record
+ }
+ if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) { // record still says online (or has no offline time): the participant did not stamp it itself
+ history.reportOffline(); // stamps the current time as the offline time
+ // persist history back to ZK.
+ accessor.setProperty(propertyKey, history);
+ }
+ _instanceOfflineTimeMap.put(instance, history.getLastOfflineTime());
+ }
+ _updateInstanceOfflineTime = false; // set back to true where _liveInstanceCacheMap is replaced
+ }
+
+ /**
* Retrieves the idealstates for all resources
* @return
*/
@@ -269,6 +314,7 @@ public class ClusterDataCache {
liveInstanceMap.put(liveInstance.getId(), liveInstance);
}
_liveInstanceCacheMap = liveInstanceMap;
+ _updateInstanceOfflineTime = true;
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index e6176ef..bf7302b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -246,7 +246,9 @@ public class ParticipantManager {
}
}
- updateHistory();
+ ParticipantHistory history = getHistory();
+ history.reportOnline(_sessionId);
+ persistHistory(history);
}
/**
@@ -339,20 +341,27 @@ public class ParticipantManager {
_messagingService.onConnected();
}
- /**
- * Update participant session history.
- */
- private void updateHistory() {
+ private ParticipantHistory getHistory() {
PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
if (history == null) {
history = new ParticipantHistory(_instanceName);
}
- history.updateHistory(_sessionId);
+ return history;
+ }
+
+ private void persistHistory(ParticipantHistory history) {
+ PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
_dataAccessor.setProperty(propertyKey, history);
}
- public void shutdown() {
+ public void reset() {
+ }
+ public void disconnect() { // records this participant's offline time in its history, then calls reset()
+ ParticipantHistory history = getHistory(); // existing history, or a fresh one if none is stored
+ history.reportOffline(); // stamp the current time as the last offline time
+ persistHistory(history); // write the updated history back through _dataAccessor
+ reset();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 629f40a..b1ce406 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1054,10 +1054,10 @@ public class ZKHelixAdmin implements HelixAdmin {
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
- @Override
- public ZNRecord update(ZNRecord currentData) {
- ClusterConstraints constraints =
- currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
+ @Override public ZNRecord update(ZNRecord currentData) {
+ ClusterConstraints constraints = currentData == null ?
+ new ClusterConstraints(constraintType) :
+ new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index aa6dc7a..37634bd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -572,11 +572,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
} finally {
- _zkclient.close();
- _zkclient = null;
- _sessionStartTime = null;
- LOG.info("Cluster manager: " + _instanceName + " disconnected");
-
if (_controller != null) {
try {
_controller.shutdown();
@@ -587,6 +582,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_leaderElectionHandler = null;
}
}
+
+ if (_participantManager != null) {
+ _participantManager.disconnect();
+ _participantManager = null;
+ }
+
+ _zkclient.close();
+ _zkclient = null;
+ _sessionStartTime = null;
+ LOG.info("Cluster manager: " + _instanceName + " disconnected");
}
}
@@ -898,7 +903,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
void handleNewSessionAsParticipant() throws Exception {
if (_participantManager != null) {
- _participantManager.shutdown();
+ _participantManager.reset();
}
_participantManager =
new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
index 585fd59..0ee8a58 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
@@ -40,9 +40,12 @@ public class ParticipantHistory extends HelixProperty {
TIME,
DATE,
SESSION,
- HISTORY
+ HISTORY,
+ LAST_OFFLINE_TIME
}
+ public static long ONLINE = -1;
+
public ParticipantHistory(String id) {
super(id);
}
@@ -52,11 +55,44 @@ public class ParticipantHistory extends HelixProperty {
}
/**
- * Update last offline timestamp in participant history.
+ * Called when a participant went offline or is about to go offline.
+ * This will update the offline timestamp in participant history.
+ */
+ public void reportOffline() {
+ long time = System.currentTimeMillis(); // wall-clock time in epoch milliseconds
+ _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(time)); // stored as a decimal string; only the in-memory record is changed here
+ }
+
+ /**
+ * Called when a participant goes online, this will update all related session history.
*
* @return
*/
- public void updateHistory(String sessionId) {
+ public void reportOnline(String sessionId) {
+ updateSessionHistory(sessionId); // append this session to the session history list
+ _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(ONLINE)); // ONLINE (-1) marks the participant as currently online
+ }
+
+ /**
+ * Get the time when this node last went offline, as stored in the LAST_OFFLINE_TIME simple
+ * field (reportOffline() writes System.currentTimeMillis() there).
+ * If the node is currently online, return ONLINE (-1).
+ * If no offline time is recorded at all, this also returns ONLINE (-1), not NULL, so the two
+ * cases cannot be told apart by the caller.
+ *
+ * @return the stored value parsed as a long, or ONLINE (-1) if the field is not set
+ */
+ public Long getLastOfflineTime() {
+ String time = _record.getSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name());
+ if (time == null) {
+ return ONLINE;
+ }
+
+ return Long.valueOf(time);
+ }
+
+ /**
+ * Add record to session online history list
+ */
+ private void updateSessionHistory(String sessionId) {
List<String> list = _record.getListField(ConfigProperty.HISTORY.name());
if (list == null) {
list = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
new file mode 100644
index 0000000..cfff20b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
@@ -0,0 +1,59 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ParticipantHistory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestNodeOfflineTimeStamp extends ZkStandAloneCMTestBase { // checks the LAST_OFFLINE_TIME bookkeeping in ParticipantHistory across a participant stop/start
+ final String className = getShortClassName();
+
+ @Test
+ public void testNodeShutdown() throws Exception {
+ for (MockParticipantManager participant : _participants) { // precondition: every participant has a history that reports ONLINE
+ ParticipantHistory history = getInstanceHistory(participant.getInstanceName());
+ Assert.assertNotNull(history);
+ Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE));
+ }
+
+ long shutdownTime = System.currentTimeMillis(); // reference time taken just before stopping the participant
+ _participants[0].syncStop();
+ ParticipantHistory history = getInstanceHistory(_participants[0].getInstanceName());
+ long recordTime = history.getLastOfflineTime();
+
+ Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L); // recorded offline time must be within 500 ms of the reference time; NOTE(review): wall-clock tolerance, may be flaky if syncStop() is slow
+
+ _participants[0].reset();
+ _participants[0].syncStart();
+
+ history = getInstanceHistory(_participants[0].getInstanceName());
+ Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE)); // after restart the history must report ONLINE again
+ }
+
+ private ParticipantHistory getInstanceHistory(String instance) { // reads the instance's ParticipantHistory through the _manager data accessor
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey propertyKey = accessor.keyBuilder().participantHistory(instance);
+ return accessor.getProperty(propertyKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 74c7a9f..fd0cc64 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -29,12 +29,15 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
@@ -81,7 +84,40 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
}
@Test(dataProvider = "rebalanceModes")
- public void testAutoRebalanceWithPersistAssignmentEnable(RebalanceMode rebalanceMode)
+ public void testDisablePersist(RebalanceMode rebalanceMode)
+ throws Exception {
+ String testDb = "TestDB2-" + rebalanceMode.name(); // one resource per rebalance mode supplied by the data provider
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+ BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+
+ HelixClusterVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(new HashSet<String>(Collections.singleton(testDb))).build(); // verifier restricted to the resource created above
+ Assert.assertTrue(verifier.verify());
+
+ // kill 1 node
+ _participants[0].syncStop();
+
+ Assert.assertTrue(verifier.verify()); // verify again after the node was stopped
+
+ IdealState idealState =
+ _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+
+ Set<String> excludedInstances = new HashSet<String>();
+ excludedInstances.add(_participants[0].getInstanceName()); // the stopped node is passed as an excluded instance
+ verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
+
+ // clean up
+ _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+ _participants[0] =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName()); // replace the stopped participant with a new one under the same instance name
+ _participants[0].syncStart();
+ }
+
+ @Test(dataProvider = "rebalanceModes", dependsOnMethods = {"testDisablePersist"})
+ public void testEnablePersist(RebalanceMode rebalanceMode)
throws Exception {
String testDb = "TestDB1-" + rebalanceMode.name();
enablePersistAssignment(true);
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 9f1cf1c..d0609ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -21,6 +21,9 @@ package org.apache.helix.integration;
import java.util.Date;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.tools.ClusterSetup;
@@ -46,7 +49,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
protected static final String TEST_DB = "TestDB";
protected static final int _PARTITIONS = 20;
- protected ClusterSetup _setupTool = null;
+ protected ClusterSetup _setupTool;
+ protected HelixManager _manager;
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
@@ -97,6 +101,11 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
CLUSTER_NAME));
Assert.assertTrue(result);
+
+ // create cluster manager
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
}
@AfterClass