You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/12 20:48:30 UTC

[1/4] helix git commit: Fix ZK configurations in the tests.

Repository: helix
Updated Branches:
  refs/heads/master b5e5e4919 -> be619d462


Fix ZK configurations in the tests.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/be619d46
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/be619d46
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/be619d46

Branch: refs/heads/master
Commit: be619d462883dbab67b8d6b2e75d1b7d3b8a161b
Parents: 0558344
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Wed Jun 6 17:07:31 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/webapp/AdminTestBase.java  | 12 +--
 .../helix/integration/TestZkConnectionLost.java | 88 +++++++++++---------
 .../helix/rest/server/AbstractTestClass.java    | 37 ++++----
 3 files changed, 75 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/be619d46/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
index 65e084f..1daf330 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
@@ -19,8 +19,6 @@ package org.apache.helix.webapp;
  * under the License.
  */
 
-import java.util.logging.Level;
-
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.TestHelper;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -28,14 +26,16 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.util.ZKClientPool;
 import org.apache.helix.webapp.AdminTestHelper.AdminThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.restlet.Client;
 import org.restlet.data.Protocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import java.util.logging.Level;
+
 public class AdminTestBase {
   private static Logger LOG = LoggerFactory.getLogger(AdminTestBase.class);
   public static final String ZK_ADDR = "localhost:2187";
@@ -60,8 +60,8 @@ public class AdminTestBase {
     ZKClientPool.reset();
 
     _gZkClient =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            ZkClient.DEFAULT_SESSION_TIMEOUT, new ZNRecordSerializer());
+        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+            new ZNRecordSerializer());
     _gSetupTool = new ClusterSetup(_gZkClient);
 
     // start admin

http://git-wip-us.apache.org/repos/asf/helix/blob/be619d46/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index 85b8554..319f540 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -2,13 +2,9 @@ package org.apache.helix.integration;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -29,6 +25,12 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class TestZkConnectionLost extends TaskTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestZkConnectionLost.class);
 
@@ -65,53 +67,63 @@ public class TestZkConnectionLost extends TaskTestBase {
 
   @Test
   public void testLostZkConnection() throws Exception {
-    System.setProperty("helixmanager.waitForConnectedTimeout", "1000");
-    System.setProperty("zk.session.timeout", "1000");
-    String queueName = TestHelper.getTestMethodName();
+    System.setProperty(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT, "1000");
+    System.setProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT, "1000");
+    try {
+      String queueName = TestHelper.getTestMethodName();
 
-    startParticipants(_zkAddr);
+      startParticipants(_zkAddr);
 
-    // Create a queue
-    LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000);
-    createAndEnqueueJob(queueBuild, 3);
+      // Create a queue
+      LOG.info("Starting job-queue: " + queueName);
+      JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000);
+      createAndEnqueueJob(queueBuild, 3);
 
-    _driver.start(queueBuild.build());
+      _driver.start(queueBuild.build());
 
-    restartZkServer();
+      restartZkServer();
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
+      WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+      _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
+    } finally {
+      System.clearProperty(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT);
+      System.clearProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT);
+    }
   }
 
   @Test(dependsOnMethods = { "testLostZkConnection" }, enabled = false)
-  public void testLostZkConnectionNegative()
-      throws Exception {
-    System.setProperty("helixmanager.waitForConnectedTimeout", "10");
-    System.setProperty("zk.session.timeout", "1000");
-    String queueName = TestHelper.getTestMethodName();
+  public void testLostZkConnectionNegative() throws Exception {
+    System.setProperty(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT, "10");
+    System.setProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT, "1000");
 
-    stopParticipants();
-    startParticipants(_zkAddr);
+    try {
+      String queueName = TestHelper.getTestMethodName();
 
-    LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000);
-    createAndEnqueueJob(queueBuild, 3);
+      stopParticipants();
+      startParticipants(_zkAddr);
 
-    _driver.start(queueBuild.build());
+      LOG.info("Starting job-queue: " + queueName);
+      JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000);
+      createAndEnqueueJob(queueBuild, 3);
 
