You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2017/09/20 18:23:22 UTC

helix git commit: Fix disconnected zkConnection issue.

Repository: helix
Updated Branches:
  refs/heads/master 78ed261e7 -> 3da3e319a


Fix disconnected zkConnection issue.

An issue was found where zkConnection may hold an invalid (null) ZooKeeper object, so related calls fail with a NullPointerException.
Affected Helix components include ZKHelixManager, ZkHelixPropertyStore, and other ZooKeeper-related classes.
To fix this issue:
1. Override retryUntilConnected() in Helix's ZkClient to check the connection before triggering callbacks. This prevents the NPE, but users still need to catch IllegalStateException and re-create the ZkClient if necessary.
2. For ZKHelixManager, implement handleSessionEstablishmentError to retry establishing a new connection. If the retries fail, Helix invokes the user-registered state listener (HelixManagerStateListener).
3. Add a unit test that simulates a connection error and checks that the error handler either recovers the connection or triggers the user-registered callback.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3da3e319
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3da3e319
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3da3e319

Branch: refs/heads/master
Commit: 3da3e319a4a821add6291c385dae41b633fad078
Parents: 78ed261
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Jul 25 16:50:22 2017 -0700
Committer: Jiajun Wang <jj...@linkedin.com>
Committed: Wed Sep 20 11:20:01 2017 -0700

----------------------------------------------------------------------
 .../org/apache/helix/HelixManagerFactory.java   | 16 ++++
 .../manager/zk/HelixManagerStateListener.java   | 43 +++++++++++
 .../apache/helix/manager/zk/ZKHelixManager.java | 68 +++++++++++++++--
 .../org/apache/helix/manager/zk/ZkClient.java   | 14 ++++
 .../helix/integration/TestZkReconnect.java      | 77 +++++++++++++++++++-
 5 files changed, 210 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3da3e319/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
index 1847546..b26f9d8 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
@@ -25,6 +25,7 @@ package org.apache.helix;
  * for zk-based cluster managers, the getZKXXX(..zkClient) that takes a zkClient parameter
  *   are intended for session expiry test purpose
  */
+import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.log4j.Logger;
 
@@ -48,4 +49,19 @@ public final class HelixManagerFactory {
     return new ZKHelixManager(clusterName, instanceName, type, zkAddr);
   }
 
