You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/06/10 22:03:49 UTC
git commit: [HELIX-453] On session expiry/recovery, not all message types are re-registered, rb=22432
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 3d8042266 -> 21f09efa0
[HELIX-453] On session expiry/recovery, not all message types are re-registered, rb=22432
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/21f09efa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/21f09efa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/21f09efa
Branch: refs/heads/helix-0.6.x
Commit: 21f09efa01ff4eeb995d451345eb224dbf744e8c
Parents: 3d80422
Author: zzhang <zz...@apache.org>
Authored: Tue Jun 10 13:03:28 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Tue Jun 10 13:03:28 2014 -0700
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 121 +++++++++---
.../java/org/apache/helix/model/Message.java | 10 -
.../helix/integration/TestZkSessionExpiry.java | 188 +++++++++++++++++++
3 files changed, 285 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 7412311..c322215 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -58,6 +58,37 @@ import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
+ /**
+ * Put together all registration information about a message handler factory: the factory
+ * itself and the size of the thread pool created for its message type.
+ * Declared static because it never touches the enclosing HelixTaskExecutor instance.
+ */
+ static class MsgHandlerFactoryRegistryItem {
+ private final MessageHandlerFactory _factory;
+ private final int _threadPoolSize;
+
+ /**
+ * @param factory message handler factory; must not be null
+ * @param threadPoolSize size of the thread pool for the factory's message type; must be > 0
+ * @throws NullPointerException if factory is null
+ * @throws IllegalArgumentException if threadPoolSize is not positive
+ */
+ public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize) {
+ if (factory == null) {
+ throw new NullPointerException("Message handler factory is null");
+ }
+
+ if (threadPoolSize <= 0) {
+ throw new IllegalArgumentException("Illegal thread pool size: " + threadPoolSize);
+ }
+
+ _factory = factory;
+ _threadPoolSize = threadPoolSize;
+ }
+
+ int threadPoolSize() {
+ return _threadPoolSize;
+ }
+
+ /** @return the registered factory; never null */
+ MessageHandlerFactory factory() {
+ return _factory;
+ }
+ }
+
+ private static Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
+
// TODO: we need to further design how to throttle this.
// From storage point of view, only bootstrap case is expensive
// and we need to throttle, which is mostly IO / network bounded.
@@ -69,21 +100,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
private final ParticipantMonitor _monitor;
public static final String MAX_THREADS = "maxThreads";
- final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
- new ConcurrentHashMap<String, MessageHandlerFactory>();
+ /**
+ * Map of MsgType->MsgHandlerFactoryRegistryItem
+ */
+ final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
final ConcurrentHashMap<String, ExecutorService> _executorMap;
- private static Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
-
- Map<String, Integer> _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
+ final Map<String, Integer> _resourceThreadpoolSizeMap;
// timer for schedule timeout tasks
final Timer _timer;
public HelixTaskExecutor() {
_taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
+
+ _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
_executorMap = new ConcurrentHashMap<String, ExecutorService>();
+ _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
_lock = new Object();
_statusUpdateUtil = new StatusUpdateUtil();
@@ -107,18 +141,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
+ factory.getMessageType());
}
- MessageHandlerFactory prevFactory = _handlerFactoryMap.putIfAbsent(type, factory);
- if (prevFactory == null) {
- if (!_executorMap.contains(type)) {
- _executorMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
- } else {
- LOG.error("Skip to create new thread pool for type: " + type);
+ MsgHandlerFactoryRegistryItem newItem =
+ new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+ MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
+ if (prevItem == null) {
+ ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize);
+ ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
+ if (prevExecutor != null) {
+ LOG.warn("Skip creating a new thread pool for type: " + type + ", already existing pool: "
+ + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
+ // an executor for this type already exists, so the new pool is never used: release it
+ newPool.shutdown();
}
LOG.info("Registered message handler factory for type: " + type + ", poolSize: "
+ threadpoolSize + ", factory: " + factory + ", pool: " + _executorMap.get(type));
} else {
- LOG.warn("Fail to register message handler factory for type: " + type + ", poolSize: "
- + threadpoolSize + ", factory: " + factory);
+ LOG.info("Skip register message handler factory for type: " + type + ", poolSize: "
+ + threadpoolSize + ", factory: " + factory + ", already existing factory: "
+ + prevItem.factory());
}
}
@@ -161,7 +201,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
/**
* Find the executor service for the message. A message can have a per-statemodelfactory
* executor service, or per-message type executor service.
- **/
+ */
ExecutorService findExecutorServiceForMsg(Message message) {
ExecutorService executorService = _executorMap.get(message.getMsgType());
if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
@@ -376,29 +416,39 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
* @param type
*/
void unregisterMessageHandlerFactory(String type) {
+ MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.remove(type);
ExecutorService pool = _executorMap.remove(type);
- MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
- LOG.info("Unregistering message handler factory for type: " + type + ", factory: " + handlerFty
- + ", pool: " + pool);
+ // item is null when no factory is registered for this type: keep logging and reset null-safe
+ MessageHandlerFactory handlerFty = (item == null) ? null : item.factory();
+ LOG.info("Unregistering message handler factory for type: " + type + ", factory: "
+ + handlerFty + ", pool: " + pool);
if (pool != null) {
shutdownAndAwaitTermination(pool);
}
// reset state-model
if (handlerFty != null) {
handlerFty.reset();
}
- LOG.info("Unregistered message handler factory for type: " + type + ", factory: " + handlerFty
- + ", pool: " + pool);
+ LOG.info("Unregistered message handler factory for type: " + type + ", factory: "
+ + handlerFty + ", pool: " + pool);
}
void reset() {
LOG.info("Reset HelixTaskExecutor");
- for (String msgType : _executorMap.keySet()) {
- unregisterMessageHandlerFactory(msgType);
+ // don't un-register factories, just shut down all executors and reset factory state.
+ // Iterate entries (not keySet + get) so a concurrently removed type cannot yield a null item.
+ for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
+ String msgType = entry.getKey();
+ ExecutorService pool = _executorMap.remove(msgType);
+ if (pool != null) {
+ LOG.info("Reset executor for msgType: " + msgType + ", pool: " + pool);
+ shutdownAndAwaitTermination(pool);
+ }
+
+ // factory() is never null: MsgHandlerFactoryRegistryItem's constructor rejects null
+ entry.getValue().factory().reset();
}
// Log all tasks that fail to terminate
@@ -407,12 +457,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
}
_taskMap.clear();
+ }
+ /**
+ * Create a thread pool for every registered message handler factory that currently has none.
+ * Invoked from onMessage on an INIT notification; reset() shuts the pools down but keeps the
+ * factory registrations, so this restores message handling for all registered types.
+ */
+ void init() {
+ LOG.info("Init HelixTaskExecutor");
+
+ // Re-init all existing factories; iterate entries so a concurrently removed type cannot
+ // yield a null item
+ for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
+ String msgType = entry.getKey();
+ ExecutorService newPool = Executors.newFixedThreadPool(entry.getValue().threadPoolSize());
+ ExecutorService prevPool = _executorMap.putIfAbsent(msgType, newPool);
+ if (prevPool != null) {
+ // Will happen if we register and call init; the new pool is never used, so release it
+ LOG.info("Skip init a new thread pool for type: " + msgType + ", already existing pool: "
+ + prevPool + ", isShutdown: " + prevPool.isShutdown());
+ newPool.shutdown();
+ }
+ }
}
@Override
public void onMessage(String instanceName, List<Message> messages,
NotificationContext changeContext) {
+
// If FINALIZE notification comes, reset all handler factories
// and terminate all the thread pools
// TODO: see if we should have a separate notification call for resetting
@@ -421,6 +488,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
return;
}
+ if (changeContext.getType() == Type.INIT) {
+ init();
+ // continue to process messages
+ }
+
if (messages == null || messages.size() == 0) {
LOG.info("No Messages to process");
return;
@@ -562,7 +634,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
String msgType = message.getMsgType().toString();
- MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType);
+ MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
+ // item is null when no factory is registered for msgType: leave handlerFactory null so the
+ // "fail to find a MessageHandlerFactory" path below applies instead of throwing an NPE
+ MessageHandlerFactory handlerFactory = (item == null) ? null : item.factory();
// Fail to find a MessageHandlerFactory for the message
// we will keep the message and the message will be handled when
http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d599b8b..937a28e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -626,16 +626,6 @@ public class Message extends HelixProperty {
return partitionNames;
}
- // public AtomicInteger getGroupMsgCountDown()
- // {
- // return _groupMsgCountDown;
- // }
- //
- // public void setGroupMsgCountDown(AtomicInteger countDown)
- // {
- // _groupMsgCountDown = countDown;
- // }
-
/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
new file mode 100644
index 0000000..c911283
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -0,0 +1,188 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkSessionExpiry extends ZkUnitTestBase {
+ final static String DUMMY_MSG_TYPE = "DUMMY";
+
+ /**
+ * Message handler that records the id of each message it handles in a shared set
+ */
+ static class DummyMessageHandler extends MessageHandler {
+ final Set<String> _handledMsgSet;
+
+ public DummyMessageHandler(Message message, NotificationContext context,
+ Set<String> handledMsgSet) {
+ super(message, context);
+ _handledMsgSet = handledMsgSet;
+ }
+
+ @Override
+ public HelixTaskResult handleMessage() throws InterruptedException {
+ _handledMsgSet.add(_message.getId());
+ HelixTaskResult ret = new HelixTaskResult();
+ ret.setSuccess(true);
+ return ret;
+ }
+
+ @Override
+ public void onError(Exception e, ErrorCode code, ErrorType type) {
+ // Do nothing
+ }
+
+ }
+
+ /**
+ * Factory that creates a DummyMessageHandler for every DUMMY_MSG_TYPE message
+ */
+ static class DummyMessageHandlerFactory implements MessageHandlerFactory {
+ final Set<String> _handledMsgSet;
+
+ public DummyMessageHandlerFactory(Set<String> handledMsgSet) {
+ _handledMsgSet = handledMsgSet;
+ }
+
+ @Override
+ public MessageHandler createHandler(Message message, NotificationContext context) {
+ return new DummyMessageHandler(message, context, _handledMsgSet);
+ }
+
+ @Override
+ public String getMessageType() {
+ return DUMMY_MSG_TYPE;
+ }
+
+ @Override
+ public void reset() {
+ // Do nothing
+ }
+
+ }
+
+ /**
+ * Verify that a message handler factory registered for a custom message type still handles
+ * messages after the participant's ZooKeeper session is expired.
+ */
+ @Test
+ public void testMsgHdlrFtyReRegistration() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ int n = 2;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 8, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ // start participants
+ // handlers are invoked from the participants' message-handling thread pools while
+ // TestHelper.verify reads the set from this thread, so the set must be thread-safe
+ Set<String> handledMsgSet = Collections.synchronizedSet(new HashSet<String>());
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].getMessagingService().registerMessageHandlerFactory(DUMMY_MSG_TYPE,
+ new DummyMessageHandlerFactory(handledMsgSet));
+ participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // trigger dummy message handler
+ checkDummyMsgHandler(participants[0], handledMsgSet);
+
+ // expire localhost_12918
+ ZkTestHelper.expireSession(participants[0].getZkClient());
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // trigger dummy message handler again
+ checkDummyMsgHandler(participants[0], handledMsgSet);
+
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ participants[i].syncStop();
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Send a dummy message to the given participant and verify it is handled, polling via
+ * TestHelper.verify with a 5 * 1000 timeout
+ * @param manager participant the dummy message is addressed to
+ * @param handledMsgSet ids of messages recorded by DummyMessageHandler
+ * @throws Exception
+ */
+ private static void checkDummyMsgHandler(HelixManager manager,
+ final Set<String> handledMsgSet) throws Exception {
+
+ final Message aMsg = newMsg(manager.getInstanceName());
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.message(manager.getInstanceName(), aMsg.getId()), aMsg);
+ boolean result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+
+ return handledMsgSet.contains(aMsg.getId());
+ }
+ }, 5 * 1000);
+ Assert.assertTrue(result);
+ }
+
+ /**
+ * Create a DUMMY_MSG_TYPE message with a random id, addressed to the given instance and to
+ * any session ("*")
+ * @param tgtName instance name the message targets
+ */
+ private static Message newMsg(String tgtName) {
+ Message msg = new Message(DUMMY_MSG_TYPE, UUID.randomUUID().toString());
+ msg.setTgtSessionId("*");
+ msg.setTgtName(tgtName);
+ return msg;
+ }
+}