You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2019/01/28 19:54:02 UTC

[1/3] helix git commit: Copy 101tec ZkConnection into Helix code base for customization.

Repository: helix
Updated Branches:
  refs/heads/master e8125e8b9 -> 8c8f79c5e


Copy 101tec ZkConnection into Helix code base for customization.

This is preparation for addressing a potential reconnect issue in ZkConnection. This change does not alter any logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/060ef363
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/060ef363
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/060ef363

Branch: refs/heads/master
Commit: 060ef363dbe28df326b377d5844a84459cf7aae4
Parents: e8125e8
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Jan 8 15:27:21 2019 -0800
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jan 24 14:27:03 2019 -0800

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZkClient.java   |   2 +-
 .../manager/zk/client/HelixZkClientFactory.java |   2 +-
 .../helix/manager/zk/client/SharedZkClient.java |   2 +-
 .../helix/manager/zk/zookeeper/ZkClient.java    |   1 -
 .../manager/zk/zookeeper/ZkConnection.java      | 163 +++++++++++++++++++
 .../java/org/apache/helix/ZkTestHelper.java     |   2 +-
 .../org/apache/helix/common/ZkTestBase.java     |   2 +-
 .../helix/manager/zk/TestRawZkClient.java       |   2 +-
 .../manager/zk/client/TestHelixZkClient.java    |   2 +-
 9 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 89676db..55c7048 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -20,11 +20,11 @@ package org.apache.helix.manager.zk;
  */
 
 import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