+  /**
+   * Construct a zk-based cluster manager (like the overload above) that also reports disconnects
+   * to the given state listener.
+   * @param clusterName name of the Helix cluster
+   * @param instanceName name of this instance
+   * @param type instance type of the manager (e.g. PARTICIPANT, CONTROLLER, SPECTATOR)
+   * @param zkAddr ZooKeeper address, e.g. "localhost:2181"
+   * @param stateListener notified via onDisconnected when the manager gives up its connection; may be null
+   * @return a HelixManager backed by Zookeeper
+   */
+  public static HelixManager getZKHelixManager(String clusterName, String instanceName,
+      InstanceType type, String zkAddr, HelixManagerStateListener stateListener) {
+    return new ZKHelixManager(clusterName, instanceName, type, zkAddr, stateListener);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3da3e319/helix-core/src/main/java/org/apache/helix/manager/zk/HelixManagerStateListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixManagerStateListener.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixManagerStateListener.java
new file mode 100644
index 0000000..0277bfd
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixManagerStateListener.java
@@ -0,0 +1,43 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+
+/** Callback for changes in a HelixManager's ZooKeeper connection state. */
+public interface HelixManagerStateListener {
+
+  /**
+   * Placeholder declared now so that implementations stay compatible when it starts being used.
+   * Implementors must provide it; it is meant for when Helix connects to ZooKeeper asynchronously.
+   * NOTE: nothing in this change invokes onConnected yet.
+   * Invoked when the HelixManager connection to zookeeper is established
+   * @param helixManager HelixManager that is successfully connected
+   */
+  void onConnected(HelixManager helixManager) throws Exception;
+
+  /**
+   * Invoked when the HelixManager connection to zookeeper is disconnected
+   * (by ZKHelixManager after a flapping disconnect, or when reconnect retries time out).
+   * @param helixManager HelixManager that was disconnected or failed to reconnect
+   * @param error cause of the disconnect; exceptions thrown here propagate out of ZKHelixManager's handler
+   */
+  void onDisconnected(HelixManager helixManager, Throwable error) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3da3e319/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index aa2efe6..670a65e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -82,15 +82,19 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
+  private static final int DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT = 120000; // Default to 120 sec (value in ms)
 
   protected final String _zkAddress;
   private final String _clusterName;
   private final String _instanceName;
   private final InstanceType _instanceType;
   private final int _sessionTimeout;
+  private final int _clientConnectionTimeout; // ZkClient connection timeout; system property "zk.connection.timeout"
+  private final int _connectionRetryTimeout; // ms budget for reconnect retries; "zk.connectionReEstablishment.timeout"
   private final List<PreConnectCallback> _preConnectCallbacks;
   protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
+  private final HelixManagerStateListener _stateListener; // may be null; notified when the manager disconnects
 
   /**
    * helix version#
@@ -175,6 +179,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
       String zkAddress) {
+    this(clusterName, instanceName, instanceType, zkAddress, null);
+  }
+
+  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
+      String zkAddress, HelixManagerStateListener stateListener) {
 
     LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
         + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
@@ -203,6 +212,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     _keyBuilder = new Builder(clusterName);
     _messagingService = new DefaultMessagingService(this);
 
+    _stateListener = stateListener;
+
     /**
      * use system property if available
      */
@@ -214,8 +225,12 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
             ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
 
-    _sessionTimeout =
-        getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+    _sessionTimeout = getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+    _clientConnectionTimeout = getSystemPropertyAsInt("zk.connection.timeout", ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+
+    _connectionRetryTimeout =
+        getSystemPropertyAsInt("zk.connectionReEstablishment.timeout", DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
 
     /**
      * instance type specific init
@@ -471,7 +486,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
 
     _zkclient =
-        new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
+        new ZkClient(_zkAddress, _sessionTimeout, _clientConnectionTimeout, zkSerializer);
 
     _baseDataAccessor = createBaseDataAccessor();
 
@@ -582,6 +597,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         _participantManager = null;
       }
 
+      _helixPropertyStore = null; // forget the cached store so it is not reused after _zkclient is closed below
+
       _zkclient.close();
       _zkclient = null;
       _sessionStartTime = null;
@@ -813,9 +830,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
        */
       _disconnectTimeHistory.add(System.currentTimeMillis());
       if (isFlapping()) {
-        LOG.error("instanceName: " + _instanceName + " is flapping. disconnect it. "
+        String errorMsg = "instanceName: " + _instanceName + " is flapping. disconnect it. "
             + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
-            + _flappingTimeWindowMs + "ms.");
+            + _flappingTimeWindowMs + "ms.";
+        LOG.error(errorMsg);
 
         // Only disable the instance when it's instance type is PARTICIPANT
         if (_instanceType.equals(InstanceType.PARTICIPANT)) {
@@ -824,6 +842,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
           getClusterManagmentTool().enableInstance(_clusterName, _instanceName, false);
         }
         disconnect();
+        if (_stateListener != null) { // tell the user the manager gave up its connection due to flapping
+          _stateListener.onDisconnected(this, new HelixException(errorMsg));
+        }
       }
       break;
     case Expired:
@@ -920,12 +941,45 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   @Override
-  public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+  public void handleSessionEstablishmentError(Throwable error) throws Exception {
+    LOG.warn("Handling Session Establishment Error. Try to reset connection.", error);
+    // Close currently disconnected ZkClient before cleanup
+    if (_zkclient != null) {
+      _zkclient.close();
+    }
+    // Cleanup ZKHelixManager
+    disconnect();
+    // Retry connect() until it succeeds or _connectionRetryTimeout ms have elapsed
+    long operationStartTime = System.currentTimeMillis();
+    while (!isConnected()) {
+      try {
+        connect();
+        break;
+      } catch (Exception e) {
+        // Record the latest failure first, so the final log/listener report it even on timeout.
+        error = e;
+        if (System.currentTimeMillis() - operationStartTime >= _connectionRetryTimeout) {
+          break;
+        }
+        LOG.error("Fail to reset connection after session establishment error happens. Will retry.", error);
+        // Yield until next retry. NOTE(review): this busy-retries if connect() fails fast.
+        Thread.yield();
+      }
+    }
+
+    if (!isConnected()) {
+      LOG.error("Fail to reset connection after session establishment error happens.", error);
+      // retry failed, trigger error handler
+      if (_stateListener != null) {
+        _stateListener.onDisconnected(this, error);
+      }
+    } else {
+      LOG.info("Connection is recovered.");
+    }
+  }
 
   @Override
   public Long getSessionStartTime() {
     return _sessionStartTime;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3da3e319/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index a4edc91..0a61e82 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -437,4 +437,18 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     });
   }
 
+  public <T> T retryUntilConnected(final Callable<T> callable) { // wraps the superclass retry loop with a connection check
+    final ZkConnection zkConnection = (ZkConnection) getConnection(); // NOTE(review): ClassCastException if a non-ZkConnection IZkConnection is in use
+    return super.retryUntilConnected(new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        // Checked on every invocation: fail fast with IllegalStateException instead of letting callable NPE on a null ZooKeeper
+        if (zkConnection == null || zkConnection.getZookeeper() == null) {
+          throw new IllegalStateException(
+              "ZkConnection is in invalid state! Please close this ZkClient and create new client.");
+        }
+        return callable.call();
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3da3e319/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index 3475049..8288608 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -31,10 +31,13 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -137,6 +140,79 @@ public class TestZkReconnect {
     }
   }
 
+  @Test
+  public void testZKDisconnectCallback() throws Exception {
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    // Set by onDisconnected; lets the test check whether the callback fired
+    final AtomicReference<Boolean> flag = new AtomicReference<Boolean>(false);
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+    // Short timeouts for a fast test; must be set before the manager is built (read in its constructor)
+    System.setProperty("zk.connection.timeout", "2000");
+    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
+
+    // Registers and starts controller, register listener for disconnect handling
+    LOG.info("Starts controller");
+    final ZKHelixManager controller =
+        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
+            new HelixManagerStateListener() {
+              @Override
+              public void onConnected(HelixManager helixManager) throws Exception {
+                return;
+              }
+
+              @Override
+              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                flag.set(true);
+              }
+            });
+
+    try {
+      controller.connect();
+      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
+
+      // 1. shutdown zkServer and check that the handler triggers the callback
+      zkServer.shutdown();
+      // Retry will fail, and flag should be set within onDisconnected handler
+      controller.handleSessionEstablishmentError(new Exception("For testing"));
+      Assert.assertTrue(flag.get());
+
+      try {
+        propertyStore.get("/", null, 0);
+        Assert.fail("propertyStore should be disconnected.");
+      } catch (IllegalStateException e) {
+        // Expected exception
+        LOG.info("Expected exception: " + e.getMessage());
+      }
+
+      // 2. restart zkServer and check that the handler recovers the connection
+      flag.set(false);
+      zkServer.start();
+      // Retry will succeed, and flag should not be set
+      controller.handleSessionEstablishmentError(new Exception("For testing"));
+      Assert.assertFalse(flag.get());
+      // New propertyStore should be in good state
+      propertyStore = controller.getHelixPropertyStore();
+      propertyStore.get("/", null, 0);
+    } finally {
+      controller.disconnect();
+      zkServer.shutdown();
+      System.clearProperty("zk.connection.timeout");
+      System.clearProperty("zk.connectionReEstablishment.timeout");
+    }
+  }
+
   public static final class SimpleStateModel extends StateModel {
 
     private final CountDownLatch latch;
@@ -151,5 +227,4 @@ public class TestZkReconnect {
       latch.countDown();
     }
   }
-
 }