-    restartZkServer();
+      _driver.start(queueBuild.build());
 
-    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
-    // ensure job 1 is started before stop it
-    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+      restartZkServer();
 
-    try{
-      _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
-      Assert.fail("Test failure!");
-    } catch (HelixException ex) {
-      // test succeed
+      WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
+      // ensure job 1 is started before stop it
+      String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+
+      try {
+        _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
+        Assert.fail("Test failure!");
+      } catch (HelixException ex) {
+        // test succeed
+      }
+    } finally {
+      System.clearProperty(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT);
+      System.clearProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/be619d46/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 8b86c17..0f7a43d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -19,20 +19,6 @@ package org.apache.helix.rest.server;
  * under the License.
  */
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Application;
-import javax.ws.rs.core.Response;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -74,6 +60,21 @@ import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+
 public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   protected static final String ZK_ADDR = "localhost:2123";
   protected static final String WORKFLOW_PREFIX = "Workflow_";
@@ -201,10 +202,10 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
       topJavaLogger.setLevel(Level.WARNING);
 
-      _gZkClient = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-          ZkClient.DEFAULT_SESSION_TIMEOUT, new ZNRecordSerializer());
-      _gZkClientTestNS = new ZkClient(_zkAddrTestNS, ZkClient.DEFAULT_CONNECTION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT,
-          new ZNRecordSerializer());
+      _gZkClient = new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
+          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      _gZkClientTestNS = new ZkClient(_zkAddrTestNS, ZkClient.DEFAULT_SESSION_TIMEOUT,
+          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       _gSetupTool = new ClusterSetup(_gZkClient);
       _configAccessor = new ConfigAccessor(_gZkClient);
       _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);


[4/4] helix git commit: Increase TestZkConnectionLost timeout to stabilize the test.

Posted by jx...@apache.org.
Increase TestZkConnectionLost timeout to stabilize the test.

RB=1326718
G=helix-reviewers
A=lxia


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e2b1277c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e2b1277c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e2b1277c

Branch: refs/heads/master
Commit: e2b1277c84f877fee2accde641334369740a3155
Parents: b5e5e49
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jun 1 10:43:14 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/helix/task/TaskDriver.java       | 7 ++++---
 .../org/apache/helix/integration/TestZkConnectionLost.java    | 4 ++--
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e2b1277c/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b04534f..bc767b7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -859,9 +859,10 @@ public class TaskDriver {
         .contains(ctx.getWorkflowState())) && System.currentTimeMillis() < st + timeout);
 
     if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
-      throw new HelixException(String
-          .format("Workflow \"%s\" context is empty or not in states: \"%s\"", workflowName,
-              targetStates));
+      throw new HelixException(String.format(
+          "Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
+          workflowName, targetStates.toString(),
+          ctx == null ? "null" : ctx.getWorkflowState().toString()));
     }
 
     return ctx.getWorkflowState();

http://git-wip-us.apache.org/repos/asf/helix/blob/e2b1277c/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index 3721b2c..85b8554 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -82,7 +82,7 @@ public class TestZkConnectionLost extends TaskTestBase {
 
     WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
-    _driver.pollForWorkflowState(scheduledQueue, 10000, TaskState.COMPLETED);
+    _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
   }
 
   @Test(dependsOnMethods = { "testLostZkConnection" }, enabled = false)
