Posted to commits@helix.apache.org by "Kanak Biscuitwala (JIRA)" <ji...@apache.org> on 2014/08/25 19:37:00 UTC
[jira] [Updated] (HELIX-443) Race condition in Helix register/unregister MessageHandlerFactory
[ https://issues.apache.org/jira/browse/HELIX-443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kanak Biscuitwala updated HELIX-443:
------------------------------------
Fix Version/s: 0.6.4, 0.7.1
> Race condition in Helix register/unregister MessageHandlerFactory
> -----------------------------------------------------------------
>
> Key: HELIX-443
> URL: https://issues.apache.org/jira/browse/HELIX-443
> Project: Apache Helix
> Issue Type: Bug
> Reporter: Zhen Zhang
> Assignee: Zhen Zhang
> Fix For: 0.7.1, 0.6.4
>
>
> When a zk session expires, we reset all the listeners, including HelixTaskExecutor (which is a message listener on the Helix participant). The reset() calls HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor has not terminated within 200ms. That exception, in turn, skips removing the MessageHandlerFactory from the handler-factory map.
> {noformat}
> void unregisterMessageHandlerFactory(String type) {
>   // shutdown executor-service. disconnect if fail
>   ExecutorService executorSvc = _executorMap.remove(type);
>   if (executorSvc != null) {
>     List<Runnable> tasksLeft = executorSvc.shutdownNow();
>     LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
>         + tasksLeft);
>     try {
>       if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
>         LOG.error("executor-service for msgType: " + type
>             + " is not fully terminated in 200ms. will disconnect helix-participant");
>         throw new HelixException("fail to unregister msg-handler for msgType: " + type);
>       }
>     } catch (InterruptedException e) {
>       LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
>     }
>   }
>   // reset state-model
>   MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
>   if (handlerFty != null) {
>     handlerFty.reset();
>   }
> }
> {noformat}
> When we re-connect to zk, we re-register the message-handler factory. Registration first checks whether a factory already exists for the type, and only adds a new executor if it does not:
> {noformat}
> public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
>     int threadpoolSize) {
>   if (!_handlerFactoryMap.containsKey(type)) {
>     if (!type.equalsIgnoreCase(factory.getMessageType())) {
>       throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
>           + factory.getMessageType());
>     }
>     _handlerFactoryMap.put(type, factory);
>     ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
>     _executorMap.put(type, executorSvc);
>     LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
>   } else {
>     LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
>         + threadpoolSize + ", factory: " + factory);
>   }
> }
> {noformat}
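> So if we fail to remove the message-handler factory, re-registration takes the else branch and never creates a new executor. The old executor has already been removed from the executor map, so no executor is left for that message type, which leads to an NPE when we receive a message: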
> {noformat}
> java.lang.NullPointerException
> at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
> at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
> at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
> at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
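One possible way to keep the two maps consistent is sketched below. It is not the patch that shipped in 0.6.4 or 0.7.1, only an illustration of the failure mode described above. The class HandlerRegistrySketch, its Factory interface, and the use of IllegalStateException in place of HelixException are all hypothetical stand-ins so that the example compiles without Helix. The idea is to remove the handler factory in a finally block, so that a timed-out executor shutdown can no longer leave a stale factory entry behind.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two maps in HelixTaskExecutor; not the Helix API.
final class HandlerRegistrySketch {
  /** Hypothetical stand-in for MessageHandlerFactory. */
  interface Factory {
    void reset();
  }

  private static final String TYPE = "STATE_TRANSITION";

  private final Map<String, Factory> handlerFactoryMap = new ConcurrentHashMap<>();
  private final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  // Same guard as the quoted register method: skip if a factory already exists.
  synchronized boolean register(String type, Factory factory, int poolSize) {
    if (handlerFactoryMap.containsKey(type)) {
      return false;
    }
    handlerFactoryMap.put(type, factory);
    executorMap.put(type, Executors.newFixedThreadPool(poolSize));
    return true;
  }

  synchronized void unregister(String type) {
    try {
      ExecutorService executorSvc = executorMap.remove(type);
      if (executorSvc != null) {
        executorSvc.shutdownNow();
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          // IllegalStateException stands in for HelixException here.
          throw new IllegalStateException("fail to unregister msg-handler for msgType: " + type);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    } finally {
      // Runs even when the timeout exception is thrown, so both maps stay in sync.
      Factory factory = handlerFactoryMap.remove(type);
      if (factory != null) {
        factory.reset();
      }
    }
  }

  boolean hasExecutor(String type) {
    return executorMap.containsKey(type);
  }

  void submit(String type, Runnable task) {
    executorMap.get(type).execute(task);
  }

  // Reproduces the scenario: a stuck task makes unregister time out,
  // then re-registration is attempted, as after a zk reconnect.
  static boolean reRegisterSucceedsAfterFailedUnregister() {
    HandlerRegistrySketch registry = new HandlerRegistrySketch();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    registry.register(TYPE, () -> { }, 1);
    registry.submit(TYPE, () -> {
      started.countDown();
      while (release.getCount() > 0) {
        try {
          release.await();
        } catch (InterruptedException ignored) {
          // Simulate a handler that does not stop when interrupted.
        }
      }
    });
    boolean threw = false;
    try {
      started.await();
      registry.unregister(TYPE);
    } catch (IllegalStateException expected) {
      threw = true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      release.countDown(); // let the stuck worker thread exit
    }
    boolean registered = registry.register(TYPE, () -> { }, 1);
    boolean ok = threw && registered && registry.hasExecutor(TYPE);
    registry.unregister(TYPE); // clean up the new, idle pool
    return ok;
  }

  public static void main(String[] args) {
    System.out.println("re-register ok: " + reRegisterSucceedsAfterFailedUnregister());
  }
}
```

In this sketch, register and unregister are also synchronized so that a reset and a re-register cannot interleave on the two maps. Whether that locking is appropriate for the real HelixTaskExecutor is an assumption that would need to be checked against its other callers.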
--
This message was sent by Atlassian JIRA
(v6.2#6252)