Posted to dev@helix.apache.org by "Hudson (JIRA)" <ji...@apache.org> on 2014/07/11 22:35:05 UTC

[jira] [Commented] (HELIX-443) Race condition in Helix register/unregister MessageHandlerFactory

    [ https://issues.apache.org/jira/browse/HELIX-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14059310#comment-14059310 ] 

Hudson commented on HELIX-443:
------------------------------

FAILURE: Integrated in helix #1258 (See [https://builds.apache.org/job/helix/1258/])
[HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248 (kanak: rev fdee2dd73dcb074a1e689e2431134edee9104768)
* helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java


> Race condition in Helix register/unregister MessageHandlerFactory
> -----------------------------------------------------------------
>
>                 Key: HELIX-443
>                 URL: https://issues.apache.org/jira/browse/HELIX-443
>             Project: Apache Helix
>          Issue Type: Bug
>            Reporter: Zhen Zhang
>            Assignee: Zhen Zhang
>
> When a zk session expires, we reset all the listeners, including HelixTaskExecutor (which is a message listener on the Helix participant). The reset() calls HelixTaskExecutor#unregisterMessageHandlerFactory(), which throws an exception if the executor has not terminated within 200ms. That exception in turn skips removing the MessageHandlerFactory from the handler-factory map:
> {noformat}
> void unregisterMessageHandlerFactory(String type) {
>     // shutdown executor-service. disconnect if fail
>     ExecutorService executorSvc = _executorMap.remove(type);
>     if (executorSvc != null) {
>       List<Runnable> tasksLeft = executorSvc.shutdownNow();
>       LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
>           + tasksLeft);
>       try {
>         if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
>           LOG.error("executor-service for msgType: " + type
>               + " is not fully terminated in 200ms. will disconnect helix-participant");
>           throw new HelixException("fail to unregister msg-handler for msgType: " + type);
>         }
>       } catch (InterruptedException e) {
>         LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
>       }
>     }
>     // reset state-model
>     MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
>     if (handlerFty != null) {
>       handlerFty.reset();
>     }
>   }
> {noformat}
> When we re-connect to zk, we re-register the message-handler factory. Registration first checks whether a message-handler factory already exists for the type, and adds the factory and a new executor only if it does not:
> {noformat}
>   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
>       int threadpoolSize) {
>     if (!_handlerFactoryMap.containsKey(type)) {
>       if (!type.equalsIgnoreCase(factory.getMessageType())) {
>         throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
>             + factory.getMessageType());
>       }
>       _handlerFactoryMap.put(type, factory);
>       ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
>       _executorMap.put(type, executorSvc);
>       LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
>     } else {
>       LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
>           + threadpoolSize + ", factory: " + factory);
>     }
>   }
> {noformat}
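> So if we fail to remove the message-handler factory, the stale entry makes re-registration a no-op and no new executor is created for that type. The executor was already removed from the executor map during unregister, which leads to an NPE when we receive a message: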
> {noformat}
> java.lang.NullPointerException
>         at org.apache.helix.messaging.handling.HelixTaskExecutor.scheduleTask(HelixTaskExecutor.java:243)
>         at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:531)
>         at org.apache.helix.manager.zk.CallbackHandler.invoke(CallbackHandler.java:195)
>         at org.apache.helix.manager.zk.CallbackHandler.handleChildChange(CallbackHandler.java:404)
>         at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
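
The failure path described above comes down to two maps drifting out of sync. The following is a minimal sketch of one way to keep them consistent; it is not necessarily the change committed for HELIX-443. The class HandlerRegistrySketch, its Factory interface, and the use of IllegalStateException in place of HelixException are hypothetical stand-ins, and the synchronized methods are an assumption about how register/unregister could be serialized. The handler factory is removed in a finally block, so a 200ms timeout can no longer leave a stale entry behind.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified, hypothetical stand-in for HelixTaskExecutor; not the Helix class. */
class HandlerRegistrySketch {
  /** Hypothetical stand-in for MessageHandlerFactory. */
  interface Factory {
    void reset();
  }

  private final Map<String, Factory> handlerFactoryMap = new ConcurrentHashMap<>();
  private final Map<String, ExecutorService> executorMap = new ConcurrentHashMap<>();

  /** Adds the factory and its executor together, or does nothing if already present. */
  synchronized void register(String type, Factory factory, int poolSize) {
    if (handlerFactoryMap.containsKey(type)) {
      return; // already registered; mirrors the containsKey check in the issue
    }
    handlerFactoryMap.put(type, factory);
    executorMap.put(type, Executors.newFixedThreadPool(poolSize));
  }

  /** Removes the executor and the factory; the factory is removed even on timeout. */
  synchronized void unregister(String type) {
    ExecutorService executorSvc = executorMap.remove(type);
    try {
      if (executorSvc != null) {
        executorSvc.shutdownNow();
        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
          // Stand-in for the HelixException thrown in the original code.
          throw new IllegalStateException("executor for " + type + " not terminated in 200ms");
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    } finally {
      // Runs whether or not the timeout exception was thrown above.
      Factory factory = handlerFactoryMap.remove(type);
      if (factory != null) {
        factory.reset();
      }
    }
  }

  /** Schedules a task; a missing executor is reported explicitly instead of as an NPE. */
  void submit(String type, Runnable task) {
    ExecutorService executorSvc = executorMap.get(type);
    if (executorSvc == null) {
      throw new IllegalStateException("no executor registered for " + type);
    }
    executorSvc.execute(task);
  }

  boolean hasFactory(String type) {
    return handlerFactoryMap.containsKey(type);
  }

  boolean hasExecutor(String type) {
    return executorMap.containsKey(type);
  }
}
```

With this shape, unregister still reports the timeout to its caller, but a later register for the same type finds no stale factory entry and creates a fresh executor.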



--
This message was sent by Atlassian JIRA
(v6.2#6252)