@@ -108,7 +108,7 @@ public class TestZkConnectionLost extends TaskTestBase {
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     try{
-      _driver.pollForWorkflowState(scheduledQueue, 10000, TaskState.COMPLETED);
+      _driver.pollForWorkflowState(scheduledQueue, 30000, TaskState.COMPLETED);
       Assert.fail("Test failure!");
     } catch (HelixException ex) {
       // test succeed


[2/4] helix git commit: DelayedAutoRebalancer should calculate assignment based on full partition list.

Posted by jx...@apache.org.
DelayedAutoRebalancer should calculate assignment based on full partition list.

An issue was reported that DelayedAutoRebalancer generates a different assignment when users change their user-defined preference lists. This is because, for some full-auto rebalance strategies, the algorithm balances partitions based on workload. As a result, a config change in one partition will affect the others.

In this change, we changed the DelayedAutoRebalancer to calculate the assignment based on the full partition list first, and then apply the user-defined lists. This will fix the issue.

Also updated the related test to cover this scenario.

RB=1327182
BUG=HELIX-1052
G=helix-reviewers
A=lxia


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c145c7c7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c145c7c7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c145c7c7

Branch: refs/heads/master
Commit: c145c7c71b996f7a223d82683f1eeff8222f728c
Parents: e2b1277
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jun 1 15:30:39 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../rebalancer/DelayedAutoRebalancer.java       |  5 +--
 .../rebalancer/TestMixedModeAutoRebalance.java  | 47 +++++++++++++++-----
 2 files changed, 36 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c145c7c7/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 02f96f6..7b38c31 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -148,12 +148,9 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
     Map<String, Map<String, String>> currentMapping =
         currentMapping(currentStateOutput, resourceName, allPartitions, stateCountMap);
 
-
-    List<String> partitionsToAssign = new ArrayList<>(allPartitions);
-    partitionsToAssign.removeAll(userDefinedPreferenceList.keySet());
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
     _rebalanceStrategy =
-        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitionsToAssign, resourceName,
+        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
             stateCountMap, maxPartition);
 
     // sort node lists to ensure consistent preferred assignments

http://git-wip-us.apache.org/repos/asf/helix/blob/c145c7c7/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index 77340af..156bad5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -21,11 +21,14 @@ package org.apache.helix.integration.rebalancer;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
@@ -102,25 +105,27 @@ public class TestMixedModeAutoRebalance extends ZkIntegrationTestBase {
 
   @DataProvider(name = "stateModels")
   public static Object [][] stateModels() {
-    return new Object[][] { {BuiltInStateModelDefinitions.MasterSlave.name(), true},
-        {BuiltInStateModelDefinitions.OnlineOffline.name(), true},
-        {BuiltInStateModelDefinitions.LeaderStandby.name(), true},
-        {BuiltInStateModelDefinitions.MasterSlave.name(), false},
-        {BuiltInStateModelDefinitions.OnlineOffline.name(), false},
-        {BuiltInStateModelDefinitions.LeaderStandby.name(), false},
+    return new Object[][] { {BuiltInStateModelDefinitions.MasterSlave.name(), true, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.OnlineOffline.name(), true, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.LeaderStandby.name(), true, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.MasterSlave.name(), false, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.OnlineOffline.name(), false, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.LeaderStandby.name(), false, CrushRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.MasterSlave.name(), true, CrushEdRebalanceStrategy.class.getName()},
+        {BuiltInStateModelDefinitions.OnlineOffline.name(), true, CrushEdRebalanceStrategy.class.getName()}
     };
   }
 
   @Test(dataProvider = "stateModels")
   public void testUserDefinedPreferenceListsInFullAuto(
-      String stateModel, boolean delayEnabled) throws Exception {
+      String stateModel, boolean delayEnabled, String rebalanceStrateyName) throws Exception {
     String db = "Test-DB-" + stateModel;
     if (delayEnabled) {
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
-          _replica - 1, 200, CrushRebalanceStrategy.class.getName());
+          _replica - 1, 200, rebalanceStrateyName);
     } else {
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
-          _replica, 0, CrushRebalanceStrategy.class.getName());
+          _replica, 0, rebalanceStrateyName);
     }
     IdealState idealState =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
