You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/12 02:18:59 UTC

[1/2] git commit: [HELIX-296] HelixConnection in 0.7.0 does not remove LiveInstance znode

Updated Branches:
  refs/heads/master 55c935169 -> 7c444e506


[HELIX-296] HelixConnection in 0.7.0 does not remove LiveInstance znode


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/72d879ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/72d879ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/72d879ed

Branch: refs/heads/master
Commit: 72d879edfced69f5ca9af6585592caa891d2b0c7
Parents: ec36112
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 17:18:27 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 17:18:27 2013 -0800

----------------------------------------------------------------------
 .../org/apache/helix/HelixAutoController.java   |  19 +
 .../java/org/apache/helix/HelixConnection.java  |  27 ++
 .../helix/HelixConnectionStateListener.java     |  19 +
 .../java/org/apache/helix/HelixController.java  |  19 +
 .../java/org/apache/helix/HelixParticipant.java |  19 +
 .../main/java/org/apache/helix/HelixRole.java   |  19 +
 .../java/org/apache/helix/HelixService.java     |  19 +
 .../manager/zk/HelixConnectionAdaptor.java      |  19 +
 .../helix/manager/zk/ZkCallbackHandler.java     | 460 +++++++++++++++++++
 .../org/apache/helix/manager/zk/ZkClient.java   |   7 +-
 .../helix/manager/zk/ZkHelixAutoController.java |  19 +
 .../helix/manager/zk/ZkHelixConnection.java     |  56 ++-
 .../helix/manager/zk/ZkHelixController.java     |  19 +
 .../helix/manager/zk/ZkHelixLeaderElection.java |  37 +-
 .../helix/manager/zk/ZkHelixParticipant.java    |  23 +
 .../apache/helix/monitoring/StatusDumpTask.java |  19 +
 .../statemachine/HelixStateModelFactory.java    |  19 +
 .../HelixStateModelFactoryAdaptor.java          |  19 +
 .../helix/integration/TestHelixConnection.java  |  20 +
 .../manager/zk/TestZkHelixAutoController.java   | 120 +++++
 .../helix/manager/zk/TestZkHelixController.java | 160 +++++++
 .../manager/zk/TestZkHelixParticipant.java      | 103 +++++
 .../helix/examples/LogicalModelExample.java     |   3 +-
 23 files changed, 1222 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
index 7ad9218..91ec809 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.participant.StateMachineEngine;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
index 7551673..9d93c97 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.accessor.ParticipantAccessor;
 import org.apache.helix.api.accessor.ResourceAccessor;
