Posted to commits@helix.apache.org by "Zhen Zhang (JIRA)" <ji...@apache.org> on 2014/05/06 20:45:17 UTC
[jira] [Updated] (HELIX-443) Race condition in Helix register/unregister MessageHandlerFactory
[ https://issues.apache.org/jira/browse/HELIX-443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhen Zhang updated HELIX-443:
-----------------------------
Description:
When a zk session expires, we reset all listeners, including HelixTaskExecutor (the message listener on a Helix participant). reset() calls HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor is not terminated within 200 ms. That exception in turn skips removing the MessageHandlerFactory from the handler-factory map.
{noformat}
void unregisterMessageHandlerFactory(String type) {
  // shutdown executor-service. disconnect if fail
  ExecutorService executorSvc = _executorMap.remove(type);
  if (executorSvc != null) {
    List<Runnable> tasksLeft = executorSvc.shutdownNow();
    LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
        + tasksLeft);
    try {
      if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
        LOG.error("executor-service for msgType: " + type
            + " is not fully terminated in 200ms. will disconnect helix-participant");
        // throwing here skips _handlerFactoryMap.remove(type) below,
        // even though the executor was already removed from _executorMap above
        throw new HelixException("fail to unregister msg-handler for msgType: " + type);
      }
    } catch (InterruptedException e) {
      LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
    }
  }
  // reset state-model
  MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
  if (handlerFty != null) {
    handlerFty.reset();
  }
}
{noformat}
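To make the failure mode concrete, here is a minimal, self-contained sketch that mimics the control flow above with plain maps. The class and method names (UnregisterRaceDemo, factoryLeftBehind), the "STATE_TRANSITION" type string, and the interrupt-swallowing task are hypothetical stand-ins, not Helix code; the sketch assumes the stuck handler simply ignores interruption. It shows that once awaitTermination() times out, the executor is already gone from the executor map while the factory is still in the factory map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, stripped-down model of the two maps in HelixTaskExecutor.
// Object stands in for MessageHandlerFactory, IllegalStateException for HelixException.
class UnregisterRaceDemo {
  static final Map<String, Object> handlerFactoryMap = new ConcurrentHashMap<>();
  static final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  // Same control flow as unregisterMessageHandlerFactory() above.
  static void unregister(String type) {
    ExecutorService executorSvc = executorMap.remove(type);
    if (executorSvc != null) {
      executorSvc.shutdownNow();
      try {
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    handlerFactoryMap.remove(type); // skipped when the exception above escapes
  }

  // Returns true if a failed unregister leaves the factory mapped without an executor.
  static boolean factoryLeftBehind() {
    String type = "STATE_TRANSITION";
    handlerFactoryMap.put(type, new Object());
    ExecutorService pool = Executors.newFixedThreadPool(1);
    executorMap.put(type, pool);

    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    // A handler that swallows interrupts, so shutdownNow() cannot stop it within 200 ms.
    pool.execute(() -> {
      started.countDown();
      while (release.getCount() > 0) {
        try {
          release.await();
        } catch (InterruptedException ignored) {
          // keep waiting, like a handler stuck in a call that ignores interruption
        }
      }
    });

    try {
      started.await();
      unregister(type);
    } catch (IllegalStateException | InterruptedException expected) {
      // reset() would propagate the exception; the factory cleanup never ran
    }
    boolean leftBehind = handlerFactoryMap.containsKey(type) && !executorMap.containsKey(type);
    release.countDown(); // let the stuck handler finish so its thread can exit
    return leftBehind;
  }

  public static void main(String[] args) {
    System.out.println("factory left behind: " + factoryLeftBehind());
  }
}
```

Running main prints "factory left behind: true" after roughly 200 ms: the maps are now out of sync, which is exactly the state the re-registration below runs into.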
When we reconnect to zk, we re-register the message-handler factory. Registration first checks whether a factory for the message type already exists, and only if none exists does it add the factory and create an executor:
{noformat}
public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
    int threadpoolSize) {
  if (!_handlerFactoryMap.containsKey(type)) {
    if (!type.equalsIgnoreCase(factory.getMessageType())) {
      throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
          + factory.getMessageType());
    }
    _handlerFactoryMap.put(type, factory);
    ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
    _executorMap.put(type, executorSvc);
    LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
  } else {
    // a stale factory entry lands here: no executor is created for this type
    LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
        + threadpoolSize + ", factory: " + factory);
  }
}
{noformat}
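Replaying that registration against the state a failed unregister leaves behind shows the check-then-act skipping executor creation. This is again a hypothetical model (StaleFactoryRegisterDemo is not a Helix class); in particular, schedule() assumes the executor is looked up by message type and used directly, which is our guess at what happens near HelixTaskExecutor.java:243, not a quote of that code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: replays registration from the state a failed unregister
// leaves behind (factory still mapped, executor already removed).
class StaleFactoryRegisterDemo {
  static final Map<String, Object> handlerFactoryMap = new ConcurrentHashMap<>();
  static final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  // Same check-then-act shape as registerMessageHandlerFactory() above.
  static boolean register(String type, Object factory, int poolSize) {
    if (!handlerFactoryMap.containsKey(type)) {
      handlerFactoryMap.put(type, factory);
      executorMap.put(type, Executors.newFixedThreadPool(poolSize));
      return true;
    }
    return false; // the real method only logs a warning in this branch
  }

  // Assumption: scheduling looks up the executor by message type and submits to it.
  static void schedule(String type, Runnable task) {
    executorMap.get(type).submit(task); // NullPointerException if no executor is mapped
  }

  public static void main(String[] args) {
    String type = "STATE_TRANSITION";
    handlerFactoryMap.put(type, new Object()); // stale entry from the failed unregister
    System.out.println("registered: " + register(type, new Object(), 2)); // prints "registered: false"
    try {
      schedule(type, () -> { });
    } catch (NullPointerException e) {
      System.out.println("NullPointerException on the first incoming message");
    }
  }
}
```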
So if unregistering fails to remove the message-handler factory, re-registration takes the warning branch and never creates an executor for that message type, which leads to an NPE when the next message arrives:
{noformat}
java.lang.NullPointerException
	at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
	at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
	at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
	at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
	at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
	at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}
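One possible remedy, sketched under the same assumptions and not necessarily the change made for HELIX-443, is to remove the factory in a finally block so the two maps stay consistent even when the executor shutdown times out. NeverTerminatesPool is a hypothetical test stub whose awaitTermination() always returns false, standing in for a handler thread that ignores interruption; the other names are likewise illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical remedy sketch: clean up the factory map in a finally block so a
// failed executor shutdown can no longer leave a stale factory entry behind.
class ConsistentUnregisterSketch {
  static final Map<String, Object> handlerFactoryMap = new ConcurrentHashMap<>();
  static final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  // Test stub standing in for a pool whose handler thread ignores interruption.
  static class NeverTerminatesPool extends ThreadPoolExecutor {
    NeverTerminatesPool() {
      super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
      return false; // always report "not terminated in time"
    }
  }

  // Same check-then-act shape as registerMessageHandlerFactory().
  static void register(String type, Object factory, int poolSize) {
    if (!handlerFactoryMap.containsKey(type)) {
      handlerFactoryMap.put(type, factory);
      executorMap.put(type, Executors.newFixedThreadPool(poolSize));
    }
  }

  static void unregister(String type) {
    ExecutorService executorSvc = executorMap.remove(type);
    try {
      if (executorSvc != null) {
        executorSvc.shutdownNow();
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // Runs whether or not the shutdown succeeded; the real code would also
      // call reset() on the removed factory here.
      handlerFactoryMap.remove(type);
    }
  }

  // Simulates session expiry followed by reconnect; returns true if the
  // re-registered type ends up with an executor again.
  static boolean reRegisterRestoresExecutor() {
    String type = "STATE_TRANSITION";
    handlerFactoryMap.put(type, new Object());
    executorMap.put(type, new NeverTerminatesPool());
    try {
      unregister(type);
    } catch (IllegalStateException expected) {
      // the participant may still disconnect, but the maps are now consistent
    }
    register(type, new Object(), 2);
    ExecutorService restored = executorMap.get(type);
    if (restored == null) {
      return false;
    }
    restored.shutdown(); // clean up the demo pool
    return true;
  }

  public static void main(String[] args) {
    System.out.println("executor restored: " + reRegisterRestoresExecutor());
  }
}
```

With this ordering a timed-out shutdown can still be reported (and the participant disconnected), but reconnecting recreates the executor instead of falling into the warning branch.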
> Race condition in Helix register/unregister MessageHandlerFactory
> -----------------------------------------------------------------
>
> Key: HELIX-443
> URL: https://issues.apache.org/jira/browse/HELIX-443
> Project: Apache Helix
> Issue Type: Bug
> Reporter: Zhen Zhang
> Assignee: Zhen Zhang
--
This message was sent by Atlassian JIRA
(v6.2#6252)