You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:48:59 UTC

[08/50] [abbrv] helix git commit: Cleanup ZkHelixManager and ZkClient config items to remove the ambiguous ones.

Cleanup ZkHelixManager and ZkClient config items to remove the ambiguous ones.

We notice that several items are using default values from different classes. And some default items are not set properly. Fix the problem to prevent any future config issues.
Also add comments to clear the usage of each settings.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2f791a68
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2f791a68
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2f791a68

Branch: refs/heads/master
Commit: 2f791a688ae5aafa27e40d7e984a4fd7ce480499
Parents: 401ada6
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Nov 9 11:22:18 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:30:55 2018 -0800

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java | 68 ++++++++------------
 .../org/apache/helix/manager/zk/ZkClient.java   | 34 ++++------
 2 files changed, 39 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2f791a68/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 430c56b..60b9a3f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,43 +19,21 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import javax.management.JMException;
-
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.api.listeners.ClusterConfigChangeListener;
+import org.apache.helix.*;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.listeners.*;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.HelixTimerTask;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
-import org.apache.helix.InstanceType;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
-import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
@@ -70,11 +48,17 @@ import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.JMException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 
 public class ZKHelixManager implements HelixManager, IZkStateListener {
@@ -90,10 +74,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final String _clusterName;
   private final String _instanceName;
   private final InstanceType _instanceType;
-  private final int _sessionTimeout;
-  private final int _clientConnectionTimeout;
-  private final int _connectionRetryTimeout;
-  private final int _waitForConnectedTimeout;
+  private final int _waitForConnectedTimeout; // wait time for testing connect
+  private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout
+  private final int _connectionInitTimeout; // client timeout to init connect
+  private final int _connectionRetryTimeout; // retry when connect being re-established
   private final List<PreConnectCallback> _preConnectCallbacks;
   protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
@@ -241,7 +225,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     _sessionTimeout = getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
 
-    _clientConnectionTimeout = getSystemPropertyAsInt("zk.connection.timeout", ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    _connectionInitTimeout = getSystemPropertyAsInt("zk.connection.timeout", ZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
     _connectionRetryTimeout = getSystemPropertyAsInt("zk.connectionReEstablishment.timeout",
         DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
@@ -309,7 +293,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         + _clusterName + " by instance: " + _instanceName);
 
     synchronized (this) {
-      List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+      List<CallbackHandler> toRemove = new ArrayList<>();
       for (CallbackHandler handler : _handlers) {
         // compare property-key path and listener reference
         if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
@@ -346,13 +330,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     boolean isConnected = isConnected();
     if (!isConnected && timeout > 0) {
-      LOG.warn("zkClient to " + _zkAddress + " is not connected, wait for " + timeout + "ms.");
-      isConnected = _zkclient.waitUntilConnected(timeout, TimeUnit.MILLISECONDS);
+      LOG.warn(
+          "zkClient to " + _zkAddress + " is not connected, wait for " + _waitForConnectedTimeout
+              + "ms.");
+      isConnected = _zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS);
     }
 
     if (!isConnected) {
-      LOG.error("zkClient is not connected after waiting " +
-          timeout + "ms." + ", clusterName: " + _clusterName + ", zkAddress: " + _zkAddress);
+      LOG.error("zkClient is not connected after waiting " + timeout + "ms."
+          + ", clusterName: " + _clusterName + ", zkAddress: " + _zkAddress);
       throw new HelixException(
           "HelixManager is not connected within retry timeout for cluster " + _clusterName);
     }
@@ -589,7 +575,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
-    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkclient);
 
     return baseDataAccessor;
   }
@@ -610,7 +596,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
     zkClientBuilder.setZkServer(_zkAddress).setSessionTimeout(_sessionTimeout)
-        .setConnectionTimeout(_clientConnectionTimeout).setZkSerializer(zkSerializer)
+        .setConnectionTimeout(_connectionInitTimeout).setZkSerializer(zkSerializer)
         .setMonitorType(_instanceType.name())
         .setMonitorKey(_clusterName)
         .setMonitorInstanceName(_instanceName)
@@ -633,7 +619,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     _zkclient.subscribeStateChanges(this);
     while (retryCount < 3) {
       try {
-        _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
+        _zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS);
         handleStateChanged(KeeperState.SyncConnected);
         handleNewSession();
         break;

http://git-wip-us.apache.org/repos/asf/helix/blob/2f791a68/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 7229914..c9f7ccf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -19,10 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.Callable;
-
 import org.I0Itec.zkclient.IZkConnection;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkException;
@@ -32,20 +28,19 @@ import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
-import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks.*;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.management.JMException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
 
 /**
  * ZKClient does not provide some functionalities, this will be used for quick fixes if
@@ -57,8 +52,6 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
   public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
   public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
-  // public static String sessionId;
-  // public static String sessionPassword;
 
   private PathBasedZkSerializer _zkSerializer;
   private ZkClientMonitor _monitor;
@@ -83,7 +76,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   }
 
   public ZkClient(String zkServers, String monitorType, String monitorKey) {
-    this(new ZkConnection(zkServers), Integer.MAX_VALUE,
+    this(new ZkConnection(zkServers, DEFAULT_SESSION_TIMEOUT), Integer.MAX_VALUE,
         new BasicZkSerializer(new SerializableSerializer()), monitorType, monitorKey);
   }
 
@@ -126,7 +119,8 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   }
 
   public ZkClient(String zkServers, int connectionTimeout) {
-    this(new ZkConnection(zkServers), connectionTimeout, new SerializableSerializer());
+    this(new ZkConnection(zkServers, DEFAULT_SESSION_TIMEOUT), connectionTimeout,
+        new SerializableSerializer());
   }
 
   public ZkClient(String zkServers) {
@@ -598,12 +592,12 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   public static class Builder {
     IZkConnection _connection;
     String _zkServer;
-    Integer _sessionTimeout;
 
     PathBasedZkSerializer _zkSerializer;
 
     long _operationRetryTimeout = -1L;
-    int _connectionTimeout = Integer.MAX_VALUE;
+    int _connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
 
     String _monitorType;
     String _monitorKey;
@@ -684,11 +678,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
           throw new HelixException(
               "Failed to build ZkClient since no connection or ZK server address is specified.");
         } else {
-          if (_sessionTimeout == null) {
-            _connection = new ZkConnection(_zkServer);
-          } else {
-            _connection = new ZkConnection(_zkServer, _sessionTimeout);
-          }
+          _connection = new ZkConnection(_zkServer, _sessionTimeout);
         }
       }