You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:56 UTC

[21/38] helix git commit: Persist participant's offline timestamp in ParticipantHistory.

Persist participant's offline timestamp in ParticipantHistory.

This is to persist the timestamp when a participant is going offline.
1) If a participant goes offline gracefully (by calling disconnect()), the participant will write a timestamp to its history record.
2) If a participant goes offline without calling disconnect() (e.g., GC, machine crash), the controller will try to set the timestamp in its pipeline triggered by liveInstanceChanges.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/46705c59
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/46705c59
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/46705c59

Branch: refs/heads/helix-0.6.x
Commit: 46705c598e054cec54c4d4cd6fb309b3d1a25475
Parents: 9e8ec45
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Sep 1 18:21:36 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Feb 8 09:50:48 2017 -0800

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 46 +++++++++++++++
 .../helix/manager/zk/ParticipantManager.java    | 23 +++++---
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  8 +--
 .../apache/helix/manager/zk/ZKHelixManager.java | 17 ++++--
 .../apache/helix/model/ParticipantHistory.java  | 42 +++++++++++++-
 .../integration/TestNodeOfflineTimeStamp.java   | 59 ++++++++++++++++++++
 .../TestRebalancerPersistAssignments.java       | 38 ++++++++++++-
 .../integration/ZkStandAloneCMTestBase.java     | 11 +++-
 8 files changed, 222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dbc12d4..2cd8ec0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,6 +40,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
+import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
@@ -62,6 +64,7 @@ public class ClusterDataCache {
   Map<String, StateModelDefinition> _stateModelDefMap;
   Map<String, InstanceConfig> _instanceConfigMap;
   Map<String, InstanceConfig> _instanceConfigCacheMap;
+  Map<String, Long> _instanceOfflineTimeMap;
   Map<String, ResourceConfig> _resourceConfigMap;
   Map<String, ResourceConfig> _resourceConfigCacheMap;
   Map<String, ClusterConstraints> _constraintMap;
@@ -74,6 +77,8 @@ public class ClusterDataCache {
 
   boolean _init = true;
 
+  boolean _updateInstanceOfflineTime = true;
+
   private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
 
   /**
@@ -102,6 +107,10 @@ public class ClusterDataCache {
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
+    if (_init || _updateInstanceOfflineTime) {
+      updateOfflineInstanceHistory(accessor);
+    }
+
     if (LOG.isTraceEnabled()) {
       for (LiveInstance instance : _liveInstanceMap.values()) {
         LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
@@ -236,6 +245,42 @@ public class ClusterDataCache {
   }
 
   /**
+   * Return the last offline time for given instance.
+   * Return NULL if the instance is ONLINE currently, or the record is not persisted somehow.
+   *
+   * @param instanceName
+   * @return
+   */
+  public Long getInstanceOfflineTime(String instanceName) {
+    if (_instanceOfflineTimeMap != null) {
+      return _instanceOfflineTimeMap.get(instanceName);
+    }
+    return null;
+  }
+
+  private void updateOfflineInstanceHistory(HelixDataAccessor accessor) {
+    List<String> offlineNodes = new ArrayList<String>(_instanceConfigMap.keySet());
+    offlineNodes.removeAll(_liveInstanceMap.keySet());
+    _instanceOfflineTimeMap = new HashMap<String, Long>();
+
+    for(String instance : offlineNodes) {
+      Builder keyBuilder = accessor.keyBuilder();
+      PropertyKey propertyKey = keyBuilder.participantHistory(instance);
+      ParticipantHistory history = accessor.getProperty(propertyKey);
+      if (history == null) {
+        history = new ParticipantHistory(instance);
+      }
+      if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) {
+        history.reportOffline();
+        // persist history back to ZK.
+        accessor.setProperty(propertyKey, history);
+      }
+      _instanceOfflineTimeMap.put(instance, history.getLastOfflineTime());
+    }
+    _updateInstanceOfflineTime = false;
+  }
+
+  /**
    * Retrieves the idealstates for all resources
    * @return
    */
@@ -269,6 +314,7 @@ public class ClusterDataCache {
       liveInstanceMap.put(liveInstance.getId(), liveInstance);
     }
     _liveInstanceCacheMap = liveInstanceMap;
+    _updateInstanceOfflineTime = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index e6176ef..bf7302b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -246,7 +246,9 @@ public class ParticipantManager {
       }
     }
 
-    updateHistory();
+    ParticipantHistory history = getHistory();
+    history.reportOnline(_sessionId);
+    persistHistory(history);
   }
 
   /**
@@ -339,20 +341,27 @@ public class ParticipantManager {
     _messagingService.onConnected();
   }
 
-  /**
-   * Update participant session history.
-   */
-  private void updateHistory() {
+  private ParticipantHistory getHistory() {
     PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
     ParticipantHistory history = _dataAccessor.getProperty(propertyKey);
     if (history == null) {
       history = new ParticipantHistory(_instanceName);
     }
-    history.updateHistory(_sessionId);
+    return history;
+  }
+
+  private void persistHistory(ParticipantHistory history) {
+    PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName);
     _dataAccessor.setProperty(propertyKey, history);
   }
 
