You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/07/12 19:04:18 UTC

git commit: code change for handling zk session expiry

Updated Branches:
  refs/heads/master bf7a6583e -> 6bb6e2c01


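Code change for handling ZooKeeper (zk) session expiry: HelixTimerTask no longer extends java.util.TimerTask, and the health-check tasks now schedule a fresh inner TimerTask on a new Timer each time start() is called after stop().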


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/6bb6e2c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/6bb6e2c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/6bb6e2c0

Branch: refs/heads/master
Commit: 6bb6e2c010055c9df2e1945cea428cfe660d7a24
Parents: bf7a658
Author: zzhang <zz...@uci.edu>
Authored: Fri Jul 12 10:04:14 2013 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Fri Jul 12 10:04:14 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixTimerTask.java   |   4 +-
 .../healthcheck/HealthStatsAggregationTask.java | 168 +++-------------
 .../healthcheck/HealthStatsAggregator.java      | 165 ++++++++++++++++
 .../ParticipantHealthReportCollectorImpl.java   | 131 +++----------
 .../ParticipantHealthReportTask.java            |  72 +++++++
 .../helix/manager/zk/AbstractManager.java       |   4 +-
 .../helix/manager/zk/ControllerManager.java     |  14 +-
 .../zk/DistributedControllerManager.java        |   6 +-
 .../helix/manager/zk/ParticipantManager.java    |   3 +-
 .../apache/helix/manager/zk/ZKHelixManager.java |  10 +-
 .../handling/HelixStateTransitionHandler.java   | 144 +++++++-------
 .../messaging/handling/HelixTaskExecutor.java   |  57 ++++--
 ...estParticipantHealthReportCollectorImpl.java |  11 +-
 .../helix/healthcheck/TestAddDropAlert.java     |   4 +-
 .../healthcheck/TestAlertActionTriggering.java  |   8 +-
 .../helix/healthcheck/TestAlertFireHistory.java |  20 +-
 .../helix/healthcheck/TestExpandAlert.java      |   2 +-
 .../helix/healthcheck/TestSimpleAlert.java      |   2 +-
 .../healthcheck/TestSimpleWildcardAlert.java    |   4 +-
 .../helix/healthcheck/TestStalenessAlert.java   |   2 +-
 .../helix/healthcheck/TestWildcardAlert.java    |   2 +-
 .../TestSessionExpiryInTransition.java          |   5 +-
 .../manager/MockParticipantManager.java         | 139 ++++++++++++++
 .../manager/TestAbstractManager.java            |  43 +++++
 .../manager/TestParticipantManager.java         | 191 +++++++++++++++++++
 25 files changed, 834 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java b/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
index 203bb54..0b7edb9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
@@ -19,9 +19,7 @@ package org.apache.helix;
  * under the License.
  */
 
-import java.util.TimerTask;
-
-public abstract class HelixTimerTask extends TimerTask
+public abstract class HelixTimerTask
 {
   /**
    * Timer task starts

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
index 4b5fe92..f0dd134 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
@@ -19,189 +19,83 @@ package org.apache.helix.healthcheck;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.Timer;
+import java.util.TimerTask;
 
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
 import org.apache.helix.HelixTimerTask;
-import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ReadHealthDataStage;
-import org.apache.helix.controller.stages.StatsAggregationStage;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
 import org.apache.log4j.Logger;
 
 
 public class HealthStatsAggregationTask extends HelixTimerTask
-{
+{  
   private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
-
   public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
-
+  
+  final HealthStatsAggregator _healthStatsAggregator;
+  
+  class HealthStatsAggregationTaskTimer extends TimerTask {
+
+    @Override
+    public void run() {
+      _healthStatsAggregator.aggregate();
+    }
+    
+  }
+  
   private Timer _timer;
-  private final HelixManager _manager;
-  private final Pipeline _healthStatsAggregationPipeline;
   private final int _delay;
   private final int _period;
-  private final ClusterAlertMBeanCollection _alertItemCollection;
-  private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
-      new HashMap<String, HelixStageLatencyMonitor>();
-
-  public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
+  
+  public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator, int delay, int period)
   {
-    _manager = manager;
+    _healthStatsAggregator = healthStatsAggregator;
+    
     _delay = delay;
     _period = period;
-
-    // health stats pipeline
-    _healthStatsAggregationPipeline = new Pipeline();
-    _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
-    StatsAggregationStage statAggregationStage = new StatsAggregationStage();
-    _healthStatsAggregationPipeline.addStage(statAggregationStage);
-    _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
-
-    registerStageLatencyMonitor(_healthStatsAggregationPipeline);
-  }
-
-  public HealthStatsAggregationTask(HelixManager manager)
-  {
-    this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
   }
 
-  private void registerStageLatencyMonitor(Pipeline pipeline)
+  public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator)
   {
-    for (Stage stage : pipeline.getStages())
-    {
-      String stgName = stage.getStageName();
-      if (!_stageLatencyMonitorMap.containsKey(stgName))
-      {
-        try
-        {
-          _stageLatencyMonitorMap.put(stage.getStageName(),
-                                      new HelixStageLatencyMonitor(_manager.getClusterName(),
-                                                                   stgName));
-        }
-        catch (Exception e)
-        {
-          LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
-        }
-      }
-      else
-      {
-        LOG.error("StageLatencyMonitor for stage: " + stgName
-            + " already exists. Skip register it");
-      }
-    }
+    this(healthStatsAggregator, 
+        DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
   }
 
   @Override
   public void start()
   {
-    LOG.info("START HealthAggregationTask");
 
     if (_timer == null)
     {
+      LOG.info("START HealthStatsAggregationTimerTask");
+
       // Remove all the previous health check values, if any
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
-      for(String healthReportName : existingHealthRecordNames)
-      {
-        LOG.info("Removing old healthrecord " + healthReportName);
-        accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
-      }
+      _healthStatsAggregator.init();
       
-      _timer = new Timer(true);
-      _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
+      _timer = new Timer("HealthStatsAggregationTimerTask", true);
+      _timer.scheduleAtFixedRate(new HealthStatsAggregationTaskTimer(), 
+          new Random().nextInt(_delay), _period);
     }
     else
     {
-      LOG.warn("timer already started");
+      LOG.warn("HealthStatsAggregationTimerTask already started");
     }
   }
 
   @Override
   public synchronized void stop()
   {
-    LOG.info("Stop HealthAggregationTask");
-
     if (_timer != null)
     {
+      LOG.info("Stop HealthStatsAggregationTimerTask");
       _timer.cancel();
+      _healthStatsAggregator.reset();
       _timer = null;
-      _alertItemCollection.reset();
-
-      for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
-      {
-        stgLatencyMonitor.reset();
-      }
-    }
-    else
-    {
-      LOG.warn("timer already stopped");
-    }
-  }
-
-  @Override
-  public synchronized void run()
-  {
-    if (!isEnabled())
-    {
-      LOG.info("HealthAggregationTask is disabled.");
-      return;
-    }
-    
-    if (!_manager.isLeader())
-    {
-      LOG.error("Cluster manager: " + _manager.getInstanceName()
-          + " is not leader. Pipeline will not be invoked");
-      return;
-    }
-
-    try
-    {
-      ClusterEvent event = new ClusterEvent("healthChange");
-      event.addAttribute("helixmanager", _manager);
-      event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
-
-      _healthStatsAggregationPipeline.handle(event);
-      _healthStatsAggregationPipeline.finish();
-    }
-    catch (Exception e)
-    {
-      LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
-                e);
-    }
-  }
-
-  private boolean isEnabled()
-  {
-    ConfigAccessor configAccessor = _manager.getConfigAccessor();
-    boolean enabled = true;
-    if (configAccessor != null)
-    {
-      // zk-based cluster manager
-      ConfigScope scope =
-          new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
-      String isEnabled = configAccessor.get(scope, "healthChange.enabled");
-      if (isEnabled != null)
-      {
-        enabled = new Boolean(isEnabled);
-      }
     }
     else
     {
-      LOG.debug("File-based cluster manager doesn't support disable healthChange");
+      LOG.warn("HealthStatsAggregationTimerTask already stopped");
     }
-    return enabled;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
new file mode 100644
index 0000000..c60e3bb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
@@ -0,0 +1,165 @@
+package org.apache.helix.healthcheck;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadHealthDataStage;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+import org.apache.log4j.Logger;
+
+public class HealthStatsAggregator {
+  private static final Logger LOG = Logger.getLogger(HealthStatsAggregator.class);
+
+  public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
+
+  private final HelixManager _manager;
+  private final Pipeline _healthStatsAggregationPipeline;
+  private final ClusterAlertMBeanCollection _alertItemCollection;
+  private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
+      new HashMap<String, HelixStageLatencyMonitor>();
+
+  public HealthStatsAggregator(HelixManager manager)
+  {
+    _manager = manager;
+
+    // health stats pipeline
+    _healthStatsAggregationPipeline = new Pipeline();
+    _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+    StatsAggregationStage statAggregationStage = new StatsAggregationStage();
+    _healthStatsAggregationPipeline.addStage(statAggregationStage);
+    _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
+
+    registerStageLatencyMonitor(_healthStatsAggregationPipeline);
+  }
+
+  private void registerStageLatencyMonitor(Pipeline pipeline)
+  {
+    for (Stage stage : pipeline.getStages())
+    {
+      String stgName = stage.getStageName();
+      if (!_stageLatencyMonitorMap.containsKey(stgName))
+      {
+        try
+        {
+          _stageLatencyMonitorMap.put(stage.getStageName(),
+                                      new HelixStageLatencyMonitor(_manager.getClusterName(),
+                                                                   stgName));
+        }
+        catch (Exception e)
+        {
+          LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
+        }
+      }
+      else
+      {
+        LOG.error("StageLatencyMonitor for stage: " + stgName
+            + " already exists. Skip register it");
+      }
+    }
+  }
+
+  public synchronized void aggregate()
+  {
+    if (!isEnabled())
+    {
+      LOG.info("HealthAggregationTask is disabled.");
+      return;
+    }
+    
+    if (!_manager.isLeader())
+    {
+      LOG.error("Cluster manager: " + _manager.getInstanceName()
+          + " is not leader. Pipeline will not be invoked");
+      return;
+    }
+
+    try
+    {
+      ClusterEvent event = new ClusterEvent("healthChange");
+      event.addAttribute("helixmanager", _manager);
+      event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
+
+      _healthStatsAggregationPipeline.handle(event);
+      _healthStatsAggregationPipeline.finish();
+    }
+    catch (Exception e)
+    {
+      LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
+                e);
+    }
+  }
+
+  private boolean isEnabled()
+  {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    boolean enabled = true;
+    if (configAccessor != null)
+    {
+      // zk-based cluster manager
+      ConfigScope scope =
+          new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+      String isEnabled = configAccessor.get(scope, "healthChange.enabled");
+      if (isEnabled != null)
+      {
+        enabled = new Boolean(isEnabled);
+      }
+    }
+    else
+    {
+      LOG.debug("File-based cluster manager doesn't support disable healthChange");
+    }
+    return enabled;
+  }
+  
+  public void init() {
+    // Remove all the previous health check values, if any
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
+    for(String healthReportName : existingHealthRecordNames)
+    {
+      LOG.info("Removing old healthrecord " + healthReportName);
+      accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
+    }
+
+  }
+  
+  public void reset() {
+    _alertItemCollection.reset();
+
+    for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
+    {
+      stgLatencyMonitor.reset();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
index 14c12de..1d33260 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
@@ -21,170 +21,95 @@ package org.apache.helix.healthcheck;
 
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixTimerTask;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.model.HealthStat;
 import org.apache.log4j.Logger;
 
-
-public class ParticipantHealthReportCollectorImpl extends HelixTimerTask implements
-    ParticipantHealthReportCollector
-{
+public class ParticipantHealthReportCollectorImpl implements
+    ParticipantHealthReportCollector {
   private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
-  private Timer _timer;
   private static final Logger _logger = Logger
       .getLogger(ParticipantHealthReportCollectorImpl.class);
   private final HelixManager _helixManager;
   String _instanceName;
-  public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
 
-  public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
-      String instanceName)
-  {
+  public ParticipantHealthReportCollectorImpl(HelixManager helixManager, String instanceName) {
     _helixManager = helixManager;
     _instanceName = instanceName;
     addDefaultHealthCheckInfoProvider();
   }
 
-  private void addDefaultHealthCheckInfoProvider()
-  {
+  private void addDefaultHealthCheckInfoProvider() {
     addHealthReportProvider(new DefaultHealthReportProvider());
   }
 
   @Override
-  public void start()
-  {
-    if (_timer == null)
-    {
-      _timer = new Timer(true);
-      _timer.scheduleAtFixedRate(this,
-          new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
-    }
-    else
-    {
-      _logger.warn("timer already started");
-    }
-  }
-
-  @Override
-  public void addHealthReportProvider(HealthReportProvider provider)
-  {
-    try
-    {
-      synchronized (_healthReportProviderList)
-      {
-        if (!_healthReportProviderList.contains(provider))
-        {
+  public void addHealthReportProvider(HealthReportProvider provider) {
+    try {
+      synchronized (_healthReportProviderList) {
+        if (!_healthReportProviderList.contains(provider)) {
           _healthReportProviderList.add(provider);
-        }
-        else
-        {
+        } else {
           _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
         }
       }
-    }
-    catch (Exception e)
-    {
+    } catch (Exception e) {
       _logger.error(e);
     }
   }
 
   @Override
-  public void removeHealthReportProvider(HealthReportProvider provider)
-  {
-    synchronized (_healthReportProviderList)
-    {
-      if (_healthReportProviderList.contains(provider))
-      {
+  public void removeHealthReportProvider(HealthReportProvider provider) {
+    synchronized (_healthReportProviderList) {
+      if (_healthReportProviderList.contains(provider)) {
         _healthReportProviderList.remove(provider);
-      }
-      else
-      {
+      } else {
         _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
       }
     }
   }
 
   @Override
-  public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
-  {
+  public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate) {
     HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
-//    accessor.setProperty(
-//        PropertyType.HEALTHREPORT, healthCheckInfoUpdate, _instanceName,
-//        healthCheckInfoUpdate.getId());
-    accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()), 
-                         new HealthStat(healthCheckInfoUpdate));
+    accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
+        new HealthStat(healthCheckInfoUpdate));
 
   }
 
   @Override
-  public void stop()
-  {
-    _logger.info("Stop HealthCheckInfoReportingTask");
-    if (_timer != null)
-    {
-      _timer.cancel();
-      _timer = null;
-    }
-    else
-    {
-      _logger.warn("timer already stopped");
-    }
-  }
-
-  @Override
-  public synchronized void transmitHealthReports()
-  {
-    synchronized (_healthReportProviderList)
-    {
-      for (HealthReportProvider provider : _healthReportProviderList)
-      {
-        try
-        {
+  public synchronized void transmitHealthReports() {
+    synchronized (_healthReportProviderList) {
+      for (HealthReportProvider provider : _healthReportProviderList) {
+        try {
           Map<String, String> report = provider.getRecentHealthReport();
           Map<String, Map<String, String>> partitionReport = provider
               .getRecentPartitionHealthReport();
           ZNRecord record = new ZNRecord(provider.getReportName());
-          if (report != null)
-          {
+          if (report != null) {
             record.setSimpleFields(report);
           }
-          if (partitionReport != null)
-          {
+          if (partitionReport != null) {
             record.setMapFields(partitionReport);
           }
           record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
-          
+
           HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
           Builder keyBuilder = accessor.keyBuilder();
-          accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()), 
-                               new HealthStat(record));
+          accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
+              new HealthStat(record));
 
-//          _helixManager.getDataAccessor().setProperty(
-//              PropertyType.HEALTHREPORT, record, _instanceName, record.getId());
-          // reset stats (for now just the partition stats)
           provider.resetStats();
-        }
-        catch (Exception e)
-        {
-          _logger.error("", e);
+        } catch (Exception e) {
+          _logger.error("fail to transmit health report", e);
         }
       }
     }
   }
-
-  @Override
-  public void run()
-  {
-    transmitHealthReports();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
new file mode 100644
index 0000000..4e7cbe4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
@@ -0,0 +1,72 @@
+package org.apache.helix.healthcheck;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixTimerTask;
+import org.apache.log4j.Logger;
+
+public class ParticipantHealthReportTask extends HelixTimerTask {
+  private static final Logger LOG = Logger.getLogger(ParticipantHealthReportTask.class);
+  public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
+
+  Timer _timer;
+  final ParticipantHealthReportCollectorImpl _healthReportCollector;
+
+  class ParticipantHealthReportTimerTask extends TimerTask {
+
+    @Override
+    public void run() {
+      _healthReportCollector.transmitHealthReports();      
+    }
+  }
+
+  public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector) {
+    _healthReportCollector = healthReportCollector;
+  }
+
+  @Override
+  public void start() {
+    if (_timer == null) {
+      LOG.info("Start HealthCheckInfoReportingTask");
+      _timer = new Timer("ParticipantHealthReportTimerTask", true);
+      _timer.scheduleAtFixedRate(new ParticipantHealthReportTimerTask(),
+          new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
+    } else {
+      LOG.warn("ParticipantHealthReportTimerTask already started");
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (_timer != null) {
+      LOG.info("Stop ParticipantHealthReportTimerTask");
+      _timer.cancel();
+      _timer = null;
+    } else {
+      LOG.warn("ParticipantHealthReportTimerTask already stopped");
+    }
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
index 165f639..a68e1de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
@@ -75,7 +75,7 @@ public abstract class AbstractManager implements HelixManager, IZkStateListener
   final InstanceType _instanceType;
   final int _sessionTimeout;
   final List<PreConnectCallback> _preConnectCallbacks;
-  final List<CallbackHandler> _handlers;
+  protected final List<CallbackHandler> _handlers;
   final HelixManagerProperties _properties;
   
   /**
@@ -83,7 +83,7 @@ public abstract class AbstractManager implements HelixManager, IZkStateListener
    */
   final String _version;
 
-  ZkClient _zkclient = null;
+  protected ZkClient _zkclient = null;
   final DefaultMessagingService _messagingService;
 
   BaseDataAccessor<ZNRecord> _baseDataAccessor;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
index 43fa149..4b027a3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
@@ -39,6 +39,7 @@ import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
@@ -78,7 +79,8 @@ public class ControllerManager extends AbstractManager {
 
       if (_timer == null)
       {
-        _timer = new Timer(true);
+        LOG.info("Start StatusDumpTask");
+        _timer = new Timer("StatusDumpTimerTask", true);
         _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
                                                           zkclient,
                                                           timeThresholdNoChange),
@@ -92,23 +94,17 @@ public class ControllerManager extends AbstractManager {
     public void stop() {
       if (_timer != null)
       {
+        LOG.info("Stop StatusDumpTask");
         _timer.cancel();
         _timer = null;
       }      
     }
-
-    @Override
-    public void run() {
-      // TODO Auto-generated method stub
-      
-    }
-    
   }
   
   public ControllerManager(String zkAddress, String clusterName, String instanceName) {
     super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
     
-    _timerTasks.add(new HealthStatsAggregationTask(this));
+    _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
     _timerTasks.add(new StatusDumpTask(_zkclient, this));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
index 92b4f66..e9f07d7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
@@ -29,8 +29,10 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.apache.helix.manager.zk.ControllerManager.StatusDumpTask;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.LiveInstance;
@@ -63,9 +65,9 @@ public class DistributedControllerManager extends AbstractManager {
     _stateMachineEngine = new HelixStateMachineEngine(this);
     _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
 
-    _timerTasks.add(_participantHealthInfoCollector);
+    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
     
-    _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+    _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
     _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index fc54f08..143b02e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -56,6 +56,7 @@ import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
@@ -92,7 +93,7 @@ public class ParticipantManager extends AbstractManager {
     _stateMachineEngine = new HelixStateMachineEngine(this);
     _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
     
-    _timerTasks.add(_participantHealthInfoCollector);
+    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index eec38aa..21a5bfb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -61,8 +61,10 @@ import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.ConfigScope;
@@ -106,6 +108,7 @@ public class ZKHelixManager implements HelixManager
   private Timer                                _timer;
   private CallbackHandler                      _leaderElectionHandler;
   private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
+  private ParticipantHealthReportTask _participantHealthReportTask;
   private final DefaultMessagingService        _messagingService;
   private ZKHelixAdmin                         _managementTool;
   private final String                         _version;
@@ -213,7 +216,7 @@ public class ZKHelixManager implements HelixManager
     _controllerTimerTasks = new ArrayList<HelixTimerTask>();
     if (_instanceType == InstanceType.CONTROLLER)
     {
-      _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+      _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
     }
   }
 
@@ -460,7 +463,7 @@ public class ZKHelixManager implements HelixManager
 
     if (_participantHealthCheckInfoCollector != null)
     {
-      _participantHealthCheckInfoCollector.stop();
+      _participantHealthReportTask.stop();
     }
 
     if (_timer != null)
@@ -834,7 +837,8 @@ public class ZKHelixManager implements HelixManager
     {
       _participantHealthCheckInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
-      _participantHealthCheckInfoCollector.start();
+      _participantHealthReportTask = new ParticipantHealthReportTask(_participantHealthCheckInfoCollector);
+      _participantHealthReportTask.start();
     }
     // start the participant health check timer, also create zk path for health
     // check info

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index fc09070..2baa1cf 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -125,10 +125,8 @@ public class HelixStateTransitionHandler extends MessageHandler
 
   void postHandleMessage()
   {
-	// Message message = _message;
-	// HelixManager manager = _notificationContext.getManager();
-	HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
-	Exception exception = taskResult.getException();
+  	HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
+  	Exception exception = taskResult.getException();
 		
     String partitionKey = _message.getPartitionName();
     String resource = _message.getResourceName();
@@ -141,90 +139,88 @@ public class HelixStateTransitionHandler extends MessageHandler
     int bucketSize = _message.getBucketSize();
     ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
 
-    // Lock the helix manager so that the session id will not change when we update
-    // the state model state. for zk current state it is OK as we have the per-session
-    // current state node
-    synchronized (_manager)
+    // No need to sync on manager: the executor of the expired session is cancelled before the executor of the new session is started.
+    // The sessionId might still change while we update the state model state;
+    // for the zk current state that is OK, as we have a per-session current state node.
+    if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
     {
-      if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
-      {
-        logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
-            + _message.getExecutionSessionId() + " , new session : "
-            + _manager.getSessionId());
-        return;
-      }
+      logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+          + _message.getExecutionSessionId() + " , new session : "
+          + _manager.getSessionId());
+      return;
+    }
 
-      if (taskResult.isSuccess())
-      {
-        // String fromState = message.getFromState();
-        String toState = _message.getToState();
-        _currentStateDelta.setState(partitionKey, toState);
+    if (taskResult.isSuccess())
+    {
+      // String fromState = message.getFromState();
+      String toState = _message.getToState();
+      _currentStateDelta.setState(partitionKey, toState);
 
-        if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
-        {
-          // for "OnOfflineToDROPPED" message, we need to remove the resource key record
-          // from the current state of the instance because the resource key is dropped.
-          // In the state model it will be stayed as "OFFLINE", which is OK.
-          ZNRecordDelta delta =
-              new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
-          // Don't subtract simple fields since they contain stateModelDefRef
-          delta._record.getSimpleFields().clear();
+      if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
+      {
+        // for "OnOfflineToDROPPED" message, we need to remove the resource key record
+        // from the current state of the instance because the resource key is dropped.
+        // In the state model it will be stayed as "OFFLINE", which is OK.
+        ZNRecordDelta delta =
+            new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
+        // Don't subtract simple fields since they contain stateModelDefRef
+        delta._record.getSimpleFields().clear();
 
-          List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
-          deltaList.add(delta);
-          _currentStateDelta.setDeltaList(deltaList);
-        }
-        else
-        {
-          // if the partition is not to be dropped, update _stateModel to the TO_STATE
-          _stateModel.updateState(toState);
-        }
+        List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+        deltaList.add(delta);
+        _currentStateDelta.setDeltaList(deltaList);
       }
       else
       {
-        if (exception instanceof HelixStateMismatchException)
-        {
-          // if fromState mismatch, set current state on zk to stateModel's current state
-          logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
-              + partitionKey
-              + ", currentState: "
-              + _stateModel.getCurrentState()
-              + ", message: " + _message);
-          _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
-        }
-        else
+        // if the partition is not to be dropped, update _stateModel to the TO_STATE
+        _stateModel.updateState(toState);
+      }
+    }
+    else
+    {
+      if (exception instanceof HelixStateMismatchException)
+      {
+        // if fromState mismatch, set current state on zk to stateModel's current state
+        logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+            + partitionKey
+            + ", currentState: "
+            + _stateModel.getCurrentState()
+            + ", message: " + _message);
+        _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
+      }
+      else
+      {
+        StateTransitionError error =
+            new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
+        if (exception instanceof InterruptedException)
         {
-          StateTransitionError error =
-              new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
-          if (exception instanceof InterruptedException)
+          if (_isTimeout)
           {
-            if (_isTimeout)
-            {
-              error =
-                  new StateTransitionError(ErrorType.INTERNAL,
-                                           ErrorCode.TIMEOUT,
-                                           exception);
-            }
-            else
-            {
-              // State transition interrupted but not caused by timeout. Keep the current
-              // state in this case
-              logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
-                  + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
-              return;
-            }
+            error =
+                new StateTransitionError(ErrorType.INTERNAL,
+                                         ErrorCode.TIMEOUT,
+                                         exception);
           }
-          _stateModel.rollbackOnError(_message, _notificationContext, error);
-          _currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
-          _stateModel.updateState(HelixDefinedState.ERROR.toString());
-          
-          // if we have errors transit from ERROR state, disable the partition
-          if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
-            disablePartition();
+          else
+          {
+            // State transition interrupted but not caused by timeout. Keep the current
+            // state in this case
+            logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
+                + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
+            return;
           }
         }
+        _stateModel.rollbackOnError(_message, _notificationContext, error);
+        _currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
+        _stateModel.updateState(HelixDefinedState.ERROR.toString());
+        
+        // if we have errors transit from ERROR state, disable the partition
+        if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+          disablePartition();
+        }
       }
     }
+    
     try
     {
       // Update the ZK current state of the node

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index cb07494..cd74505 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -419,6 +419,48 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     accessor.setChildren(readMsgKeys, readMsgs);
   }
 
+  /**
+   * Removes the message-handler factory for {@code type} and shuts down its executor.
+   * @param type message type to unregister
+   * @throws HelixException if the executor is not terminated within 200ms; the handler
+   *           factory for {@code type} is then neither removed nor reset
+   */
+  void unregisterMessageHandlerFactory(String type) {
+    // shutdown executor-service. disconnect if fail
+    ExecutorService executorSvc = _executorMap.remove(type);
+    if (executorSvc != null) {
+      List<Runnable> tasksLeft = executorSvc.shutdownNow();
+      LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: " + tasksLeft);
+      try {
+        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+          LOG.error("executor-service for msgType: " + type
+              + " is not fully terminated in 200ms. will disconnect helix-participant");
+          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
+        }
+      } catch (InterruptedException e) {
+        LOG.error("interrupted when waiting for executor-service shutdown for msgType: " + type, e);
+        Thread.currentThread().interrupt(); // restore the flag so callers still see the interruption
+      }
+    }
+    // reset state-model
+    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
+    if (handlerFty != null) {
+      handlerFty.reset();
+    }
+  }
+  
+  /** Unregisters every message type that has an executor (see unregisterMessageHandlerFactory),
+   *  then clears the task map; invoked by onMessage() on a FINALIZE notification. */
+  void reset() {
+    LOG.info("Get FINALIZE notification");
+    for (String msgType : _executorMap.keySet())
+    {
+      unregisterMessageHandlerFactory(msgType);
+    }
+    // clear task-map, all tasks should be terminated by now
+    _taskMap.clear();
+  }
+  
   @Override
   public void onMessage(String instanceName,
                         List<Message> messages,
@@ -429,20 +471,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     // TODO: see if we should have a separate notification call for resetting
     if (changeContext.getType() == Type.FINALIZE)
     {
-      LOG.info("Get FINALIZE notification");
-      for (MessageHandlerFactory factory : _handlerFactoryMap.values())
-      {
-        factory.reset();
-      }
-      // Cancel all scheduled tasks
-      synchronized (_lock)
-      {
-          for (MessageTaskInfo info : _taskMap.values())
-          {
-            cancelTask(info._task);
-          }
-        _taskMap.clear();
-      }
+      reset();
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
index 6187b93..4428fa2 100644
--- a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
+++ b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
@@ -22,12 +22,14 @@ package org.apache.helix;
 import org.apache.helix.Mocks.MockHealthReportProvider;
 import org.apache.helix.Mocks.MockManager;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class TestParticipantHealthReportCollectorImpl {
 
 	protected ParticipantHealthReportCollectorImpl _providerImpl;
+	protected ParticipantHealthReportTask _providerTask;
 	protected HelixManager _manager;
 	protected MockHealthReportProvider _mockProvider;
 	
@@ -35,21 +37,22 @@ public class TestParticipantHealthReportCollectorImpl {
 	public void setup()
 	{
 		 _providerImpl = new ParticipantHealthReportCollectorImpl(new MockManager(), "instance_123");
+		 _providerTask = new ParticipantHealthReportTask(_providerImpl);
 		 _mockProvider = new MockHealthReportProvider();
 	}
 	
 	 @Test (groups = {"unitTest"})
 	  public void testStart() throws Exception
 	  {
-		 _providerImpl.start();
-		 _providerImpl.start();
+	   _providerTask.start();
+	   _providerTask.start();
 	  }
 	 
 	 @Test (groups = {"unitTest"})
 	  public void testStop() throws Exception
 	  {
-		 _providerImpl.stop();
-		 _providerImpl.stop();
+	   _providerTask.stop();
+	   _providerTask.stop();
 	  }
 	 
 	 @Test (groups = {"unitTest"})

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
index 7c82c13..26f4ac3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -174,7 +174,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase
     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
     String instance = "localhost_12918";
     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
     Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -182,7 +182,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase
     Assert.assertTrue(keySet.size() > 0);
 
     _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
     // other verifications go here
     // for (int i = 0; i < 1; i++) //change 1 back to 5
     // {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index 9c197d0..e20fe77 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -131,8 +131,8 @@ public class TestAlertActionTriggering extends
     String controllerName = CONTROLLER_PREFIX + "_0";
     HelixManager manager = _startCMResultMap.get(controllerName)._manager;
 
-    HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
-    task.run();
+    HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    task.aggregate();
     Thread.sleep(4000);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -177,7 +177,7 @@ public class TestAlertActionTriggering extends
 
     // enable the disabled instances
     setHealthData(metrics3, metrics3);
-    task.run();
+    task.aggregate();
     Thread.sleep(1000);
 
     manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
@@ -191,7 +191,7 @@ public class TestAlertActionTriggering extends
     // Test the DISABLE_PARTITION case
     int[] metrics4 = {22, 115, 22, 16,163};
     setHealthData2(metrics4);
-    task.run();
+    task.aggregate();
 
     // scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
     scope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 24607f6..0f9798f 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -108,8 +108,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     properties.put("healthChange.enabled", "false");
     _setupTool.getClusterManagementTool().setConfig(scope, properties);
     
-    HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
-    task.run();
+    HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    task.aggregate();
     Thread.sleep(100);
     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
     Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -121,7 +121,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     properties.put("healthChange.enabled", "true");
     _setupTool.getClusterManagementTool().setConfig(scope, properties);
     
-    task.run();
+    task.aggregate();
     Thread.sleep(100);
     
     history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
@@ -155,8 +155,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
       historySize = property.getRecord().getMapFields().size();
     }
     
-    HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
-    task.run();
+    HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+    task.aggregate();
     Thread.sleep(100);
     
     
@@ -173,7 +173,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
     
     setHealthData(metrics1, metrics2);
-    task.run();
+    task.aggregate();
     Thread.sleep(100);
     history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
     // no change
@@ -190,7 +190,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     int [] metrics3 = {21, 44, 22, 14, 16};
     int [] metrics4 = {122, 115, 222, 41,16};
     setHealthData(metrics3, metrics4);
-    task.run();
+    task.aggregate();
     Thread.sleep(100);
     history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
     // new delta should be recorded
@@ -209,7 +209,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
     int [] metrics5 = {0, 0, 0, 0, 0};
     int [] metrics6 = {0, 0, 0, 0,0};
     setHealthData(metrics5, metrics6);
-    task.run();
+    task.aggregate();
     
     for (int i = 0; i < 10; i++) 
     {
@@ -245,7 +245,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
       int[] metricsy = {99 + 3*x, 99 + 3*y, 98 + 4*x, 98+4*y, 97+5*y};
       
       setHealthData(metricsx, metricsy);
-      task.run();
+      task.aggregate();
       Thread.sleep(100);
       history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
       
@@ -304,7 +304,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
       int[] metricsy = {99 + 3*x, 99 + 3*y, 98 + 4*x, 98+4*y, 97+5*y};
       
       setHealthData(metricsx, metricsy);
-      task.run();
+      task.aggregate();
       for (int j = 0; j < 10; j++) {
           Thread.sleep(100);
           history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index 5151898..5a973cb 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -172,7 +172,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
   //sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index b681ecc..635d529 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -177,7 +177,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
     //sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index 177948f..3a50e5d 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -179,7 +179,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase
     Thread.sleep(1000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
     //sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(1000);
 
@@ -221,7 +221,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase
     _setupTool.getClusterManagementTool().dropAlert(clusterName, alertwildcard);
     alertwildcard = "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
     _setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
     Thread.sleep(1000);
     
     record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 1bdb038..d1686cb 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -170,7 +170,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase
 
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
   //sleep for a few seconds to give stats stage time to trigger
     Thread.sleep(3000);
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index 20f0c13..e8b1719 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -261,7 +261,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase
     Thread.sleep(3000);
     // HealthAggregationTask is supposed to run by a timer every 30s
     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
-    new HealthStatsAggregationTask(cmResult._manager).run();
+    new HealthStatsAggregator(cmResult._manager).aggregate();
 
     //sleep for a few seconds to give stats stage time to trigger and for bean to trigger
     Thread.sleep(3000);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index e70c3ca..a93e073 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -36,6 +36,7 @@ import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
@@ -69,9 +70,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
     }
   }
  
-  // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
-  // when there is pending messages
-  // @Test
+  @Test
   public void testSessionExpiryInTransition() throws Exception
   {
     Logger.getRootLogger().setLevel(Level.WARN);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
new file mode 100644
index 0000000..e171539
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -0,0 +1,186 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockSchemataModelFactory;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Test-only {@link ParticipantManager} that can run itself on a dedicated thread.
+ * <p>
+ * {@link #syncStart()} starts the thread and blocks until {@link #run()} has attempted to
+ * connect; {@link #syncStop()} tells that thread to disconnect and blocks until it has tried.
+ */
+public class MockParticipantManager extends ParticipantManager implements Runnable
+{
+  private static final Logger     LOG = Logger.getLogger(MockParticipantManager.class);
+  private final String            _instanceName;
+
+  // released by run() once connect() has returned or failed
+  private final CountDownLatch    _startCountDown          = new CountDownLatch(1);
+  // released by syncStop() to let run() proceed to disconnect()
+  private final CountDownLatch    _stopCountDown           = new CountDownLatch(1);
+  // released by run() after its disconnect() attempt
+  private final CountDownLatch    _waitStopCompleteCountDown = new CountDownLatch(1);
+
+  private final MockMSModelFactory _msModelFactory;
+
+  /**
+   * Creates a participant whose "MasterSlave" factory is built with a {@code null} transition.
+   * <p>
+   * Parameter names follow the order in which the values are forwarded (zk address first);
+   * the positional behavior of this constructor is unchanged.
+   */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName) throws Exception
+  {
+    this(zkAddr, clusterName, instanceName, null);
+  }
+
+  /**
+   * @param zkAddr       zookeeper address, passed to the super constructor
+   * @param clusterName  cluster name, passed to the super constructor
+   * @param instanceName instance name, passed to the super constructor and used in log output
+   * @param transition   transition handed to the {@link MockMSModelFactory}; may be {@code null}
+   */
+  public MockParticipantManager(String zkAddr,
+                         String clusterName,
+                         String instanceName,
+                         MockTransition transition) throws Exception
+  {
+    super(zkAddr, clusterName, instanceName);
+    _instanceName = instanceName;
+    _msModelFactory = new MockMSModelFactory(transition);
+  }
+
+  /** Forwards {@code transition} to the factory registered for "MasterSlave" in {@link #run()}. */
+  public void setTransition(MockTransition transition)
+  {
+    _msModelFactory.setTrasition(transition);
+  }
+
+  /**
+   * Signals the thread executing {@link #run()} to disconnect and blocks until it has tried.
+   * If the wait is interrupted, the error is logged and the interrupt status is restored.
+   */
+  public void syncStop()
+  {
+    _stopCountDown.countDown();
+    try
+    {
+      _waitStopCompleteCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      LOG.error("exception in syncStop participant-manager", e);
+      // keep the interruption visible to the caller instead of swallowing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Starts {@link #run()} on a new thread and blocks until it has connected or given up.
+   * If the wait is interrupted, the error is logged and the interrupt status is restored.
+   */
+  public void syncStart()
+  {
+    try
+    {
+      new Thread(this).start();
+      _startCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      LOG.error("exception in syncStart participant-manager", e);
+      // keep the interruption visible to the caller instead of swallowing it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers the mock state model factories, connects, waits for {@link #syncStop()} and
+   * then disconnects.
+   */
+  @Override
+  public void run()
+  {
+    try
+    {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+      DummyLeaderStandbyStateModelFactory lsModelFactory =
+          new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory ofModelFactory =
+          new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+
+      connect();
+      _startCountDown.countDown();
+
+      _stopCountDown.await();
+    }
+    catch (InterruptedException e)
+    {
+      String msg =
+          "participant: " + _instanceName + ", " + Thread.currentThread().getName()
+              + " is interrupted";
+      LOG.info(msg);
+      System.err.println(msg);
+    }
+    catch (Exception e)
+    {
+      LOG.error("exception running participant-manager", e);
+    }
+    finally
+    {
+      // no-op after a successful connect; otherwise unblocks syncStart() on failure
+      _startCountDown.countDown();
+
+      try
+      {
+        disconnect();
+      }
+      finally
+      {
+        // must run even if disconnect() throws, or syncStop() would wait forever
+        _waitStopCompleteCountDown.countDown();
+      }
+    }
+  }
+
+  /** Exposes the inherited {@code _zkclient} field to tests. */
+  public ZkClient getZkClient() {
+    return _zkclient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
new file mode 100644
index 0000000..74774d1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
@@ -0,0 +1,43 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZkClient;
+
+// Test-only ParticipantManager subclass that exposes inherited members through public getters
+public class TestAbstractManager extends ParticipantManager {
+
+  public TestAbstractManager(String zkConnectString, String clusterName, 
+      String instanceName) throws Exception {
+    super(zkConnectString, clusterName, instanceName);
+  }
+
+  public ZkClient getZkClient() { // exposes the inherited _zkclient field
+    return _zkclient;
+  }
+
+  public List<CallbackHandler> getHandlers() { // exposes the inherited _handlers field
+    return _handlers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index efa3138..a35a49d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -19,24 +19,45 @@ package org.apache.helix.integration.manager;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.ZkTestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.TestSessionExpiryInTransition.SessionExpiryTransition;
 import org.apache.helix.manager.zk.ControllerManager;
 import org.apache.helix.manager.zk.ParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
 import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestParticipantManager extends ZkIntegrationTestBase {
 
+  private static Logger LOG = Logger.getLogger(TestParticipantManager.class);
+
   @Test
   public void simpleIntegrationTest() throws Exception {
     // Logger.getRootLogger().setLevel(Level.INFO);
@@ -83,4 +104,174 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
     System.out.println("START " + clusterName + " at "
         + new Date(System.currentTimeMillis()));
   }
+  
+  @Test
+  public void simpleSessionExpiryTest() throws Exception
+  {
+    // expire the participant's zk session, then verify the cluster converges and the id changed
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            1, // partitions per resource
+                            n, // number of nodes
+                            1, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.connect();
+
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] =
+          new MockParticipantManager(ZK_ADDR,
+                                 clusterName,
+                                 instanceName,
+                                 null);
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    String oldSessionId = participants[0].getSessionId();
+
+    // expire zk-connection on localhost_12918
+    ZkTestHelper.expireSession(participants[0].getZkClient());
+
+    // wait until session expiry callback happens
+    TimeUnit.MILLISECONDS.sleep(100);
+
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    String newSessionId = participants[0].getSessionId();
+    // compare by value: assertNotSame only checks that the two references differ
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "session id should change after expiry");
+
+    // cleanup
+    controller.disconnect();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+  
+  class SessionExpiryTransition extends MockTransition
+  {
+    private final CountDownLatch _startCountdown;
+    private final CountDownLatch _endCountdown;
+    private final AtomicBoolean _done = new AtomicBoolean(); // set by the first matching transition
+
+    public SessionExpiryTransition(CountDownLatch startCountdown, CountDownLatch endCountdown) {
+      this._startCountdown = startCountdown;
+      this._endCountdown = endCountdown;
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context) throws InterruptedException
+    {
+      final String tgtName = message.getTgtName();
+      final String partitionName = message.getPartitionName();
+      // block only the first transition of TestDB0_0 on localhost_12918
+      boolean isTarget = tgtName.equals("localhost_12918") && partitionName.equals("TestDB0_0");
+      if (isTarget && !_done.getAndSet(true))
+      {
+        _startCountdown.countDown();
+        // expected to be interrupted: the task gets cancelled during handleNewSession
+        _endCountdown.await();
+      }
+    }
+  }
+  
+  @Test
+  public void testSessionExpiryInTransition() throws Exception {
+    // expire the zk session while a state transition is blocked inside SessionExpiryTransition
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+    int n = 1;
+    CountDownLatch startCountdown = new CountDownLatch(1);
+    CountDownLatch endCountdown = new CountDownLatch(1);
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            1, // partitions per resource
+                            n, // number of nodes
+                            1, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.connect();
+
+    // start participants
+    for (int i = 0; i < n; i++)
+    {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] =
+          new MockParticipantManager(ZK_ADDR,
+                                 clusterName,
+                                 instanceName,
+                                 new SessionExpiryTransition(startCountdown, endCountdown));
+      participants[i].syncStart();
+    }
+
+    // wait until the blocking transition has started, then expire the session
+    startCountdown.await();
+    String oldSessionId = participants[0].getSessionId();
+    System.out.println("oldSessionId: " + oldSessionId);
+    ZkTestHelper.expireSession(participants[0].getZkClient());
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+    // compare by value: assertNotSame only checks that the two references differ
+    String newSessionId = participants[0].getSessionId();
+    Assert.assertFalse(newSessionId.equals(oldSessionId), "session id should change after expiry");
+
+    // assert interrupt exception error in old session
+    String errPath = PropertyPathConfig.getPath(PropertyType.ERRORS, clusterName, "localhost_12918", oldSessionId, "TestDB0", "TestDB0_0");
+    ZNRecord error = _gZkClient.readData(errPath);
+    Assert.assertNotNull(error, "InterruptedException should happen in old session since task is being cancelled during handleNewSession");
+    String errString = new String(new ZNRecordSerializer().serialize(error));
+    Assert.assertTrue(errString.contains("InterruptedException"));
+
+    // cleanup
+    controller.disconnect();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
 }