You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/07/12 19:04:18 UTC
git commit: Code change for handling ZooKeeper (ZK) session expiry
Updated Branches:
refs/heads/master bf7a6583e -> 6bb6e2c01
Code change for handling ZooKeeper (ZK) session expiry.
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/6bb6e2c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/6bb6e2c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/6bb6e2c0
Branch: refs/heads/master
Commit: 6bb6e2c010055c9df2e1945cea428cfe660d7a24
Parents: bf7a658
Author: zzhang <zz...@uci.edu>
Authored: Fri Jul 12 10:04:14 2013 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Fri Jul 12 10:04:14 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixTimerTask.java | 4 +-
.../healthcheck/HealthStatsAggregationTask.java | 168 +++-------------
.../healthcheck/HealthStatsAggregator.java | 165 ++++++++++++++++
.../ParticipantHealthReportCollectorImpl.java | 131 +++----------
.../ParticipantHealthReportTask.java | 72 +++++++
.../helix/manager/zk/AbstractManager.java | 4 +-
.../helix/manager/zk/ControllerManager.java | 14 +-
.../zk/DistributedControllerManager.java | 6 +-
.../helix/manager/zk/ParticipantManager.java | 3 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 10 +-
.../handling/HelixStateTransitionHandler.java | 144 +++++++-------
.../messaging/handling/HelixTaskExecutor.java | 57 ++++--
...estParticipantHealthReportCollectorImpl.java | 11 +-
.../helix/healthcheck/TestAddDropAlert.java | 4 +-
.../healthcheck/TestAlertActionTriggering.java | 8 +-
.../helix/healthcheck/TestAlertFireHistory.java | 20 +-
.../helix/healthcheck/TestExpandAlert.java | 2 +-
.../helix/healthcheck/TestSimpleAlert.java | 2 +-
.../healthcheck/TestSimpleWildcardAlert.java | 4 +-
.../helix/healthcheck/TestStalenessAlert.java | 2 +-
.../helix/healthcheck/TestWildcardAlert.java | 2 +-
.../TestSessionExpiryInTransition.java | 5 +-
.../manager/MockParticipantManager.java | 139 ++++++++++++++
.../manager/TestAbstractManager.java | 43 +++++
.../manager/TestParticipantManager.java | 191 +++++++++++++++++++
25 files changed, 834 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java b/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
index 203bb54..0b7edb9 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixTimerTask.java
@@ -19,9 +19,7 @@ package org.apache.helix;
* under the License.
*/
-import java.util.TimerTask;
-
-public abstract class HelixTimerTask extends TimerTask
+public abstract class HelixTimerTask
{
/**
* Timer task starts
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
index 4b5fe92..f0dd134 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregationTask.java
@@ -19,189 +19,83 @@ package org.apache.helix.healthcheck;
* under the License.
*/
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.Timer;
+import java.util.TimerTask;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.model.ConfigScope;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
import org.apache.helix.HelixTimerTask;
-import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ReadHealthDataStage;
-import org.apache.helix.controller.stages.StatsAggregationStage;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
import org.apache.log4j.Logger;
public class HealthStatsAggregationTask extends HelixTimerTask
-{
+{
private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
-
public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
-
+
+ final HealthStatsAggregator _healthStatsAggregator;
+
+ class HealthStatsAggregationTaskTimer extends TimerTask {
+
+ @Override
+ public void run() {
+ _healthStatsAggregator.aggregate();
+ }
+
+ }
+
private Timer _timer;
- private final HelixManager _manager;
- private final Pipeline _healthStatsAggregationPipeline;
private final int _delay;
private final int _period;
- private final ClusterAlertMBeanCollection _alertItemCollection;
- private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
- new HashMap<String, HelixStageLatencyMonitor>();
-
- public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
+
+ public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator, int delay, int period)
{
- _manager = manager;
+ _healthStatsAggregator = healthStatsAggregator;
+
_delay = delay;
_period = period;
-
- // health stats pipeline
- _healthStatsAggregationPipeline = new Pipeline();
- _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
- StatsAggregationStage statAggregationStage = new StatsAggregationStage();
- _healthStatsAggregationPipeline.addStage(statAggregationStage);
- _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
-
- registerStageLatencyMonitor(_healthStatsAggregationPipeline);
- }
-
- public HealthStatsAggregationTask(HelixManager manager)
- {
- this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
}
- private void registerStageLatencyMonitor(Pipeline pipeline)
+ public HealthStatsAggregationTask(HealthStatsAggregator healthStatsAggregator)
{
- for (Stage stage : pipeline.getStages())
- {
- String stgName = stage.getStageName();
- if (!_stageLatencyMonitorMap.containsKey(stgName))
- {
- try
- {
- _stageLatencyMonitorMap.put(stage.getStageName(),
- new HelixStageLatencyMonitor(_manager.getClusterName(),
- stgName));
- }
- catch (Exception e)
- {
- LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
- }
- }
- else
- {
- LOG.error("StageLatencyMonitor for stage: " + stgName
- + " already exists. Skip register it");
- }
- }
+ this(healthStatsAggregator,
+ DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
}
@Override
public void start()
{
- LOG.info("START HealthAggregationTask");
if (_timer == null)
{
+ LOG.info("START HealthStatsAggregationTimerTask");
+
// Remove all the previous health check values, if any
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
- for(String healthReportName : existingHealthRecordNames)
- {
- LOG.info("Removing old healthrecord " + healthReportName);
- accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
- }
+ _healthStatsAggregator.init();
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
+ _timer = new Timer("HealthStatsAggregationTimerTask", true);
+ _timer.scheduleAtFixedRate(new HealthStatsAggregationTaskTimer(),
+ new Random().nextInt(_delay), _period);
}
else
{
- LOG.warn("timer already started");
+ LOG.warn("HealthStatsAggregationTimerTask already started");
}
}
@Override
public synchronized void stop()
{
- LOG.info("Stop HealthAggregationTask");
-
if (_timer != null)
{
+ LOG.info("Stop HealthStatsAggregationTimerTask");
_timer.cancel();
+ _healthStatsAggregator.reset();
_timer = null;
- _alertItemCollection.reset();
-
- for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
- {
- stgLatencyMonitor.reset();
- }
- }
- else
- {
- LOG.warn("timer already stopped");
- }
- }
-
- @Override
- public synchronized void run()
- {
- if (!isEnabled())
- {
- LOG.info("HealthAggregationTask is disabled.");
- return;
- }
-
- if (!_manager.isLeader())
- {
- LOG.error("Cluster manager: " + _manager.getInstanceName()
- + " is not leader. Pipeline will not be invoked");
- return;
- }
-
- try
- {
- ClusterEvent event = new ClusterEvent("healthChange");
- event.addAttribute("helixmanager", _manager);
- event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
-
- _healthStatsAggregationPipeline.handle(event);
- _healthStatsAggregationPipeline.finish();
- }
- catch (Exception e)
- {
- LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
- e);
- }
- }
-
- private boolean isEnabled()
- {
- ConfigAccessor configAccessor = _manager.getConfigAccessor();
- boolean enabled = true;
- if (configAccessor != null)
- {
- // zk-based cluster manager
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
- String isEnabled = configAccessor.get(scope, "healthChange.enabled");
- if (isEnabled != null)
- {
- enabled = new Boolean(isEnabled);
- }
}
else
{
- LOG.debug("File-based cluster manager doesn't support disable healthChange");
+ LOG.warn("HealthStatsAggregationTimerTask already stopped");
}
- return enabled;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
new file mode 100644
index 0000000..c60e3bb
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/HealthStatsAggregator.java
@@ -0,0 +1,165 @@
+package org.apache.helix.healthcheck;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ReadHealthDataStage;
+import org.apache.helix.controller.stages.StatsAggregationStage;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+import org.apache.log4j.Logger;
+
+public class HealthStatsAggregator {
+ private static final Logger LOG = Logger.getLogger(HealthStatsAggregator.class);
+
+ public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
+
+ private final HelixManager _manager;
+ private final Pipeline _healthStatsAggregationPipeline;
+ private final ClusterAlertMBeanCollection _alertItemCollection;
+ private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
+ new HashMap<String, HelixStageLatencyMonitor>();
+
+ public HealthStatsAggregator(HelixManager manager)
+ {
+ _manager = manager;
+
+ // health stats pipeline
+ _healthStatsAggregationPipeline = new Pipeline();
+ _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
+ StatsAggregationStage statAggregationStage = new StatsAggregationStage();
+ _healthStatsAggregationPipeline.addStage(statAggregationStage);
+ _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
+
+ registerStageLatencyMonitor(_healthStatsAggregationPipeline);
+ }
+
+ private void registerStageLatencyMonitor(Pipeline pipeline)
+ {
+ for (Stage stage : pipeline.getStages())
+ {
+ String stgName = stage.getStageName();
+ if (!_stageLatencyMonitorMap.containsKey(stgName))
+ {
+ try
+ {
+ _stageLatencyMonitorMap.put(stage.getStageName(),
+ new HelixStageLatencyMonitor(_manager.getClusterName(),
+ stgName));
+ }
+ catch (Exception e)
+ {
+ LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
+ }
+ }
+ else
+ {
+ LOG.error("StageLatencyMonitor for stage: " + stgName
+ + " already exists. Skip register it");
+ }
+ }
+ }
+
+ public synchronized void aggregate()
+ {
+ if (!isEnabled())
+ {
+ LOG.info("HealthAggregationTask is disabled.");
+ return;
+ }
+
+ if (!_manager.isLeader())
+ {
+ LOG.error("Cluster manager: " + _manager.getInstanceName()
+ + " is not leader. Pipeline will not be invoked");
+ return;
+ }
+
+ try
+ {
+ ClusterEvent event = new ClusterEvent("healthChange");
+ event.addAttribute("helixmanager", _manager);
+ event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
+
+ _healthStatsAggregationPipeline.handle(event);
+ _healthStatsAggregationPipeline.finish();
+ }
+ catch (Exception e)
+ {
+ LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
+ e);
+ }
+ }
+
+ private boolean isEnabled()
+ {
+ ConfigAccessor configAccessor = _manager.getConfigAccessor();
+ boolean enabled = true;
+ if (configAccessor != null)
+ {
+ // zk-based cluster manager
+ ConfigScope scope =
+ new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
+ String isEnabled = configAccessor.get(scope, "healthChange.enabled");
+ if (isEnabled != null)
+ {
+ enabled = new Boolean(isEnabled);
+ }
+ }
+ else
+ {
+ LOG.debug("File-based cluster manager doesn't support disable healthChange");
+ }
+ return enabled;
+ }
+
+ public void init() {
+ // Remove all the previous health check values, if any
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
+ for(String healthReportName : existingHealthRecordNames)
+ {
+ LOG.info("Removing old healthrecord " + healthReportName);
+ accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
+ }
+
+ }
+
+ public void reset() {
+ _alertItemCollection.reset();
+
+ for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
+ {
+ stgLatencyMonitor.reset();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
index 14c12de..1d33260 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
@@ -21,170 +21,95 @@ package org.apache.helix.healthcheck;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixTimerTask;
import org.apache.helix.ZNRecord;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.alerts.StatsHolder;
import org.apache.helix.model.HealthStat;
import org.apache.log4j.Logger;
-
-public class ParticipantHealthReportCollectorImpl extends HelixTimerTask implements
- ParticipantHealthReportCollector
-{
+public class ParticipantHealthReportCollectorImpl implements
+ ParticipantHealthReportCollector {
private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
- private Timer _timer;
private static final Logger _logger = Logger
.getLogger(ParticipantHealthReportCollectorImpl.class);
private final HelixManager _helixManager;
String _instanceName;
- public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
- public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
- String instanceName)
- {
+ public ParticipantHealthReportCollectorImpl(HelixManager helixManager, String instanceName) {
_helixManager = helixManager;
_instanceName = instanceName;
addDefaultHealthCheckInfoProvider();
}
- private void addDefaultHealthCheckInfoProvider()
- {
+ private void addDefaultHealthCheckInfoProvider() {
addHealthReportProvider(new DefaultHealthReportProvider());
}
@Override
- public void start()
- {
- if (_timer == null)
- {
- _timer = new Timer(true);
- _timer.scheduleAtFixedRate(this,
- new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
- }
- else
- {
- _logger.warn("timer already started");
- }
- }
-
- @Override
- public void addHealthReportProvider(HealthReportProvider provider)
- {
- try
- {
- synchronized (_healthReportProviderList)
- {
- if (!_healthReportProviderList.contains(provider))
- {
+ public void addHealthReportProvider(HealthReportProvider provider) {
+ try {
+ synchronized (_healthReportProviderList) {
+ if (!_healthReportProviderList.contains(provider)) {
_healthReportProviderList.add(provider);
- }
- else
- {
+ } else {
_logger.warn("Skipping a duplicated HealthCheckInfoProvider");
}
}
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
_logger.error(e);
}
}
@Override
- public void removeHealthReportProvider(HealthReportProvider provider)
- {
- synchronized (_healthReportProviderList)
- {
- if (_healthReportProviderList.contains(provider))
- {
+ public void removeHealthReportProvider(HealthReportProvider provider) {
+ synchronized (_healthReportProviderList) {
+ if (_healthReportProviderList.contains(provider)) {
_healthReportProviderList.remove(provider);
- }
- else
- {
+ } else {
_logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
}
}
}
@Override
- public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
- {
+ public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate) {
HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
-// accessor.setProperty(
-// PropertyType.HEALTHREPORT, healthCheckInfoUpdate, _instanceName,
-// healthCheckInfoUpdate.getId());
- accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
- new HealthStat(healthCheckInfoUpdate));
+ accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
+ new HealthStat(healthCheckInfoUpdate));
}
@Override
- public void stop()
- {
- _logger.info("Stop HealthCheckInfoReportingTask");
- if (_timer != null)
- {
- _timer.cancel();
- _timer = null;
- }
- else
- {
- _logger.warn("timer already stopped");
- }
- }
-
- @Override
- public synchronized void transmitHealthReports()
- {
- synchronized (_healthReportProviderList)
- {
- for (HealthReportProvider provider : _healthReportProviderList)
- {
- try
- {
+ public synchronized void transmitHealthReports() {
+ synchronized (_healthReportProviderList) {
+ for (HealthReportProvider provider : _healthReportProviderList) {
+ try {
Map<String, String> report = provider.getRecentHealthReport();
Map<String, Map<String, String>> partitionReport = provider
.getRecentPartitionHealthReport();
ZNRecord record = new ZNRecord(provider.getReportName());
- if (report != null)
- {
+ if (report != null) {
record.setSimpleFields(report);
}
- if (partitionReport != null)
- {
+ if (partitionReport != null) {
record.setMapFields(partitionReport);
}
record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
-
+
HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
- new HealthStat(record));
+ accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
+ new HealthStat(record));
-// _helixManager.getDataAccessor().setProperty(
-// PropertyType.HEALTHREPORT, record, _instanceName, record.getId());
- // reset stats (for now just the partition stats)
provider.resetStats();
- }
- catch (Exception e)
- {
- _logger.error("", e);
+ } catch (Exception e) {
+ _logger.error("fail to transmit health report", e);
}
}
}
}
-
- @Override
- public void run()
- {
- transmitHealthReports();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
new file mode 100644
index 0000000..4e7cbe4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java
@@ -0,0 +1,72 @@
+package org.apache.helix.healthcheck;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.helix.HelixTimerTask;
+import org.apache.log4j.Logger;
+
+public class ParticipantHealthReportTask extends HelixTimerTask {
+ private static final Logger LOG = Logger.getLogger(ParticipantHealthReportTask.class);
+ public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
+
+ Timer _timer;
+ final ParticipantHealthReportCollectorImpl _healthReportCollector;
+
+ class ParticipantHealthReportTimerTask extends TimerTask {
+
+ @Override
+ public void run() {
+ _healthReportCollector.transmitHealthReports();
+ }
+ }
+
+ public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector) {
+ _healthReportCollector = healthReportCollector;
+ }
+
+ @Override
+ public void start() {
+ if (_timer == null) {
+ LOG.info("Start HealthCheckInfoReportingTask");
+ _timer = new Timer("ParticipantHealthReportTimerTask", true);
+ _timer.scheduleAtFixedRate(new ParticipantHealthReportTimerTask(),
+ new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
+ } else {
+ LOG.warn("ParticipantHealthReportTimerTask already started");
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (_timer != null) {
+ LOG.info("Stop ParticipantHealthReportTimerTask");
+ _timer.cancel();
+ _timer = null;
+ } else {
+ LOG.warn("ParticipantHealthReportTimerTask already stopped");
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
index 165f639..a68e1de 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
@@ -75,7 +75,7 @@ public abstract class AbstractManager implements HelixManager, IZkStateListener
final InstanceType _instanceType;
final int _sessionTimeout;
final List<PreConnectCallback> _preConnectCallbacks;
- final List<CallbackHandler> _handlers;
+ protected final List<CallbackHandler> _handlers;
final HelixManagerProperties _properties;
/**
@@ -83,7 +83,7 @@ public abstract class AbstractManager implements HelixManager, IZkStateListener
*/
final String _version;
- ZkClient _zkclient = null;
+ protected ZkClient _zkclient = null;
final DefaultMessagingService _messagingService;
BaseDataAccessor<ZNRecord> _baseDataAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
index 43fa149..4b027a3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
@@ -39,6 +39,7 @@ import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
@@ -78,7 +79,8 @@ public class ControllerManager extends AbstractManager {
if (_timer == null)
{
- _timer = new Timer(true);
+ LOG.info("Start StatusDumpTask");
+ _timer = new Timer("StatusDumpTimerTask", true);
_timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
zkclient,
timeThresholdNoChange),
@@ -92,23 +94,17 @@ public class ControllerManager extends AbstractManager {
public void stop() {
if (_timer != null)
{
+ LOG.info("Stop StatusDumpTask");
_timer.cancel();
_timer = null;
}
}
-
- @Override
- public void run() {
- // TODO Auto-generated method stub
-
- }
-
}
public ControllerManager(String zkAddress, String clusterName, String instanceName) {
super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
- _timerTasks.add(new HealthStatsAggregationTask(this));
+ _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
_timerTasks.add(new StatusDumpTask(_zkclient, this));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
index 92b4f66..e9f07d7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
@@ -29,8 +29,10 @@ import org.apache.helix.PreConnectCallback;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.apache.helix.manager.zk.ControllerManager.StatusDumpTask;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.LiveInstance;
@@ -63,9 +65,9 @@ public class DistributedControllerManager extends AbstractManager {
_stateMachineEngine = new HelixStateMachineEngine(this);
_participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
- _timerTasks.add(_participantHealthInfoCollector);
+ _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
- _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+ _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
_controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index fc54f08..143b02e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -56,6 +56,7 @@ import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
@@ -92,7 +93,7 @@ public class ParticipantManager extends AbstractManager {
_stateMachineEngine = new HelixStateMachineEngine(this);
_participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
- _timerTasks.add(_participantHealthInfoCollector);
+ _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index eec38aa..21a5bfb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -61,8 +61,10 @@ import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.HealthStatsAggregator;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.ConfigScope;
@@ -106,6 +108,7 @@ public class ZKHelixManager implements HelixManager
private Timer _timer;
private CallbackHandler _leaderElectionHandler;
private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
+ private ParticipantHealthReportTask _participantHealthReportTask;
private final DefaultMessagingService _messagingService;
private ZKHelixAdmin _managementTool;
private final String _version;
@@ -213,7 +216,7 @@ public class ZKHelixManager implements HelixManager
_controllerTimerTasks = new ArrayList<HelixTimerTask>();
if (_instanceType == InstanceType.CONTROLLER)
{
- _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+ _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
}
}
@@ -460,7 +463,7 @@ public class ZKHelixManager implements HelixManager
if (_participantHealthCheckInfoCollector != null)
{
- _participantHealthCheckInfoCollector.stop();
+ _participantHealthReportTask.stop();
}
if (_timer != null)
@@ -834,7 +837,8 @@ public class ZKHelixManager implements HelixManager
{
_participantHealthCheckInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
- _participantHealthCheckInfoCollector.start();
+ _participantHealthReportTask = new ParticipantHealthReportTask(_participantHealthCheckInfoCollector);
+ _participantHealthReportTask.start();
}
// start the participant health check timer, also create zk path for health
// check info
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index fc09070..2baa1cf 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -125,10 +125,8 @@ public class HelixStateTransitionHandler extends MessageHandler
void postHandleMessage()
{
- // Message message = _message;
- // HelixManager manager = _notificationContext.getManager();
- HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
- Exception exception = taskResult.getException();
+ HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
+ Exception exception = taskResult.getException();
String partitionKey = _message.getPartitionName();
String resource = _message.getResourceName();
@@ -141,90 +139,88 @@ public class HelixStateTransitionHandler extends MessageHandler
int bucketSize = _message.getBucketSize();
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
- // Lock the helix manager so that the session id will not change when we update
- // the state model state. for zk current state it is OK as we have the per-session
- // current state node
- synchronized (_manager)
+ // No need to sync on manager: the executor of the expired session is cancelled before the executor of the new session is started.
+ // sessionId might change when we update the state model state.
+ // for zk current state it is OK as we have the per-session current state node
+ if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
{
- if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
- {
- logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
- + _message.getExecutionSessionId() + " , new session : "
- + _manager.getSessionId());
- return;
- }
+ logger.warn("Session id has changed. Skip postExecutionMessage. Old session "
+ + _message.getExecutionSessionId() + " , new session : "
+ + _manager.getSessionId());
+ return;
+ }
- if (taskResult.isSuccess())
- {
- // String fromState = message.getFromState();
- String toState = _message.getToState();
- _currentStateDelta.setState(partitionKey, toState);
+ if (taskResult.isSuccess())
+ {
+ // String fromState = message.getFromState();
+ String toState = _message.getToState();
+ _currentStateDelta.setState(partitionKey, toState);
- if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
- {
- // for "OnOfflineToDROPPED" message, we need to remove the resource key record
- // from the current state of the instance because the resource key is dropped.
- // In the state model it will be stayed as "OFFLINE", which is OK.
- ZNRecordDelta delta =
- new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
- // Don't subtract simple fields since they contain stateModelDefRef
- delta._record.getSimpleFields().clear();
+ if (toState.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
+ {
+ // for "OnOfflineToDROPPED" message, we need to remove the resource key record
+ // from the current state of the instance because the resource key is dropped.
+ // In the state model it will be stayed as "OFFLINE", which is OK.
+ ZNRecordDelta delta =
+ new ZNRecordDelta(_currentStateDelta.getRecord(), MergeOperation.SUBTRACT);
+ // Don't subtract simple fields since they contain stateModelDefRef
+ delta._record.getSimpleFields().clear();
- List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
- deltaList.add(delta);
- _currentStateDelta.setDeltaList(deltaList);
- }
- else
- {
- // if the partition is not to be dropped, update _stateModel to the TO_STATE
- _stateModel.updateState(toState);
- }
+ List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
+ deltaList.add(delta);
+ _currentStateDelta.setDeltaList(deltaList);
}
else
{
- if (exception instanceof HelixStateMismatchException)
- {
- // if fromState mismatch, set current state on zk to stateModel's current state
- logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
- + partitionKey
- + ", currentState: "
- + _stateModel.getCurrentState()
- + ", message: " + _message);
- _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
- }
- else
+ // if the partition is not to be dropped, update _stateModel to the TO_STATE
+ _stateModel.updateState(toState);
+ }
+ }
+ else
+ {
+ if (exception instanceof HelixStateMismatchException)
+ {
+ // if fromState mismatch, set current state on zk to stateModel's current state
+ logger.warn("Force CurrentState on Zk to be stateModel's CurrentState. partitionKey: "
+ + partitionKey
+ + ", currentState: "
+ + _stateModel.getCurrentState()
+ + ", message: " + _message);
+ _currentStateDelta.setState(partitionKey, _stateModel.getCurrentState());
+ }
+ else
+ {
+ StateTransitionError error =
+ new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
+ if (exception instanceof InterruptedException)
{
- StateTransitionError error =
- new StateTransitionError(ErrorType.INTERNAL, ErrorCode.ERROR, exception);
- if (exception instanceof InterruptedException)
+ if (_isTimeout)
{
- if (_isTimeout)
- {
- error =
- new StateTransitionError(ErrorType.INTERNAL,
- ErrorCode.TIMEOUT,
- exception);
- }
- else
- {
- // State transition interrupted but not caused by timeout. Keep the current
- // state in this case
- logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
- + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
- return;
- }
+ error =
+ new StateTransitionError(ErrorType.INTERNAL,
+ ErrorCode.TIMEOUT,
+ exception);
}
- _stateModel.rollbackOnError(_message, _notificationContext, error);
- _currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
- _stateModel.updateState(HelixDefinedState.ERROR.toString());
-
- // if we have errors transit from ERROR state, disable the partition
- if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
- disablePartition();
+ else
+ {
+ // State transition interrupted but not caused by timeout. Keep the current
+ // state in this case
+ logger.error("State transition interrupted but not timeout. Not updating state. Partition : "
+ + _message.getPartitionName() + " MsgId : " + _message.getMsgId());
+ return;
}
}
+ _stateModel.rollbackOnError(_message, _notificationContext, error);
+ _currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString());
+ _stateModel.updateState(HelixDefinedState.ERROR.toString());
+
+ // if we have errors transit from ERROR state, disable the partition
+ if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
+ disablePartition();
+ }
}
}
+
try
{
// Update the ZK current state of the node
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index cb07494..cd74505 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -419,6 +419,48 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
accessor.setChildren(readMsgKeys, readMsgs);
}
+  /**
+   * Remove the message-handler factory for {@code type}: shut down its executor, then reset it.
+   * @param type message type to unregister
+   * @throws HelixException if the executor is not terminated within 200ms (factory then kept)
+   */
+  void unregisterMessageHandlerFactory(String type) {
+    // shutdown executor-service (shutdownNow: best-effort stop of running tasks). disconnect if fail
+    ExecutorService executorSvc = _executorMap.remove(type);
+    if (executorSvc != null) {
+      List<Runnable> tasksLeft = executorSvc.shutdownNow();
+      LOG.info(tasksLeft.size() + " tasks never executed for msgType: "
+          + type + ". tasks: " + tasksLeft);
+      try {
+        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+          LOG.error("executor-service for msgType: " + type
+              + " is not fully terminated in 200ms. will disconnect helix-participant");
+          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
+        }
+      } catch (InterruptedException e) {
+        LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
+        Thread.currentThread().interrupt(); // restore interrupt status instead of swallowing it
+      }
+    }
+    // reset state-model
+    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
+    if (handlerFty != null) {
+      handlerFty.reset();
+    }
+  }
+
+  /** Unregister every msg-handler factory (shutting down its executor), then clear the task-map. */
+  void reset() {
+    LOG.info("Get FINALIZE notification");
+    // iterate a snapshot of both maps' keys: unregister removes entries, and every factory is reset
+    java.util.Set<String> msgTypes = new java.util.HashSet<String>(_executorMap.keySet());
+    msgTypes.addAll(_handlerFactoryMap.keySet());
+    for (String msgType : msgTypes) {
+      unregisterMessageHandlerFactory(msgType);
+    }
+    _taskMap.clear(); // all tasks should be terminated by now
+  }
+
@Override
public void onMessage(String instanceName,
List<Message> messages,
@@ -429,20 +471,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
// TODO: see if we should have a separate notification call for resetting
if (changeContext.getType() == Type.FINALIZE)
{
- LOG.info("Get FINALIZE notification");
- for (MessageHandlerFactory factory : _handlerFactoryMap.values())
- {
- factory.reset();
- }
- // Cancel all scheduled tasks
- synchronized (_lock)
- {
- for (MessageTaskInfo info : _taskMap.values())
- {
- cancelTask(info._task);
- }
- _taskMap.clear();
- }
+ reset();
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
index 6187b93..4428fa2 100644
--- a/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
+++ b/helix-core/src/test/java/org/apache/helix/TestParticipantHealthReportCollectorImpl.java
@@ -22,12 +22,14 @@ package org.apache.helix;
import org.apache.helix.Mocks.MockHealthReportProvider;
import org.apache.helix.Mocks.MockManager;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestParticipantHealthReportCollectorImpl {
protected ParticipantHealthReportCollectorImpl _providerImpl;
+ protected ParticipantHealthReportTask _providerTask;
protected HelixManager _manager;
protected MockHealthReportProvider _mockProvider;
@@ -35,21 +37,22 @@ public class TestParticipantHealthReportCollectorImpl {
public void setup()
{
_providerImpl = new ParticipantHealthReportCollectorImpl(new MockManager(), "instance_123");
+ _providerTask = new ParticipantHealthReportTask(_providerImpl);
_mockProvider = new MockHealthReportProvider();
}
@Test (groups = {"unitTest"})
public void testStart() throws Exception
{
- _providerImpl.start();
- _providerImpl.start();
+ _providerTask.start();
+ _providerTask.start();
}
@Test (groups = {"unitTest"})
public void testStop() throws Exception
{
- _providerImpl.stop();
- _providerImpl.stop();
+ _providerTask.stop();
+ _providerTask.stop();
}
@Test (groups = {"unitTest"})
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
index 7c82c13..26f4ac3 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAddDropAlert.java
@@ -174,7 +174,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase
ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
String instance = "localhost_12918";
ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
Map<String, Map<String, String>> recMap = record.getMapFields();
@@ -182,7 +182,7 @@ public class TestAddDropAlert extends ZkIntegrationTestBase
Assert.assertTrue(keySet.size() > 0);
_setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
// other verifications go here
// for (int i = 0; i < 1; i++) //change 1 back to 5
// {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
index 9c197d0..e20fe77 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertActionTriggering.java
@@ -131,8 +131,8 @@ public class TestAlertActionTriggering extends
String controllerName = CONTROLLER_PREFIX + "_0";
HelixManager manager = _startCMResultMap.get(controllerName)._manager;
- HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
- task.run();
+ HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ task.aggregate();
Thread.sleep(4000);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -177,7 +177,7 @@ public class TestAlertActionTriggering extends
// enable the disabled instances
setHealthData(metrics3, metrics3);
- task.run();
+ task.aggregate();
Thread.sleep(1000);
manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), participant2, true);
@@ -191,7 +191,7 @@ public class TestAlertActionTriggering extends
// Test the DISABLE_PARTITION case
int[] metrics4 = {22, 115, 22, 16,163};
setHealthData2(metrics4);
- task.run();
+ task.aggregate();
// scope = new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(participant1).build();
scope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT)
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
index 24607f6..0f9798f 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestAlertFireHistory.java
@@ -108,8 +108,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
properties.put("healthChange.enabled", "false");
_setupTool.getClusterManagementTool().setConfig(scope, properties);
- HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
- task.run();
+ HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ task.aggregate();
Thread.sleep(100);
HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -121,7 +121,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
properties.put("healthChange.enabled", "true");
_setupTool.getClusterManagementTool().setConfig(scope, properties);
- task.run();
+ task.aggregate();
Thread.sleep(100);
history = manager.getHelixDataAccessor().getProperty(keyBuilder.alertHistory());
@@ -155,8 +155,8 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
historySize = property.getRecord().getMapFields().size();
}
- HealthStatsAggregationTask task = new HealthStatsAggregationTask(_startCMResultMap.get(controllerName)._manager);
- task.run();
+ HealthStatsAggregator task = new HealthStatsAggregator(_startCMResultMap.get(controllerName)._manager);
+ task.aggregate();
Thread.sleep(100);
@@ -173,7 +173,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
Assert.assertTrue(lastRecord.get("(localhost_12921.TestStat@DB#db1.TestMetric2)GREATER(100)").equals("ON"));
setHealthData(metrics1, metrics2);
- task.run();
+ task.aggregate();
Thread.sleep(100);
history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
// no change
@@ -190,7 +190,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
int [] metrics3 = {21, 44, 22, 14, 16};
int [] metrics4 = {122, 115, 222, 41,16};
setHealthData(metrics3, metrics4);
- task.run();
+ task.aggregate();
Thread.sleep(100);
history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
// new delta should be recorded
@@ -209,7 +209,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
int [] metrics5 = {0, 0, 0, 0, 0};
int [] metrics6 = {0, 0, 0, 0,0};
setHealthData(metrics5, metrics6);
- task.run();
+ task.aggregate();
for (int i = 0; i < 10; i++)
{
@@ -245,7 +245,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
int[] metricsy = {99 + 3*x, 99 + 3*y, 98 + 4*x, 98+4*y, 97+5*y};
setHealthData(metricsx, metricsy);
- task.run();
+ task.aggregate();
Thread.sleep(100);
history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
@@ -304,7 +304,7 @@ public class TestAlertFireHistory extends ZkStandAloneCMTestBaseWithPropertyServ
int[] metricsy = {99 + 3*x, 99 + 3*y, 98 + 4*x, 98+4*y, 97+5*y};
setHealthData(metricsx, metricsy);
- task.run();
+ task.aggregate();
for (int j = 0; j < 10; j++) {
Thread.sleep(100);
history = helixDataAccessor.getProperty(keyBuilder.alertHistory()).getRecord();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
index 5151898..5a973cb 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestExpandAlert.java
@@ -172,7 +172,7 @@ public class TestExpandAlert extends ZkIntegrationTestBase
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
//sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
index b681ecc..635d529 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleAlert.java
@@ -177,7 +177,7 @@ public class TestSimpleAlert extends ZkIntegrationTestBase
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
//sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
index 177948f..3a50e5d 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestSimpleWildcardAlert.java
@@ -179,7 +179,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase
Thread.sleep(1000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
//sleep for a few seconds to give stats stage time to trigger
Thread.sleep(1000);
@@ -221,7 +221,7 @@ public class TestSimpleWildcardAlert extends ZkIntegrationTestBase
_setupTool.getClusterManagementTool().dropAlert(clusterName, alertwildcard);
alertwildcard = "EXP(decay(1.0)(localhost*.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(15)";
_setupTool.getClusterManagementTool().addAlert(clusterName, alertwildcard);
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
Thread.sleep(1000);
record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
index 1bdb038..d1686cb 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestStalenessAlert.java
@@ -170,7 +170,7 @@ public class TestStalenessAlert extends ZkIntegrationTestBase
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
//sleep for a few seconds to give stats stage time to trigger
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
index 20f0c13..e8b1719 100644
--- a/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
+++ b/helix-core/src/test/java/org/apache/helix/healthcheck/TestWildcardAlert.java
@@ -261,7 +261,7 @@ public class TestWildcardAlert extends ZkIntegrationTestBase
Thread.sleep(3000);
// HealthAggregationTask is supposed to run by a timer every 30s
// To make sure HealthAggregationTask is run, we invoke it explicitly for this test
- new HealthStatsAggregationTask(cmResult._manager).run();
+ new HealthStatsAggregator(cmResult._manager).aggregate();
//sleep for a few seconds to give stats stage time to trigger and for bean to trigger
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
index e70c3ca..a93e073 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSessionExpiryInTransition.java
@@ -36,6 +36,7 @@ import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.Assert;
+import org.testng.annotations.Test;
public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
@@ -69,9 +70,7 @@ public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
}
}
- // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
- // when there is pending messages
- // @Test
+ @Test
public void testSessionExpiryInTransition() throws Exception
{
Logger.getRootLogger().setLevel(Level.WARN);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
new file mode 100644
index 0000000..e171539
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -0,0 +1,139 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
+import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockSchemataModelFactory;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Test-only {@link ParticipantManager} that registers mock/dummy state-model factories and runs
+ * connect/disconnect on its own thread, so tests can start and stop it synchronously.
+ */
+public class MockParticipantManager extends ParticipantManager implements Runnable {
+  private static final Logger LOG = Logger.getLogger(MockParticipantManager.class);
+  private final String _instanceName;
+
+  // released once connect() returned (or failed); awaited by syncStart()
+  private final CountDownLatch _startCountDown = new CountDownLatch(1);
+  // released by syncStop() so run() leaves its wait and disconnects
+  private final CountDownLatch _stopCountDown = new CountDownLatch(1);
+  // released by run() after disconnect(); awaited by syncStop()
+  private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);
+
+  private final MockMSModelFactory _msModelFactory;
+
+  /**
+   * Same as the 4-arg constructor with a null transition. Arguments are forwarded positionally,
+   * i.e. in (zkAddr, clusterName, instanceName) order; the parameters are named to match.
+   */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName) throws Exception {
+    this(zkAddr, clusterName, instanceName, null);
+  }
+
+  /** @param transition MasterSlave transition hook for the mock factory (may be null) */
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      MockTransition transition) throws Exception {
+    super(zkAddr, clusterName, instanceName);
+    _instanceName = instanceName;
+    _msModelFactory = new MockMSModelFactory(transition);
+  }
+
+  /** Replace the transition hook used by the MasterSlave mock factory. */
+  public void setTransition(MockTransition transition) {
+    _msModelFactory.setTrasition(transition);
+  }
+
+  /** Signal run() to stop, then block until it has disconnected. */
+  public void syncStop() {
+    _stopCountDown.countDown();
+    try {
+      _waitStopCompleteCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStop participant-manager", e);
+      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
+    }
+  }
+
+  /** Start run() on a new thread and block until it has connected (or failed to). */
+  public void syncStart() {
+    try {
+      new Thread(this).start();
+      _startCountDown.await();
+    } catch (InterruptedException e) {
+      LOG.error("exception in syncStart participant-manager", e);
+      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
+    }
+  }
+
+  /**
+   * Register the state-model factories, connect, then wait for syncStop(). Always disconnects
+   * and releases both syncStart() and syncStop() waiters, even if connect/disconnect throws.
+   */
+  @Override
+  public void run() {
+    try {
+      StateMachineEngine stateMach = getStateMachineEngine();
+      stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
+
+      DummyLeaderStandbyStateModelFactory lsModelFactory = new DummyLeaderStandbyStateModelFactory(10);
+      DummyOnlineOfflineStateModelFactory ofModelFactory = new DummyOnlineOfflineStateModelFactory(10);
+      stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
+      stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
+
+      MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
+      stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
+
+      connect();
+      _startCountDown.countDown();
+
+      _stopCountDown.await();
+    } catch (InterruptedException e) {
+      String msg = "participant: " + _instanceName + ", " + Thread.currentThread().getName()
+          + " is interrupted";
+      LOG.info(msg);
+      System.err.println(msg);
+    } catch (Exception e) {
+      LOG.error("exception running participant-manager", e);
+    } finally {
+      _startCountDown.countDown();
+      try {
+        disconnect();
+      } finally {
+        // nested finally: a throwing disconnect() must not leave syncStop() blocked forever
+        _waitStopCompleteCountDown.countDown();
+      }
+    }
+  }
+
+  /** @return the underlying zk-client (the inherited {@code _zkclient} field) */
+  public ZkClient getZkClient() {
+    return _zkclient;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
new file mode 100644
index 0000000..74774d1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestAbstractManager.java
@@ -0,0 +1,43 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * {@link ParticipantManager} subclass used only by tests. It exposes the {@code _zkclient}
+ * and {@code _handlers} fields so tests can inspect or manipulate them.
+ */
+public class TestAbstractManager extends ParticipantManager {
+
+  /**
+   * Creates a participant manager for the given cluster and instance.
+   *
+   * @param zkConnectString ZooKeeper connect string
+   * @param clusterName name of the cluster to join
+   * @param instanceName name of this participant instance
+   * @throws Exception if the {@link ParticipantManager} constructor throws
+   */
+  public TestAbstractManager(String zkConnectString, String clusterName,
+      String instanceName) throws Exception {
+    super(zkConnectString, clusterName, instanceName);
+  }
+
+  /**
+   * @return the ZooKeeper client used by this manager
+   */
+  public ZkClient getZkClient() {
+    return _zkclient;
+  }
+
+  /**
+   * @return the manager's own callback-handler list (not a copy), so any change made by the
+   *     caller is visible to the manager
+   */
+  public List<CallbackHandler> getHandlers() {
+    return _handlers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/6bb6e2c0/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
index efa3138..a35a49d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -19,24 +19,45 @@ package org.apache.helix.integration.manager;
* under the License.
*/
+import java.io.IOException;
import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.ZkTestHelper;
import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.TestSessionExpiryInTransition.SessionExpiryTransition;
import org.apache.helix.manager.zk.ControllerManager;
import org.apache.helix.manager.zk.ParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestParticipantManager extends ZkIntegrationTestBase {
+ private static Logger LOG = Logger.getLogger(TestParticipantManager.class);
+
@Test
public void simpleIntegrationTest() throws Exception {
// Logger.getRootLogger().setLevel(Level.INFO);
@@ -83,4 +104,174 @@ public class TestParticipantManager extends ZkIntegrationTestBase {
System.out.println("START " + clusterName + " at "
+ new Date(System.currentTimeMillis()));
}
+
+  /**
+   * Expires the ZooKeeper session of a single participant. It then verifies that the cluster
+   * converges again and that the participant reports a different session id.
+   */
+  @Test
+  public void simpleSessionExpiryTest() throws Exception
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        1, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    // start controller
+    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.connect();
+
+    try
+    {
+      // start participants
+      for (int i = 0; i < n; i++)
+      {
+        String instanceName = "localhost_" + (12918 + i);
+        participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName, null);
+        participants[i].syncStart();
+      }
+
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+              clusterName));
+      Assert.assertTrue(result);
+      String oldSessionId = participants[0].getSessionId();
+      Assert.assertNotNull(oldSessionId, "participant should have a session after connect");
+
+      // expire zk-connection on localhost_12918
+      ZkTestHelper.expireSession(participants[0].getZkClient());
+
+      // wait (bounded) until the session expiry has been handled, instead of a fixed sleep
+      // that can be too short on a slow machine
+      long deadline = System.currentTimeMillis() + 10 * 1000;
+      while (oldSessionId.equals(participants[0].getSessionId())
+          && System.currentTimeMillis() < deadline)
+      {
+        TimeUnit.MILLISECONDS.sleep(100);
+      }
+
+      result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+              clusterName));
+      Assert.assertTrue(result);
+      String newSessionId = participants[0].getSessionId();
+      // assertNotSame only compares references; the session ids must differ by value
+      Assert.assertFalse(oldSessionId.equals(newSessionId),
+          "session id should change after expiry, but is still " + oldSessionId);
+    }
+    finally
+    {
+      // cleanup runs even when an assertion fails
+      controller.disconnect();
+      for (int i = 0; i < n; i++)
+      {
+        if (participants[i] != null)
+        {
+          participants[i].syncStop();
+        }
+      }
+    }
+
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Mock transition that blocks the first transition of partition TestDB0_0 on
+   * localhost_12918. That transition counts down {@code startCountdown} and then waits on
+   * {@code endCountdown}. Transitions for any other message do nothing.
+   *
+   * <p>Declared {@code static} because it never uses the enclosing test instance. Within this
+   * class body, this nested class shadows the imported
+   * {@code TestSessionExpiryInTransition.SessionExpiryTransition}.
+   */
+  static class SessionExpiryTransition extends MockTransition
+  {
+    private final AtomicBoolean _done = new AtomicBoolean();
+    private final CountDownLatch _startCountdown;
+    private final CountDownLatch _endCountdown;
+
+    /**
+     * @param startCountdown counted down when the blocking transition begins
+     * @param endCountdown awaited by the blocking transition
+     */
+    public SessionExpiryTransition(CountDownLatch startCountdown, CountDownLatch endCountdown) {
+      _startCountdown = startCountdown;
+      _endCountdown = endCountdown;
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context) throws InterruptedException
+    {
+      String instance = message.getTgtName();
+      String partition = message.getPartitionName();
+      // constant-first equals tolerates a null target/partition; getAndSet makes sure only the
+      // first matching transition blocks
+      if ("localhost_12918".equals(instance)
+          && "TestDB0_0".equals(partition)
+          && !_done.getAndSet(true))
+      {
+        _startCountdown.countDown();
+        // this await will be interrupted since we cancel the task during handleNewSession
+        _endCountdown.await();
+      }
+    }
+  }
+
+ @Test
+ public void testSessionExpiryInTransition() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ final String clusterName = className + "_" + methodName;
+ int n = 1;
+ CountDownLatch startCountdown = new CountDownLatch(1);
+ CountDownLatch endCountdown = new CountDownLatch(1);
+
+
+ System.out.println("START " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 1, // partitions per resource
+ n, // number of nodes
+ 1, // replicas
+ "MasterSlave",
+ true); // do rebalance
+
+ // start controller
+ ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.connect();
+
+ // start participants
+ for (int i = 0; i < n; i++)
+ {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] =
+ new MockParticipantManager(ZK_ADDR,
+ clusterName,
+ instanceName,
+ new SessionExpiryTransition(startCountdown, endCountdown));
+ participants[i].syncStart();
+ }
+
+ // wait transition happens to trigger session expiry
+ startCountdown.await();
+ String oldSessionId = participants[0].getSessionId();
+ System.out.println("oldSessionId: " + oldSessionId);
+ ZkTestHelper.expireSession(participants[0].getZkClient());
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ String newSessionId = participants[0].getSessionId();
+ Assert.assertNotSame(newSessionId, oldSessionId);
+
+ // assert interrupt exception error in old session
+ String errPath = PropertyPathConfig.getPath(PropertyType.ERRORS, clusterName, "localhost_12918", oldSessionId, "TestDB0", "TestDB0_0");
+ ZNRecord error = _gZkClient.readData(errPath);
+ Assert.assertNotNull(error, "InterruptedException should happen in old session since task is being cancelled during handleNewSession");
+ String errString = new String(new ZNRecordSerializer().serialize(error));
+ Assert.assertTrue(errString.indexOf("InterruptedException") != -1);
+
+ // cleanup
+ controller.disconnect();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at "
+ + new Date(System.currentTimeMillis()));
+ }
}