@@ -142,12 +147,18 @@ public class TestMixedModeAutoRebalance extends ZkIntegrationTestBase {
 
     //TODO: Trigger rebalancer, remove this once Helix controller is listening on resource config changes.
     RebalanceScheduler.invokeRebalance(_dataAccessor, db);
+    Assert.assertTrue(_clusterVerifier.verify(1000));
+    verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
 
     while (userDefinedPartitions.size() > 0) {
-      Thread.sleep(100);
-      Assert.assertTrue(_clusterVerifier.verify());
-      verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
+      IdealState originIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      Set<String> nonUserDefinedPartitions = new HashSet<>(originIS.getPartitionSet());
+      nonUserDefinedPartitions.removeAll(userDefinedPartitions);
+
       removePartitionFromUserDefinedList(db, userDefinedPartitions);
+      Assert.assertTrue(_clusterVerifier.verify(1000));
+      verifyUserDefinedPreferenceLists(db, userDefinedPreferenceLists, userDefinedPartitions);
+      verifyNonUserDefinedAssignment(db, originIS, nonUserDefinedPartitions);
     }
   }
 
@@ -216,6 +227,18 @@ public class TestMixedModeAutoRebalance extends ZkIntegrationTestBase {
     }
   }
 
+  private void verifyNonUserDefinedAssignment(String db, IdealState originIS, Set<String> nonUserDefinedPartitions)
+      throws InterruptedException {
+    IdealState newIS = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    Assert.assertEquals(originIS.getPartitionSet(), newIS.getPartitionSet());
+    for (String p : newIS.getPartitionSet()) {
+      if (nonUserDefinedPartitions.contains(p)) {
+        // for non user defined partition, mapping should keep the same
+        Assert.assertEquals(newIS.getPreferenceList(p), originIS.getPreferenceList(p));
+      }
+    }
+  }
+
   private void removePartitionFromUserDefinedList(String db, List<String> userDefinedPartitions) {
     ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, db);
     Map<String, List<String>> lists = resourceConfig.getPreferenceLists();


[3/4] helix git commit: Make ZkClient keep retrying connect on expiring.

Posted by jx...@apache.org.
Make ZkClient keep retrying connect on expiring.

This is to prevent ZK reconnect failures due to transient network issues.
With this change in ZkClient, the HelixManager retry is no longer needed. Deprecate the related option and simplify the handleSessionEstablishmentError logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/05583449
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/05583449
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/05583449

Branch: refs/heads/master
Commit: 0558344963115414cbec15c752d44b4eec3cc6c4
Parents: c145c7c
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Jun 5 10:44:55 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/SystemPropertyKeys.java    |   1 +
 .../apache/helix/manager/zk/ZKHelixManager.java |  35 +--
 .../helix/manager/zk/zookeeper/ZkClient.java    |  39 ++-
 .../helix/util/ExponentialBackoffStrategy.java  |  33 +++
 .../helix/integration/TestZkReconnect.java      | 241 -------------------
 .../helix/manager/zk/TestZkReconnect.java       | 239 ++++++++++++++++++
 6 files changed, 310 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 7af9635..aa8535b 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -15,6 +15,7 @@ public class SystemPropertyKeys {
 
   public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
 
+  @Deprecated
   public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
       "zk.connectionReEstablishment.timeout";
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4eb037a..5620ef6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -68,7 +68,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
-  private static final int DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT = 120000; // Default to 120 sec
   private static final int DEFAULT_WAIT_CONNECTED_TIMEOUT = 10 * 1000;  // wait until connected for up to 10 seconds.
 
   protected final String _zkAddress;
@@ -78,7 +77,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final int _waitForConnectedTimeout; // wait time for testing connect
   private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout
   private final int _connectionInitTimeout; // client timeout to init connect
-  private final int _connectionRetryTimeout; // retry when connect being re-established
   private final List<PreConnectCallback> _preConnectCallbacks;
   protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
@@ -233,10 +231,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
             ZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
-    _connectionRetryTimeout = HelixUtil
-        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_REESTABLISHMENT_CONNECTION_TIMEOUT,
-            DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
-
     _waitForConnectedTimeout = HelixUtil
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
             DEFAULT_WAIT_CONNECTED_TIMEOUT);