index 9d10cd3..e6dc90e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
@@ -1,8 +1,8 @@
 package org.apache.helix.manager.zk.client;
 
 import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 
 /**
  * Abstract class of the ZkClient factory.

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
index 242dea0..5c6ade0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
@@ -3,8 +3,8 @@ package org.apache.helix.manager.zk.client;
 import java.util.List;
 
 import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 132d9e3..5fa2f91 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -28,7 +28,6 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkConnection;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkLock;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkException;

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
new file mode 100644
index 0000000..b4a78bb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2010 the original author or authors.
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.helix.manager.zk.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkConnection implements IZkConnection {
+
+  private static final Logger LOG = Logger.getLogger(ZkConnection.class);
+
+  /** It is recommended to use quite large sessions timeouts for ZooKeeper. */
+  private static final int DEFAULT_SESSION_TIMEOUT = 30000;
+
+  private ZooKeeper _zk = null;
+  private Lock _zookeeperLock = new ReentrantLock();
+
+  private final String _servers;
+  private final int _sessionTimeOut;
+
+  public ZkConnection(String zkServers) {
+    this(zkServers, DEFAULT_SESSION_TIMEOUT);
+  }
+
+  public ZkConnection(String zkServers, int sessionTimeOut) {
+    _servers = zkServers;
+    _sessionTimeOut = sessionTimeOut;
+  }
+
+  @Override
+  public void connect(Watcher watcher) {
+    _zookeeperLock.lock();
+    try {
+      if (_zk != null) {
+        throw new IllegalStateException("zk client has already been started");
+      }
+      try {
+        LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
+        _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
+      } catch (IOException e) {
+        throw new ZkException("Unable to connect to " + _servers, e);
+      }
+    } finally {
+      _zookeeperLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws InterruptedException {
+    _zookeeperLock.lock();
+    try {
+      if (_zk != null) {
+        LOG.debug("Closing ZooKeeper connected to " + _servers);
+        _zk.close();
+        _zk = null;
+      }
+    } finally {
+      _zookeeperLock.unlock();
+    }
+  }
+
+  @Override
+  public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
+    return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+  }
+
+  @Override
+  public String create(String path, byte[] data, List<ACL> acl, CreateMode mode) throws KeeperException, InterruptedException {
+    return _zk.create(path, data, acl, mode);
+  }
+
+  @Override
+  public void delete(String path) throws InterruptedException, KeeperException {
+    _zk.delete(path, -1);
+  }
+
+  @Override
+  public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException {
+    return _zk.exists(path, watch) != null;
+  }
+
+  @Override
+  public List<String> getChildren(final String path, final boolean watch) throws KeeperException, InterruptedException {
+    return _zk.getChildren(path, watch);
+  }
+
+  @Override
+  public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException {
+    return _zk.getData(path, watch, stat);
+  }
+
+  public void writeData(String path, byte[] data) throws KeeperException, InterruptedException {
+    writeData(path, data, -1);
+  }
+
+  @Override
+  public void writeData(String path, byte[] data, int version) throws KeeperException, InterruptedException {
+    _zk.setData(path, data, version);
+  }
+
+  @Override
+  public Stat writeDataReturnStat(String path, byte[] data, int version) throws KeeperException, InterruptedException {
+    return _zk.setData(path, data, version);
+  }
+
+  @Override
+  public States getZookeeperState() {
+    return _zk != null ? _zk.getState() : null;
+  }
+
+  public ZooKeeper getZookeeper() {
+    return _zk;
+  }
+
+  @Override
+  public long getCreateTime(String path) throws KeeperException, InterruptedException {
+    Stat stat = _zk.exists(path, false);
+    if (stat != null) {
+      return stat.getCtime();
+    }
+    return -1;
+  }
+
+  @Override
+  public String getServers() {
+    return _servers;
+  }
+
+  @Override
+  public List<OpResult> multi(Iterable<Op> ops) throws KeeperException, InterruptedException {
+    return _zk.multi(ops);
+  }
+
+  @Override
+  public void addAuthInfo(String scheme, byte[] auth) {
+    _zk.addAuthInfo(scheme, auth);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 19cd2e8..f701ae4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -40,12 +40,12 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index b0c44e1..700d204 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.logging.Level;
 
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
@@ -59,6 +58,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 7b232fe..80b2eab 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -31,12 +31,12 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;

http://git-wip-us.apache.org/repos/asf/helix/blob/060ef363/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
index 67e2731..aedea29 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -22,10 +22,10 @@ package org.apache.helix.manager.zk.client;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 


[2/3] helix git commit: Add ZkConnection.reconnect to avoid NPE when reset ZkConnection.

Posted by jx...@apache.org.
Add ZkConnection.reconnect to avoid NPE when reset ZkConnection.

In the old version, reconnect was done by closing the connection and then connecting again. In between, the ZooKeeper reference is null. This may cause an NPE, which terminates the ZkClient operation retry earlier than expected.
This change copies the existing ZkConnection and adds a reconnect method. The new method reconnects without ever leaving the field null.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/867aa5f4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/867aa5f4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/867aa5f4

Branch: refs/heads/master
Commit: 867aa5f4593ea303a8cd15475550ef86ace61d1d
Parents: 060ef36
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Jan 8 15:27:21 2019 -0800
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jan 24 14:30:51 2019 -0800

----------------------------------------------------------------------
 .../helix/manager/zk/zookeeper/ZkClient.java    |  5 ++---
 .../manager/zk/zookeeper/ZkConnection.java      | 20 +++++++++++++++++++-
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/867aa5f4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 5fa2f91..03de880 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -823,9 +823,8 @@ public class ZkClient implements Watcher {
   private void reconnect() {
     getEventLock().lock();
     try {
-      IZkConnection connection = getConnection();
-      connection.close();
-      connection.connect(this);
+      ZkConnection connection = ((ZkConnection) getConnection());
+      connection.reconnect(this);
     } catch (InterruptedException e) {
       throw new ZkInterruptedException(e);
     } finally {

http://git-wip-us.apache.org/repos/asf/helix/blob/867aa5f4/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
index b4a78bb..6f1e880 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkConnection.java
@@ -31,7 +31,6 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 public class ZkConnection implements IZkConnection {
-
   private static final Logger LOG = Logger.getLogger(ZkConnection.class);
 
   /** It is recommended to use quite large sessions timeouts for ZooKeeper. */
@@ -84,6 +83,25 @@ public class ZkConnection implements IZkConnection {
     }
   }
 
+  protected void reconnect(Watcher watcher) throws InterruptedException {
+    _zookeeperLock.lock();
+    try {
+      if (_zk == null) {
+        throw new IllegalStateException("zk client has not been connected or already been closed");
+      }
+      ZooKeeper prevZk = _zk;
+      try {
+        LOG.debug("Creating new ZookKeeper instance to reconnect to " + _servers + ".");
+        _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
+        prevZk.close();
+      } catch (IOException e) {
+        throw new ZkException("Unable to connect to " + _servers, e);
+      }
+    } finally {
+      _zookeeperLock.unlock();
+    }
+  }
+
   @Override
   public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException {
     return _zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);


[3/3] helix git commit: Improve the callback handler behavior regarding batch mode event handling when handler is reset.

Posted by jx...@apache.org.
Improve the callback handler behavior regarding batch mode event handling when handler is reset.

For new session handling, the callback handler should not interrupt the currently executing process, since doing so could cause a pending request to fail unexpectedly. Note that after this change, closing a callback handler will still interrupt the thread to avoid a thread leak.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c8f79c5
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c8f79c5
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c8f79c5

Branch: refs/heads/master
Commit: 8c8f79c5e545818c0e4478addea453fc42b6d003
Parents: 867aa5f
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jan 11 15:53:17 2019 -0800
Committer: jiajunwang <er...@gmail.com>
Committed: Fri Jan 25 13:04:07 2019 -0800

----------------------------------------------------------------------
 .../helix/common/DedupEventProcessor.java       |  4 ++
 .../helix/manager/zk/CallbackHandler.java       | 55 ++++++++++++++------
 .../apache/helix/manager/zk/ZKHelixManager.java | 14 ++---
 3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
index 10e3b00..c26ecd8 100644
--- a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
@@ -64,6 +64,10 @@ public abstract class DedupEventProcessor<T, E> extends Thread {
     _eventQueue.put(eventType, event);
   }
 
+  public void resetEventQueue() {
+    _eventQueue.clear();
+  }
+
   public void shutdown() {
     this.interrupt();
     _eventQueue.clear();

http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 9e9d1a7..713d214 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
@@ -37,7 +38,13 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.NotificationContext.Type;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
@@ -47,15 +54,9 @@ import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
-import org.apache.helix.api.listeners.BatchMode;
-import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
@@ -67,9 +68,9 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.Watcher.Event.EventType;
 
 import static org.apache.helix.HelixConstants.ChangeType.*;
 
@@ -320,7 +321,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
             "CallbackHandler is not ready, ignore change callback from path: "
                 + _path + ", for listener: " + _listener);
       } else {
-        _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
+        synchronized (this) {
+          if (_batchCallbackProcessor != null) {
+            _batchCallbackProcessor.queueEvent(changeContext.getType(), changeContext);
+          } else {
+            throw new HelixException("Failed to process callback in batch mode. Batch Callback Processor does not exist.");
+          }
+        }
       }
     } else {
       invoke(changeContext);
@@ -573,11 +580,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     logger.info("initializing CallbackHandler: " + this.toString() + " content: " + getContent());
 
     if (_batchModeEnabled) {
-      if (_batchCallbackProcessor != null) {
-        _batchCallbackProcessor.shutdown();
+      synchronized (this) {
+        if (_batchCallbackProcessor != null) {
+          _batchCallbackProcessor.resetEventQueue();
+        } else {
+          _batchCallbackProcessor = new CallbackProcessor(this);
+          _batchCallbackProcessor.start();
+        }
       }
-      _batchCallbackProcessor = new CallbackProcessor(this);
-      _batchCallbackProcessor.start();
     }
 
     updateNotificationTime(System.nanoTime());
@@ -674,12 +684,25 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   /**
    * Invoke the listener for the last time so that the listener could clean up resources
    */
+  @Deprecated
   public void reset() {
-    logger.info("Resetting CallbackHandler: " + this.toString());
+    reset(true);
+  }
+
+  void reset(boolean isShutdown) {
+    logger.info("Resetting CallbackHandler: {}. Is resetting for shutdown: {}.", this.toString(),
+        isShutdown);
     try {
       _ready = false;
-      if (_batchCallbackProcessor != null) {
-        _batchCallbackProcessor.shutdown();
+      synchronized (this) {
+        if (_batchCallbackProcessor != null) {
+          if (isShutdown) {
+            _batchCallbackProcessor.shutdown();
+            _batchCallbackProcessor = null;
+          } else {
+            _batchCallbackProcessor.resetEventQueue();
+          }
+        }
       }
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.FINALIZE);

http://git-wip-us.apache.org/repos/asf/helix/blob/8c8f79c5/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index ba3c16f..aa08c76 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -298,7 +298,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
       // handler.reset() may modify the handlers list, so do it outside the iteration
       for (CallbackHandler handler : toRemove) {
-        handler.reset();
+        handler.reset(true);
       }
     }
 
@@ -717,10 +717,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       // TODO reset user defined handlers only
       // TODO Fix the issue that when connection disconnected, reset handlers will be blocked. -- JJ
       // This is because reset logic contains ZK operations.
-      resetHandlers();
+      resetHandlers(true);
 
       if (_leaderElectionHandler != null) {
-        _leaderElectionHandler.reset();
+        _leaderElectionHandler.reset(true);
       }
 
     } finally {
@@ -947,7 +947,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
   }
 
-  void resetHandlers() {
+  void resetHandlers(boolean isShutdown) {
     synchronized (this) {
       if (_handlers != null) {
         // get a copy of the list and iterate over the copy list
@@ -956,7 +956,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         tmpHandlers.addAll(_handlers);
 
         for (CallbackHandler handler : tmpHandlers) {
-          handler.reset();
+          handler.reset(isShutdown);
           LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
         }
       }
@@ -1054,9 +1054,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
      */
     stopTimerTasks();
     if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
+      _leaderElectionHandler.reset(false);
     }
-    resetHandlers();
+    resetHandlers(false);
 
     /**
      * clean up write-through cache