@@ -61,6 +80,14 @@ public interface HelixConnection {
   HelixController createController(ClusterId clusterId, ControllerId controllerId);
 
   /**
+   * create an autonomous helix-controller
+   * @param clusterId
+   * @param controllerId
+   * @return
+   */
+  HelixAutoController createAutoController(ClusterId clusterId, ControllerId controllerId);
+
+  /**
    * create a cluster-accessor
    * @param clusterId
    * @return cluster-accessor

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
index 13172d0..044a3a7 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnectionStateListener.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 public interface HelixConnectionStateListener {
   /**
    * called after connection is established

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
index ce47e3d..9565cfb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.id.ControllerId;
 
 public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
index 9002b15..678da4a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixParticipant.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.participant.StateMachineEngine;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixRole.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRole.java b/helix-core/src/main/java/org/apache/helix/HelixRole.java
index 9e112d1..ffcb700 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRole.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRole.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.Id;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/HelixService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixService.java b/helix-core/src/main/java/org/apache/helix/HelixService.java
index 33cc8e5..a1ce0ec 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixService.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixService.java
@@ -1,5 +1,24 @@
 package org.apache.helix;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 /**
  * Operational methods of a helix role
  */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index 869563a..487c3a6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.ConfigChangeListener;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
new file mode 100644
index 0000000..e0a0b33
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackHandler.java
@@ -0,0 +1,460 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
+import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
+import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
+import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
+import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.HelixRole;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+/**
+ * This is a copy of {@link CallbackHandler}. We need to synchronize on ZkHelixConnection
+ * instead of HelixManager to avoid dead-lock.
+ * Otherwise an example deadlock scenario would be:
+ * 1) main-thread calls ZkHelixConnection#disconnect(), which results in:
+ * - ZkHelixController#reset(), holding ZkHelixConnection, waiting for HelixConnectionAdaptor
+ * 2) zk-event-thread calls CallbackHandler#handleChildChange(), which results in:
+ * - CallbackHandler#invoke(), holding HelixConnectionAdaptor, waiting for ZkHelixConnection
+ * TODO remove code duplication
+ */
+public class ZkCallbackHandler implements IZkChildListener, IZkDataListener
+
+{
+  private static Logger logger = Logger.getLogger(ZkCallbackHandler.class);
+
+  /**
+   * define the next possible notification types
+   */
+  private static Map<Type, List<Type>> nextNotificationType = new HashMap<Type, List<Type>>();
+  static {
+    nextNotificationType.put(Type.INIT, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
+    nextNotificationType.put(Type.CALLBACK, Arrays.asList(Type.CALLBACK, Type.FINALIZE));
+    nextNotificationType.put(Type.FINALIZE, Arrays.asList(Type.INIT));
+  }
+
+  private final String _path;
+  private final Object _listener;
+  private final EventType[] _eventTypes;
+
+  private final ChangeType _changeType;
+  private final ZkClient _zkClient;
+  private final AtomicLong _lastNotificationTimeStamp;
+
+  private final HelixRole _role;
+  private final HelixManager _manager;
+  private final String _instanceName;
+  private final HelixConnection _connection;
+  private final HelixDataAccessor _accessor;
+
+  private final PropertyKey _propertyKey;
+
+  /**
+   * maintain the expected notification types
+   * this is a fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
+   */
+  private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
+
+  public ZkCallbackHandler(HelixRole role, ZkClient client,
+      PropertyKey propertyKey,
+      Object listener, EventType[] eventTypes, ChangeType changeType) {
+    if (listener == null) {
+      throw new HelixException("listener could not be null");
+    }
+
+    _role = role;
+    _manager = new HelixConnectionAdaptor(role);
+    _instanceName = role.getId().stringify();
+    _connection = role.getConnection();
+    _accessor = _connection.createDataAccessor(role.getClusterId());
+    _zkClient = client;
+    _propertyKey = propertyKey;
+    _path = propertyKey.getPath();
+    _listener = listener;
+    _eventTypes = eventTypes;
+    _changeType = changeType;
+    _lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+    init();
+  }
+
+  public Object getListener() {
+    return _listener;
+  }
+
+  public String getPath() {
+    return _path;
+  }
+
+  public void invoke(NotificationContext changeContext) throws Exception {
+    // This allows the listener to work with one change at a time
+    synchronized (_connection) {
+      Type type = changeContext.getType();
+      if (!_expectTypes.contains(type)) {
+        logger.warn("Skip processing callbacks for listener: " + _listener + ", path: " + _path
+            + ", expected types: " + _expectTypes + " but was " + type);
+        return;
+      }
+      _expectTypes = nextNotificationType.get(type);
+
+      // Builder keyBuilder = _accessor.keyBuilder();
+      long start = System.currentTimeMillis();
+      if (logger.isInfoEnabled()) {
+        logger.info(Thread.currentThread().getId() + " START:INVOKE " + _path + " listener:"
+            + _listener.getClass().getCanonicalName());
+      }
+
+      if (_changeType == IDEAL_STATE) {
+
+        IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
+
+        idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
+
+      } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
+        subscribeForChanges(changeContext, _path, true, true);
+        if (_listener instanceof ConfigChangeListener) {
+          ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
+          List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+          configChangeListener.onConfigChange(configs, changeContext);
+        } else if (_listener instanceof InstanceConfigChangeListener) {
+          InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
+          List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
+          listener.onInstanceConfigChange(configs, changeContext);
+        }
+      } else if (_changeType == CONFIG) {
+        subscribeForChanges(changeContext, _path, true, true);
+        ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
+        List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
+        listener.onConfigChange(configs, changeContext);
+      } else if (_changeType == LIVE_INSTANCE) {
+        LiveInstanceChangeListener liveInstanceChangeListener =
+            (LiveInstanceChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<LiveInstance> liveInstances = _accessor.getChildValues(_propertyKey);
+
+        liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
+
+      } else if (_changeType == CURRENT_STATE) {
+        CurrentStateChangeListener currentStateChangeListener =
+            (CurrentStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+        List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
+
+        currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
+
+      } else if (_changeType == MESSAGE) {
+        MessageListener messageListener = (MessageListener) _listener;
+        subscribeForChanges(changeContext, _path, true, false);
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+        List<Message> messages = _accessor.getChildValues(_propertyKey);
+
+        messageListener.onMessage(instanceName, messages, changeContext);
+
+      } else if (_changeType == MESSAGES_CONTROLLER) {
+        MessageListener messageListener = (MessageListener) _listener;
+        subscribeForChanges(changeContext, _path, true, false);
+        List<Message> messages = _accessor.getChildValues(_propertyKey);
+
+        messageListener.onMessage(_instanceName, messages, changeContext);
+
+      } else if (_changeType == EXTERNAL_VIEW) {
+        ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true);
+        List<ExternalView> externalViewList = _accessor.getChildValues(_propertyKey);
+
+        externalViewListener.onExternalViewChange(externalViewList, changeContext);
+      } else if (_changeType == ChangeType.CONTROLLER) {
+        ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, false);
+        controllerChangelistener.onControllerChange(changeContext);
+      } else if (_changeType == ChangeType.HEALTH) {
+        HealthStateChangeListener healthStateChangeListener = (HealthStateChangeListener) _listener;
+        subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
+        // settings here
+        String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
+
+        List<HealthStat> healthReportList = _accessor.getChildValues(_propertyKey);
+
+        healthStateChangeListener.onHealthChange(instanceName, healthReportList, changeContext);
+      }
+
+      long end = System.currentTimeMillis();
+      if (logger.isInfoEnabled()) {
+        logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path + " listener:"
+            + _listener.getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
+      }
+    }
+  }
+
+  private void subscribeChildChange(String path, NotificationContext context) {
+    NotificationContext.Type type = context.getType();
+    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+      logger.info(_instanceName + " subscribes child-change. path: " + path
+          + ", listener: " + _listener);
+      _zkClient.subscribeChildChanges(path, this);
+    } else if (type == NotificationContext.Type.FINALIZE) {
+      logger.info(_instanceName + " unsubscribe child-change. path: " + path
+          + ", listener: " + _listener);
+
+      _zkClient.unsubscribeChildChanges(path, this);
+    }
+  }
+
+  private void subscribeDataChange(String path, NotificationContext context) {
+    NotificationContext.Type type = context.getType();
+    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(_instanceName + " subscribe data-change. path: " + path
+            + ", listener: " + _listener);
+      }
+      _zkClient.subscribeDataChanges(path, this);
+
+    } else if (type == NotificationContext.Type.FINALIZE) {
+      logger.info(_instanceName + " unsubscribe data-change. path: " + path
+          + ", listener: " + _listener);
+
+      _zkClient.unsubscribeDataChanges(path, this);
+    }
+  }
+
+  // TODO watchParent is always true; consider removing it
+  private void subscribeForChanges(NotificationContext context, String path, boolean watchParent,
+      boolean watchChild) {
+    if (watchParent) {
+      subscribeChildChange(path, context);
+    }
+
+    if (watchChild) {
+      try {
+        switch (_changeType) {
+        case CURRENT_STATE:
+        case IDEAL_STATE:
+        case EXTERNAL_VIEW: {
+          // check if bucketized
+          BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+          List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
+          for (ZNRecord record : records) {
+            HelixProperty property = new HelixProperty(record);
+            String childPath = path + "/" + record.getId();
+
+            int bucketSize = property.getBucketSize();
+            if (bucketSize > 0) {
+              // subscribe both data-change and child-change on bucketized parent node
+              // data-change gives a delete-callback which is used to remove watch
+              subscribeChildChange(childPath, context);
+              subscribeDataChange(childPath, context);
+
+              // subscribe data-change on bucketized child
+              List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
+              if (bucketizedChildNames != null) {
+                for (String bucketizedChildName : bucketizedChildNames) {
+                  String bucketizedChildPath = childPath + "/" + bucketizedChildName;
+                  subscribeDataChange(bucketizedChildPath, context);
+                }
+              }
+            } else {
+              subscribeDataChange(childPath, context);
+            }
+          }
+          break;
+        }
+        default: {
+          List<String> childNames = _zkClient.getChildren(path);
+          if (childNames != null) {
+            for (String childName : childNames) {
+              String childPath = path + "/" + childName;
+              subscribeDataChange(childPath, context);
+            }
+          }
+          break;
+        }
+        }
+      } catch (ZkNoNodeException e) {
+        logger.warn("fail to subscribe child/data change. path: " + path + ", listener: "
+            + _listener, e);
+      }
+    }
+
+  }
+
+  public EventType[] getEventTypes() {
+    return _eventTypes;
+  }
+
+  /**
+   * Invoke the listener so that it sets up the initial values from ZooKeeper, if any
+   * exist
+   */
+  public void init() {
+    updateNotificationTime(System.nanoTime());
+    try {
+      NotificationContext changeContext = new NotificationContext(_manager);
+      changeContext.setType(NotificationContext.Type.INIT);
+      invoke(changeContext);
+    } catch (Exception e) {
+      String msg = "Exception while invoking init callback for listener:" + _listener;
+      ZKExceptionHandler.getInstance().handle(msg, e);
+    }
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data) {
+    try {
+      updateNotificationTime(System.nanoTime());
+      if (dataPath != null && dataPath.startsWith(_path)) {
+        NotificationContext changeContext = new NotificationContext(_manager);
+        changeContext.setType(NotificationContext.Type.CALLBACK);
+        invoke(changeContext);
+      }
+    } catch (Exception e) {
+      String msg =
+          "exception in handling data-change. path: " + dataPath + ", listener: " + _listener;
+      ZKExceptionHandler.getInstance().handle(msg, e);
+    }
+  }
+
+  @Override
+  public void handleDataDeleted(String dataPath) {
+    try {
+      updateNotificationTime(System.nanoTime());
+      if (dataPath != null && dataPath.startsWith(_path)) {
+        logger.info(_instanceName + " unsubscribe data-change. path: " + dataPath
+            + ", listener: " + _listener);
+        _zkClient.unsubscribeDataChanges(dataPath, this);
+
+        // only needed for bucketized parent, but OK if we don't have child-change
+        // watch on the bucketized parent path
+        logger.info(_instanceName + " unsubscribe child-change. path: " + dataPath
+            + ", listener: " + _listener);
+        _zkClient.unsubscribeChildChanges(dataPath, this);
+        // No need to invoke() since this event will be handled by child-change on parent-node
+        // NotificationContext changeContext = new NotificationContext(_manager);
+        // changeContext.setType(NotificationContext.Type.CALLBACK);
+        // invoke(changeContext);
+      }
+    } catch (Exception e) {
+      String msg =
+          "exception in handling data-delete-change. path: " + dataPath + ", listener: "
+              + _listener;
+      ZKExceptionHandler.getInstance().handle(msg, e);
+    }
+  }
+
+  @Override
+  public void handleChildChange(String parentPath, List<String> currentChilds) {
+    try {
+      updateNotificationTime(System.nanoTime());
+      if (parentPath != null && parentPath.startsWith(_path)) {
+        NotificationContext changeContext = new NotificationContext(_manager);
+
+        if (currentChilds == null) {
+          // parentPath has been removed
+          if (parentPath.equals(_path)) {
+            // _path has been removed, remove this listener
+            _manager.removeListener(_propertyKey, _listener);
+          }
+          changeContext.setType(NotificationContext.Type.FINALIZE);
+        } else {
+          changeContext.setType(NotificationContext.Type.CALLBACK);
+        }
+        invoke(changeContext);
+      }
+    } catch (Exception e) {
+      String msg =
+          "exception in handling child-change. instance: " + _instanceName
+              + ", parentPath: " + parentPath + ", listener: " + _listener;
+      ZKExceptionHandler.getInstance().handle(msg, e);
+    }
+  }
+
+  /**
+   * Invoke the listener for the last time so that the listener could clean up resources
+   */
+  public void reset() {
+    try {
+      NotificationContext changeContext = new NotificationContext(_manager);
+      changeContext.setType(NotificationContext.Type.FINALIZE);
+      invoke(changeContext);
+    } catch (Exception e) {
+      String msg = "Exception while resetting the listener:" + _listener;
+      ZKExceptionHandler.getInstance().handle(msg, e);
+    }
+  }
+
+  private void updateNotificationTime(long nanoTime) {
+    long l = _lastNotificationTimeStamp.get();
+    while (nanoTime > l) {
+      boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
+      if (b) {
+        break;
+      } else {
+        l = _lastNotificationTimeStamp.get();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 4fd7a9a..e43eddf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -345,6 +345,10 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
   @Override
   public boolean delete(final String path) {
+    return this.delete(path, -1);
+  }
+
+  public boolean delete(final String path, final int version) {
     long startT = System.nanoTime();
     try {
       try {
@@ -352,7 +356,8 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
           @Override
           public Object call() throws Exception {
-            _connection.delete(path);
+            ZkConnection connection = (ZkConnection) _connection;
+            connection.getZookeeper().delete(path, version);
             return null;
           }
         });

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index d9ea445..136d47e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.HelixAutoController;
 import org.apache.helix.HelixConnection;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 0717d77..7d6f132 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -22,6 +41,7 @@ import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixAutoController;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixConnection;
 import org.apache.helix.HelixConnectionStateListener;
@@ -67,7 +87,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   final Set<HelixConnectionStateListener> _connectionListener =
       new CopyOnWriteArraySet<HelixConnectionStateListener>();
 
-  final Map<HelixRole, List<CallbackHandler>> _handlers;
+  final Map<HelixRole, List<ZkCallbackHandler>> _handlers;
   final HelixManagerProperties _properties;
 
   /**
@@ -88,7 +108,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
 
   public ZkHelixConnection(String zkAddr) {
     _zkAddr = zkAddr;
-    _handlers = new HashMap<HelixRole, List<CallbackHandler>>();
+    _handlers = new HashMap<HelixRole, List<ZkCallbackHandler>>();
 
     /**
      * use system property if available
@@ -201,6 +221,11 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   }
 
   @Override
+  public HelixAutoController createAutoController(ClusterId clusterId, ControllerId controllerId) {
+    return new ZkHelixAutoController(this, clusterId, controllerId);
+  }
+
+  @Override
   public ClusterAccessor createClusterAccessor(ClusterId clusterId) {
     return new ClusterAccessor(clusterId, createDataAccessor(clusterId));
   }
@@ -374,14 +399,14 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   public boolean removeListener(HelixRole role, Object listener, PropertyKey key) {
     LOG.info("role: " + role + " removing listener: " + listener + " on path: " + key.getPath()
         + " from connection: " + this);
-    List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
-    List<CallbackHandler> handlerList = _handlers.get(role);
+    List<ZkCallbackHandler> toRemove = new ArrayList<ZkCallbackHandler>();
+    List<ZkCallbackHandler> handlerList = _handlers.get(role);
     if (handlerList == null) {
       return true;
     }
 
     synchronized (this) {
-      for (CallbackHandler handler : handlerList) {
+      for (ZkCallbackHandler handler : handlerList) {
         // compare property-key path and listener reference
         if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
           toRemove.add(handler);
@@ -395,7 +420,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
     }
 
     // handler.reset() may modify the handlers list, so do it outside the iteration
-    for (CallbackHandler handler : toRemove) {
+    for (ZkCallbackHandler handler : toRemove) {
       handler.reset();
     }
 
@@ -529,16 +554,15 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
   void addListener(HelixRole role, Object listener, PropertyKey propertyKey, ChangeType changeType,
       EventType[] eventType) {
     // checkConnected();
-    HelixManager manager = new HelixConnectionAdaptor(role);
     PropertyType type = propertyKey.getType();
 
     synchronized (this) {
       if (!_handlers.containsKey(role)) {
-        _handlers.put(role, new CopyOnWriteArrayList<CallbackHandler>());
+        _handlers.put(role, new CopyOnWriteArrayList<ZkCallbackHandler>());
       }
-      List<CallbackHandler> handlerList = _handlers.get(role);
+      List<ZkCallbackHandler> handlerList = _handlers.get(role);
 
-      for (CallbackHandler handler : handlerList) {
+      for (ZkCallbackHandler handler : handlerList) {
         // compare property-key path and listener reference
         if (handler.getPath().equals(propertyKey.getPath())
             && handler.getListener().equals(listener)) {
@@ -549,8 +573,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
         }
       }
 
-      CallbackHandler newHandler =
-          new CallbackHandler(manager, _zkclient, propertyKey, listener, eventType, changeType);
+      ZkCallbackHandler newHandler =
+          new ZkCallbackHandler(role, _zkclient, propertyKey, listener, eventType, changeType);
 
       handlerList.add(newHandler);
       LOG.info("role: " + role + " added listener: " + listener + " for type: " + type
@@ -560,10 +584,10 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
 
   void initHandlers(HelixRole role) {
     synchronized (this) {
-      List<CallbackHandler> handlerList = _handlers.get(role);
+      List<ZkCallbackHandler> handlerList = _handlers.get(role);
 
       if (handlerList != null) {
-        for (CallbackHandler handler : handlerList) {
+        for (ZkCallbackHandler handler : handlerList) {
           handler.init();
           LOG.info("role: " + role + ", init handler: " + handler.getPath() + ", "
               + handler.getListener());
@@ -574,10 +598,10 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
 
   void resetHandlers(HelixRole role) {
     synchronized (this) {
-      List<CallbackHandler> handlerList = _handlers.get(role);
+      List<ZkCallbackHandler> handlerList = _handlers.get(role);
 
       if (handlerList != null) {
-        for (CallbackHandler handler : handlerList) {
+        for (ZkCallbackHandler handler : handlerList) {
           handler.reset();
           LOG.info("role: " + role + ", reset handler: " + handler.getPath() + ", "
               + handler.getListener());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index b0c5a8b..f116c76 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.ArrayList;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 06e8cd8..77da158 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.lang.management.ManagementFactory;
 
 import org.apache.helix.ControllerChangeListener;
@@ -58,11 +77,13 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
     }
 
     try {
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
       if (changeContext.getType().equals(NotificationContext.Type.INIT)
           || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
         LOG.info(_controllerId + " is trying to acquire leadership for cluster: " + _clusterId);
-        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
 
         while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
           boolean success = tryUpdateController(_manager);
@@ -83,7 +104,17 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
         /**
          * clear write-through cache
          */
-        _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
+        accessor.getBaseDataAccessor().reset();
+
+        /**
+         * remove leader ephemeral znode if this is the leader
+         * note that the session may expire between the leadership check and the znode removal;
+         * in that race, we may remove a leader znode created by another controller
+         * this is fine since it will just invoke another round of leader-election
+         */
+        if (_controller.isLeader()) {
+          accessor.removeProperty(keyBuilder.controllerLeader());
+        }
       }
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
index eba96c9..56e4be8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java
@@ -1,5 +1,24 @@
 package org.apache.helix.manager.zk;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
@@ -415,6 +434,10 @@ public class ZkHelixParticipant implements HelixParticipant {
     _messagingService.getExecutor().shutdown();
     _accessor.shutdown();
 
+    /**
+     * remove live instance ephemeral znode
+     */
+    _accessor.removeProperty(_keyBuilder.liveInstance(_participantId.stringify()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
index 751aefa..421aab0 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/StatusDumpTask.java
@@ -1,5 +1,24 @@
 package org.apache.helix.monitoring;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
index 770bba4..45f56e5 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactory.java
@@ -1,5 +1,24 @@
 package org.apache.helix.participant.statemachine;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
index a17d9f3..320275d 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/HelixStateModelFactoryAdaptor.java
@@ -1,5 +1,24 @@
 package org.apache.helix.participant.statemachine;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import org.apache.helix.api.id.PartitionId;
 
 public class HelixStateModelFactoryAdaptor<T extends StateModel> extends StateModelFactory<T> {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index 43e0250..29e0a44 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Map;
@@ -147,6 +166,7 @@ public class TestHelixConnection extends ZkUnitTestBase {
     // clean up
     controller.stopAsync();
     participant.stopAsync();
+    connection.disconnect();
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
new file mode 100644
index 0000000..28b1477
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAutoController.java
@@ -0,0 +1,120 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixAutoController;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixAutoController extends ZkUnitTestBase {
+  @Test
+  public void testOnConnectedAndDisconnecting() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // create connection
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // start n auto-controllers, localhost_12918 first
+    ClusterId clusterId = ClusterId.from(clusterName);
+    final HelixAutoController[] controllers = new HelixAutoController[n];
+    for (int i = 0; i < n; i++) {
+      int port = 12918 + i;
+      ControllerId controllerId = ControllerId.from("localhost_" + port);
+      controllers[i] = connection.createAutoController(clusterId, controllerId);
+      controllers[i].startAsync();
+    }
+
+    // check that live-instance znodes for localhost_12918 and localhost_12919 exist
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    for (int i = 0; i < n; i++) {
+      String instanceName = controllers[i].getControllerId().stringify();
+      Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+    }
+
+    // check leader znode exists and names the first controller started
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader);
+    Assert.assertEquals(leader.getInstanceName(), controllers[0].getControllerId().stringify());
+
+    // stop controller localhost_12918
+    controllers[0].stopAsync();
+
+    // check live-instance znode for localhost_12918 is gone
+    String instanceName = controllers[0].getControllerId().stringify();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+    // check localhost_12919 becomes the new leader
+    boolean success = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        if (leader == null) {
+          return false;
+        }
+        return leader.getInstanceName().equals(controllers[1].getControllerId().stringify());
+
+      }
+    }, 3 * 1000);
+    Assert.assertTrue(success, "fail to re-elect new leader");
+
+    // clean up; the checks below expect disconnect to remove the remaining znodes
+    connection.disconnect();
+
+    // check live-instance znode for localhost_12919 is gone
+    instanceName = controllers[1].getControllerId().stringify();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(instanceName)));
+
+    // check leader znode is gone
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
new file mode 100644
index 0000000..0127edb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixController.java
@@ -0,0 +1,160 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixController;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.model.LiveInstance;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixController extends ZkUnitTestBase {
+
+  @Test
+  public void testOnConnectedAndDisconnecting() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // create connection
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // start controller
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ControllerId controllerId = ControllerId.from("controller");
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync();
+
+    // check leader znode exists
+    HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader);
+    Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
+
+    // stop controller
+    controller.stopAsync();
+
+    // check leader znode is gone
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+    // clean up
+    connection.disconnect();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Removing the leader znode externally should trigger another round of leader-election;
+   * this simulates the race condition in
+   * {@link ZkHelixLeaderElection#onControllerChange(org.apache.helix.NotificationContext)}
+   * @throws Exception
+   */
+  @Test
+  public void testRemoveLeaderZnode() throws Exception {
+
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // create connection
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // create cluster id and controller id
+    ClusterId clusterId = ClusterId.from(clusterName);
+    final ControllerId controllerId = ControllerId.from("controller");
+
+    // start controller
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.startAsync();
+
+    // check leader znode exists and names this controller
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader);
+    Assert.assertEquals(leader.getInstanceName(), controllerId.stringify());
+
+    // remove leader znode externally
+    accessor.removeProperty(keyBuilder.controllerLeader());
+
+    // verify leader is re-elected
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+        if (leader == null) {
+          return false;
+        }
+
+        return leader.getInstanceName().equals(controllerId.stringify());
+      }
+    }, 3 * 1000);
+
+    Assert.assertTrue(result, "Fail to re-elect a new leader");
+
+    // clean up
+    connection.disconnect();
+
+    // check leader znode is gone
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
new file mode 100644
index 0000000..8658736
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixParticipant.java
@@ -0,0 +1,103 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.TestHelixConnection;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkHelixParticipant extends ZkUnitTestBase {
+  /** Live-instance znodes must exist after start and be gone after stop or disconnect (HELIX-296). */
+  @Test
+  public void testOnConnectedAndDisconnecting() throws Exception {
+    // debugging aid, intentionally left disabled: Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    // create and open the single connection that every participant below is created from
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+
+    // create and start n participants (localhost_12918, localhost_12919) on that connection
+    ClusterId clusterId = ClusterId.from(clusterName);
+    HelixParticipant[] participants = new HelixParticipant[n];
+    for (int i = 0; i < n; i++) {
+      int port = 12918 + i;
+      ParticipantId participantId = ParticipantId.from("localhost_" + port);
+
+      participants[i] = connection.createParticipant(clusterId, participantId);
+      participants[i].getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("MasterSlave"), new TestHelixConnection.MockStateModelFactory());
+
+      participants[i].startAsync();
+    }
+
+    // check that the live-instance znodes for localhost_12918 and localhost_12919 exist;
+    // the accessor reads through _gZkClient, not through the connection under test
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    for (int i = 0; i < n; i++) {
+      Assert.assertNotNull(accessor.getProperty(keyBuilder.liveInstance(participants[i].getParticipantId().stringify())));
+    }
+
+    // stop only participant localhost_12918; the shared connection stays open
+    participants[0].stopAsync();
+    // NOTE(review): no wait between stopAsync() and the check below; presumably removal is synchronous - confirm
+    // check live-instance znode for localhost_12918 is gone
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[0]
+        .getParticipantId().stringify())));
+
+    // clean up: close the shared connection while participant localhost_12919 is still started
+    connection.disconnect();
+
+    // check live-instance znode for localhost_12919 is gone after the disconnect
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance(participants[1]
+        .getParticipantId().stringify())));
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/72d879ed/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 2f5a28b..c233417 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -142,10 +142,9 @@ public class LogicalModelExample {
     autoJoinParticipant.stopAsync();
 
     // stop the controller
-    helixParticipant.stopAsync();
+    helixController.stopAsync();
 
     // drop the cluster
-    Thread.sleep(5000);
     dropCluster(clusterId, connection);
     connection.disconnect();
   }


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-helix

Posted by zz...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-helix


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/7c444e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/7c444e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/7c444e50

Branch: refs/heads/master
Commit: 7c444e50603a79804834b4da8e159222eea3a1f2
Parents: 72d879e 55c9351
Author: zzhang <zz...@apache.org>
Authored: Mon Nov 11 17:18:40 2013 -0800
Committer: zzhang <zz...@apache.org>
Committed: Mon Nov 11 17:18:40 2013 -0800

----------------------------------------------------------------------
 .../controller/rebalancer/CustomRebalancer.java |  62 +------
 .../rebalancer/FallbackRebalancer.java          | 185 +++++++++++++++++++
 .../util/ConstraintBasedAssignment.java         |  50 +++++
 .../stages/BestPossibleStateCalcStage.java      |  16 +-
 .../controller/stages/ResourceCurrentState.java |  11 ++
 .../java/org/apache/helix/model/Partition.java  |   2 +
 .../java/org/apache/helix/model/Resource.java   |   4 +-
 .../TestUserDefRebalancerCompatibility.java     | 104 +++++++++++
 8 files changed, 366 insertions(+), 68 deletions(-)
----------------------------------------------------------------------