@@ -1060,38 +1054,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   @Override
   public void handleSessionEstablishmentError(Throwable error) throws Exception {
-    LOG.warn("Handling Session Establishment Error. Try to reset connection.", error);
+    LOG.warn("Handling Session Establishment Error. Disconnect Helix Manager.", error);
     // Cleanup ZKHelixManager
     if (_zkclient != null) {
       _zkclient.close();
     }
     disconnect();
-    // Try to establish connections
-    long operationStartTime = System.currentTimeMillis();
-    while (!isConnected()) {
-      try {
-        connect();
-        break;
-      } catch (Exception e) {
-        if (System.currentTimeMillis() - operationStartTime >= _connectionRetryTimeout) {
-          break;
-        }
-        // If retry fails, use the latest exception.
-        error = e;
-        LOG.error("Fail to reset connection after session establishment error happens. Will retry.", error);
-        // Yield until next retry.
-        Thread.yield();
-      }
-    }
 
-    if (!isConnected()) {
-      LOG.error("Fail to reset connection after session establishment error happens.", error);
-      // retry failed, trigger error handler
-      if (_stateListener != null) {
-        _stateListener.onDisconnected(this, error);
-      }
-    } else {
-      LOG.info("Connection is recovered.");
+    if (_stateListener != null) {
+      _stateListener.onDisconnected(this, error);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 0aa6587..0939826 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -45,6 +45,7 @@ import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+  private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
   protected final IZkConnection _connection;
   protected final long operationRetryTimeoutInMillis;
@@ -783,16 +785,49 @@ public class ZkClient implements Watcher {
     }
     fireStateChangedEvent(event.getState());
     if (event.getState() == KeeperState.Expired) {
+      reconnectOnExpiring();
+    }
+  }
+
+  /**
+   * Re-establishes the session after it expired, retrying with exponential backoff (plus jitter)
+   * until the reconnect succeeds, this client is closed, or the calling thread is interrupted.
+   * On success the new-session events are fired; otherwise the session establishment error is
+   * fired with the last exception seen.
+   */
+  private void reconnectOnExpiring() {
+    int retryCount = 0;
+    ExponentialBackoffStrategy retryStrategy =
+        new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
+
+    // Reported only when the client is already closed before the first reconnect attempt.
+    Exception reconnectException = new ZkException("Shutdown triggered.");
+    while (!_closed) {
       try {
         reconnect();
         fireNewSessionEvents();
-      } catch (final Exception e) {
-        LOG.warn(
-            "Unable to re-establish connection. Notifying consumer of the following exception: ",
-            e);
-        fireSessionEstablishmentError(e);
+        return;
+      } catch (ZkInterruptedException interrupt) {
+        reconnectException = interrupt;
+        break;
+      } catch (Exception e) {
+        reconnectException = e;
+        long waitInterval = retryStrategy.getNextWaitInterval(retryCount++);
+        LOG.warn("ZkClient reconnect on expiring failed. Will retry after {} ms", waitInterval, e);
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException ex) {
+          // Restore the interrupt status so code further up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          reconnectException = ex;
+          break;
+        }
       }
     }
+
+    LOG.info("Unable to re-establish connection. Notifying consumer of the following exception: ",
+        reconnectException);
+    fireSessionEstablishmentError(reconnectException);
   }
 
   private void fireNewSessionEvents() {
@@ -1437,6 +1472,9 @@ public class ZkClient implements Watcher {
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+    if (_closed) {
+      throw new IllegalStateException("ZkClient already closed!");
+    }
     boolean started = false;
     acquireEventLock();
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
new file mode 100644
index 0000000..b1a66c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
@@ -0,0 +1,51 @@
+package org.apache.helix.util;
+
+import java.util.Random;
+
+/**
+ * Computes retry wait intervals that double with every failed try, optionally capped at a maximum
+ * and optionally reduced by a random jitter.
+ */
+public class ExponentialBackoffStrategy {
+  private static final long INIT_RETRY_INTERVAL = 500;
+  private final long _maxRetryInterval;
+  private final boolean _addJitter;
+  private final Random _ran;
+
+  /**
+   * @param maxRetryInterval cap applied to the interval before jitter; a non-positive value
+   *                         disables the cap
+   * @param addJitter if true, each interval is scaled by a random factor in [0.75, 1.0)
+   */
+  public ExponentialBackoffStrategy(long maxRetryInterval, boolean addJitter) {
+    _maxRetryInterval = maxRetryInterval;
+    _addJitter = addJitter;
+    // Rely on Random's own seeding instead of the wall clock: instances created within the same
+    // millisecond would otherwise share a seed and produce identical "jitter".
+    _ran = new Random();
+  }
+
+  /**
+   * Returns the wait interval to use after the given number of failed tries.
+   *
+   * @param numberOfTriesFailed tries that have failed so far; values below 1 are treated as 1
+   * @return INIT_RETRY_INTERVAL * 2^(numberOfTriesFailed - 1), capped and jittered as configured
+   */
+  public long getNextWaitInterval(int numberOfTriesFailed) {
+    // Clamp the count so the interval never starts below INIT_RETRY_INTERVAL (a count of 0 would
+    // otherwise halve it).
+    double exponentialMultiplier = Math.pow(2.0, Math.max(1, numberOfTriesFailed) - 1);
+    double result = exponentialMultiplier * INIT_RETRY_INTERVAL;
+
+    if (_maxRetryInterval > 0 && result > _maxRetryInterval) {
+      result = _maxRetryInterval;
+    }
+
+    if (_addJitter) {
+      // Adding jitter so the real result would be 75% to 100% of the original result.
+      // Don't directly add jitter here, since it may exceed the max retry interval setup
+      result = result * (0.75 + _ran.nextDouble() * 0.25);
+    }
+
+    return (long) result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
deleted file mode 100644
index aa23257..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.HelixManagerStateListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZkReconnect {
-  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
-
-  @Test (enabled = false)
-  public void testZKReconnect() throws Exception {
-    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-    zkServerRef.set(zkServer);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-
-    // Registers and starts controller
-    LOG.info("Starts controller");
-    HelixManager controller =
-        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
-    controller.connect();
-
-    // Registers and starts participant
-    LOG.info("Starts participant");
-    String hostname = "localhost";
-    String instanceId = String.format("%s_%d", hostname, 1);
-    clusterSetup.addInstanceToCluster(clusterName, instanceId);
-    HelixManager participant =
-        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
-            zkAddr);
-    participant.connect();
-
-    LOG.info("Register state machine");
-    final CountDownLatch latch = new CountDownLatch(1);
-    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new StateModelFactory<StateModel>() {
-          @Override
-          public StateModel createNewStateModel(String resource, String stateUnitKey) {
-            return new SimpleStateModel(latch);
-          }
-        }, "test");
-
-    String resourceName = "test-resource";
-    LOG.info("Ideal state assignment");
-    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
-    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
-        IdealState.RebalanceMode.CUSTOMIZED.toString());
-
-    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
-    idealState.setReplicas("1");
-    idealState.setStateModelFactoryName("test");
-    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
-
-    LOG.info("Shutdown ZK server");
-    TestHelper.stopZkServer(zkServerRef.get());
-    Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          LOG.info("Restart ZK server");
-          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
-          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }, 2L, TimeUnit.SECONDS);
-
-    // future.get();
-
-    LOG.info("Before update ideal state");
-    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
-    LOG.info("After update ideal state");
-
-    LOG.info("Wait for OFFLINE->ONLINE state transition");
-    try {
-      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
-
-      // wait until stable state
-      boolean result =
-          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
-              clusterName));
-      Assert.assertTrue(result);
-
-    } finally {
-      participant.disconnect();
-      zkServerRef.get().shutdown();
-    }
-  }
-
-  @Test
-  public void testHelixManagerStateListenerCallback() throws Exception {
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
-
-    // Init onDisconnectedFlag to check if callback is triggered
-    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
-    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-    // For fast test, set short timeout
-    System.setProperty("zk.connection.timeout", "2000");
-    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
-
-    // Registers and starts controller, register listener for disconnect handling
-    LOG.info("Starts controller");
-    final ZKHelixManager controller =
-        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
-            new HelixManagerStateListener() {
-              @Override
-              public void onConnected(HelixManager helixManager) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onConnectedFlag.getAndSet(true);
-              }
-
-              @Override
-              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onDisconnectedFlag.getAndSet(true);
-              }
-            });
-
-    try {
-      controller.connect();
-      Assert.assertTrue(onConnectedFlag.getAndSet(false));
-      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
-
-      // 1. shutdown zkServer and check if handler trigger callback
-      zkServer.shutdown();
-
-      // Retry will fail, and onDisconnectedFlag should be set within onDisconnected handler
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertTrue(onDisconnectedFlag.get());
-      Assert.assertFalse(onConnectedFlag.get());
-      Assert.assertFalse(controller.isConnected());
-
-      // Verify ZK is down
-      try {
-        propertyStore.get("/", null, 0);
-        Assert.fail("propertyStore should be disconnected.");
-      } catch (IllegalStateException e) {
-        // Expected exception
-        System.out.println(e.getMessage());
-      }
-
-      // 2. restart zkServer and check if handler will recover connection
-      onDisconnectedFlag.getAndSet(false);
-      zkServer.start();
-
-      // Retry will succeed, and onDisconnectedFlag should not be set
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertFalse(onDisconnectedFlag.get());
-      Assert.assertTrue(onConnectedFlag.get());
-
-      // New propertyStore should be in good state
-      propertyStore = controller.getHelixPropertyStore();
-      propertyStore.get("/", null, 0);
-    } finally {
-      controller.disconnect();
-      zkServer.shutdown();
-      System.clearProperty("zk.connection.timeout");
-      System.clearProperty("zk.connectionReEstablishment.timeout");
-    }
-  }
-
-  public static final class SimpleStateModel extends StateModel {
-
-    private final CountDownLatch latch;
-
-    public SimpleStateModel(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      // LOG.info(HelixUtils.toString(message));
-      LOG.info("message: " + message);
-      latch.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
new file mode 100644
index 0000000..e2d36b2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -0,0 +1,259 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.*;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests how the Helix manager and its ZkClient behave when the ZK server is stopped and
+ * restarted underneath them.
+ */
+public class TestZkReconnect {
+  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
+
+  /**
+   * Stops the ZK server, schedules a restart two seconds later, updates an ideal state in
+   * between and waits for the participant's OFFLINE->ONLINE transition. Currently disabled.
+   */
+  @Test (enabled = false)
+  public void testZKReconnect() throws Exception {
+    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+    zkServerRef.set(zkServer);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+
+    // Registers and starts controller
+    LOG.info("Starts controller");
+    HelixManager controller =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
+    controller.connect();
+
+    // Registers and starts participant
+    LOG.info("Starts participant");
+    String hostname = "localhost";
+    String instanceId = String.format("%s_%d", hostname, 1);
+    clusterSetup.addInstanceToCluster(clusterName, instanceId);
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
+            zkAddr);
+    participant.connect();
+
+    LOG.info("Register state machine");
+    final CountDownLatch latch = new CountDownLatch(1);
+    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new StateModelFactory<StateModel>() {
+          @Override
+          public StateModel createNewStateModel(String resource, String stateUnitKey) {
+            return new SimpleStateModel(latch);
+          }
+        }, "test");
+
+    String resourceName = "test-resource";
+    LOG.info("Ideal state assignment");
+    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
+    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
+        IdealState.RebalanceMode.CUSTOMIZED.toString());
+
+    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
+    idealState.setReplicas("1");
+    idealState.setStateModelFactoryName("test");
+    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
+
+    LOG.info("Shutdown ZK server");
+    TestHelper.stopZkServer(zkServerRef.get());
+    // Keep a handle on the executor so its worker thread can be stopped when the test ends.
+    ScheduledExecutorService restartExecutor = Executors.newSingleThreadScheduledExecutor();
+    restartExecutor.schedule(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          LOG.info("Restart ZK server");
+          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
+          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }, 2L, TimeUnit.SECONDS);
+
+    // future.get();
+
+    LOG.info("Before update ideal state");
+    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
+    LOG.info("After update ideal state");
+
+    LOG.info("Wait for OFFLINE->ONLINE state transition");
+    try {
+      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+      // wait until stable state
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
+              clusterName));
+      Assert.assertTrue(result);
+
+    } finally {
+      participant.disconnect();
+      controller.disconnect();
+      restartExecutor.shutdownNow();
+      zkServerRef.get().shutdown();
+    }
+  }
+
+  /**
+   * Checks the HelixManagerStateListener callbacks: onConnected() on connect, no onDisconnected()
+   * while ZkClient is still retrying after a session expiry with ZK down, recovery once ZK is
+   * back, and onDisconnected() when handleSessionEstablishmentError() is invoked.
+   */
+  @Test
+  public void testHelixManagerStateListenerCallback() throws Exception {
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    // Init onDisconnectedFlag to check if callback is triggered
+    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
+    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+    // For fast test, set short timeout
+    System.setProperty("zk.connection.timeout", "2000");
+    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
+
+    // Registers and starts controller, register listener for disconnect handling
+    LOG.info("Starts controller");
+    final ZKHelixManager controller =
+        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
+            new HelixManagerStateListener() {
+              @Override
+              public void onConnected(HelixManager helixManager) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onConnectedFlag.getAndSet(true);
+              }
+
+              @Override
+              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onDisconnectedFlag.getAndSet(true);
+              }
+            });
+
+    try {
+      controller.connect();
+      // check onConnected() is triggered
+      Assert.assertTrue(onConnectedFlag.getAndSet(false));
+
+      // 1. shutdown zkServer and check if handler trigger callback
+      zkServer.shutdown();
+      // Simulate a retry in ZkClient that will not succeed
+      WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+      controller._zkclient.process(event);
+      Assert.assertFalse(controller._zkclient.waitUntilConnected(10000, TimeUnit.MILLISECONDS));
+      // While retrying, onDisconnectedFlag = false
+      Assert.assertFalse(onDisconnectedFlag.get());
+
+      // 2. restart zkServer and check if handler will recover connection
+      zkServer.start();
+      Assert.assertTrue(controller._zkclient
+          .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(controller.isConnected());
+
+      // New propertyStore should be in good state
+      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
+      propertyStore.get("/", null, 0);
+
+      // Inject expire to test handler
+      // onDisconnectedFlag should be set within onDisconnected handler
+      controller.handleSessionEstablishmentError(new Exception("For testing"));
+      Assert.assertTrue(onDisconnectedFlag.get());
+      Assert.assertFalse(onConnectedFlag.get());
+      Assert.assertFalse(controller.isConnected());
+
+      // Verify ZK is down
+      // NOTE(review): nothing fails the test when no HelixException is thrown here - confirm
+      // whether an Assert.fail() after the call is intended.
+      try {
+        controller.getHelixPropertyStore();
+      } catch (HelixException e) {
+        // Expected exception
+        System.out.println(e.getMessage());
+      }
+    } finally {
+      controller.disconnect();
+      zkServer.shutdown();
+      System.clearProperty("zk.connection.timeout");
+      System.clearProperty("zk.connectionReEstablishment.timeout");
+    }
+  }
+
+  public static final class SimpleStateModel extends StateModel {
+
+    private final CountDownLatch latch;
+
+    public SimpleStateModel(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      // LOG.info(HelixUtils.toString(message));
+      LOG.info("message: " + message);
+      latch.countDown();
+    }
+  }
+}