-  public void shutdown() {
+  public void reset() {
+  }
 
+  public void disconnect() {
+    ParticipantHistory history = getHistory();
+    history.reportOffline();
+    persistHistory(history);
+    reset();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 629f40a..b1ce406 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1054,10 +1054,10 @@ public class ZKHelixAdmin implements HelixAdmin {
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        ClusterConstraints constraints =
-            currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
+      @Override public ZNRecord update(ZNRecord currentData) {
+        ClusterConstraints constraints = currentData == null ?
+            new ClusterConstraints(constraintType) :
+            new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index aa6dc7a..37634bd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -572,11 +572,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       }
 
     } finally {
-      _zkclient.close();
-      _zkclient = null;
-      _sessionStartTime = null;
-      LOG.info("Cluster manager: " + _instanceName + " disconnected");
-
       if (_controller != null) {
         try {
           _controller.shutdown();
@@ -587,6 +582,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
           _leaderElectionHandler = null;
         }
       }
+
+      if (_participantManager != null) {
+        _participantManager.disconnect();
+        _participantManager = null;
+      }
+
+      _zkclient.close();
+      _zkclient = null;
+      _sessionStartTime = null;
+      LOG.info("Cluster manager: " + _instanceName + " disconnected");
     }
   }
 
@@ -898,7 +903,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   void handleNewSessionAsParticipant() throws Exception {
     if (_participantManager != null) {
-      _participantManager.shutdown();
+      _participantManager.reset();
     }
     _participantManager =
         new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
index 585fd59..0ee8a58 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java
@@ -40,9 +40,12 @@ public class ParticipantHistory extends HelixProperty {
     TIME,
     DATE,
     SESSION,
-    HISTORY
+    HISTORY,
+    LAST_OFFLINE_TIME
   }
 
+  public static long ONLINE = -1;
+
   public ParticipantHistory(String id) {
     super(id);
   }
@@ -52,11 +55,44 @@ public class ParticipantHistory extends HelixProperty {
   }
 
   /**
-   * Update last offline timestamp in participant history.
+   * Called when a participant went offline or is about to go offline.
+   * This will update the offline timestamp in participant history.
+   */
+  public void reportOffline() {
+    long time = System.currentTimeMillis();
+    _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(time));
+  }
+
+  /**
+   * Called when a participant goes online, this will update all related session history.
    *
    * @return
    */
-  public void updateHistory(String sessionId) {
+  public void reportOnline(String sessionId) {
+    updateSessionHistory(sessionId);
+    _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(ONLINE));
+  }
+
+  /**
+   * Get the time when this node goes offline last time (epoch time).
+   * If the node is currently online, return -1.
+   * If no offline time is record, return NULL.
+   *
+   * @return
+   */
+  public Long getLastOfflineTime() {
+    String time = _record.getSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name());
+    if (time == null) {
+      return ONLINE;
+    }
+
+    return Long.valueOf(time);
+  }
+
+  /**
+   * Add record to session online history list
+   */
+  private void updateSessionHistory(String sessionId) {
     List<String> list = _record.getListField(ConfigProperty.HISTORY.name());
     if (list == null) {
       list = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
new file mode 100644
index 0000000..cfff20b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java
@@ -0,0 +1,59 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ParticipantHistory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestNodeOfflineTimeStamp extends ZkStandAloneCMTestBase {
+  final String className = getShortClassName();
+
+  @Test
+  public void testNodeShutdown() throws Exception {
+    for (MockParticipantManager participant : _participants) {
+      ParticipantHistory history = getInstanceHistory(participant.getInstanceName());
+      Assert.assertNotNull(history);
+      Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE));
+    }
+
+    long shutdownTime = System.currentTimeMillis();
+    _participants[0].syncStop();
+    ParticipantHistory history = getInstanceHistory(_participants[0].getInstanceName());
+    long recordTime = history.getLastOfflineTime();
+
+    Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L);
+
+    _participants[0].reset();
+    _participants[0].syncStart();
+
+    history = getInstanceHistory(_participants[0].getInstanceName());
+    Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE));
+  }
+
+  private ParticipantHistory getInstanceHistory(String instance) {
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey propertyKey = accessor.keyBuilder().participantHistory(instance);
+    return accessor.getProperty(propertyKey);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
index 74c7a9f..fd0cc64 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java
@@ -29,12 +29,15 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Map;
@@ -81,7 +84,40 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase {
   }
 
   @Test(dataProvider = "rebalanceModes")
-  public void testAutoRebalanceWithPersistAssignmentEnable(RebalanceMode rebalanceMode)
+  public void testDisablePersist(RebalanceMode rebalanceMode)
+      throws Exception {
+    String testDb = "TestDB2-" + rebalanceMode.name();
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
+        BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
+
+    HelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new HashSet<String>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verify());
+
+    // kill 1 node
+    _participants[0].syncStop();
+
+    Assert.assertTrue(verifier.verify());
+
+    IdealState idealState =
+        _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
+
+    Set<String> excludedInstances = new HashSet<String>();
+    excludedInstances.add(_participants[0].getInstanceName());
+    verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances);
+
+    // clean up
+    _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
+    _participants[0] =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName());
+    _participants[0].syncStart();
+  }
+
+  @Test(dataProvider = "rebalanceModes", dependsOnMethods = {"testDisablePersist"})
+  public void testEnablePersist(RebalanceMode rebalanceMode)
       throws Exception {
     String testDb = "TestDB1-" + rebalanceMode.name();
     enablePersistAssignment(true);

http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
index 9f1cf1c..d0609ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java
@@ -21,6 +21,9 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.tools.ClusterSetup;
@@ -46,7 +49,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
   protected static final String TEST_DB = "TestDB";
   protected static final int _PARTITIONS = 20;
 
-  protected ClusterSetup _setupTool = null;
+  protected ClusterSetup _setupTool;
+  protected HelixManager _manager;
   protected final String CLASS_NAME = getShortClassName();
   protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
 
@@ -97,6 +101,11 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase {
         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
             CLUSTER_NAME));
     Assert.assertTrue(result);
+
+    // create cluster manager
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
   }
 
   @AfterClass