You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/06/10 22:03:49 UTC

git commit: [HELIX-453] On session expiry/recovery, not all message types are re-registered, rb=22432

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 3d8042266 -> 21f09efa0


[HELIX-453] On session expiry/recovery, not all message types are re-registered, rb=22432


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/21f09efa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/21f09efa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/21f09efa

Branch: refs/heads/helix-0.6.x
Commit: 21f09efa01ff4eeb995d451345eb224dbf744e8c
Parents: 3d80422
Author: zzhang <zz...@apache.org>
Authored: Tue Jun 10 13:03:28 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Tue Jun 10 13:03:28 2014 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 121 +++++++++---
 .../java/org/apache/helix/model/Message.java    |  10 -
 .../helix/integration/TestZkSessionExpiry.java  | 188 +++++++++++++++++++
 3 files changed, 285 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 7412311..c322215 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -58,6 +58,37 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.apache.log4j.Logger;
 
 public class HelixTaskExecutor implements MessageListener, TaskExecutor {
+  /**
+   * Registration record for one message type: the handler factory together with the size of
+   * the thread pool that serves it. Static, so it keeps no reference to the enclosing executor.
+   */
+  static class MsgHandlerFactoryRegistryItem {
+    private final MessageHandlerFactory _factory;
+    private final int _threadPoolSize;
+    /** Rejects a null factory (NullPointerException) and a pool size <= 0 (IllegalArgumentException). */
+    public MsgHandlerFactoryRegistryItem(MessageHandlerFactory factory, int threadPoolSize) {
+      if (factory == null) {
+        throw new NullPointerException("Message handler factory is null");
+      }
+      if (threadPoolSize <= 0) {
+        throw new IllegalArgumentException("Illegal thread pool size: " + threadPoolSize);
+      }
+      _factory = factory;
+      _threadPoolSize = threadPoolSize;
+    }
+    /** @return the pool size given at registration; always positive */
+    int threadPoolSize() {
+      return _threadPoolSize;
+    }
+
+    /** @return the factory given at registration; never null */
+    MessageHandlerFactory factory() {
+      return _factory;
+    }
+  }
+
+  private static Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
+
   // TODO: we need to further design how to throttle this.
   // From storage point of view, only bootstrap case is expensive
   // and we need to throttle, which is mostly IO / network bounded.
@@ -69,21 +100,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private final ParticipantMonitor _monitor;
   public static final String MAX_THREADS = "maxThreads";
 
-  final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap =
-      new ConcurrentHashMap<String, MessageHandlerFactory>();
+  /**
+   * Map of MsgType->MsgHandlerFactoryRegistryItem
+   */
+  final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
 
   final ConcurrentHashMap<String, ExecutorService> _executorMap;
 
-  private static Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
-
-  Map<String, Integer> _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
+  final Map<String, Integer> _resourceThreadpoolSizeMap;
 
   // timer for schedule timeout tasks
   final Timer _timer;
 
   public HelixTaskExecutor() {
     _taskMap = new ConcurrentHashMap<String, MessageTaskInfo>();
+
+    _hdlrFtyRegistry = new ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem>();
     _executorMap = new ConcurrentHashMap<String, ExecutorService>();
+    _resourceThreadpoolSizeMap = new ConcurrentHashMap<String, Integer>();
 
     _lock = new Object();
     _statusUpdateUtil = new StatusUpdateUtil();
@@ -107,18 +141,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           + factory.getMessageType());
     }
 
-    MessageHandlerFactory prevFactory = _handlerFactoryMap.putIfAbsent(type, factory);
-    if (prevFactory == null) {
-      if (!_executorMap.contains(type)) {
-        _executorMap.put(type, Executors.newFixedThreadPool(threadpoolSize));
-      } else {
-        LOG.error("Skip to create new thread pool for type: " + type);
+    MsgHandlerFactoryRegistryItem newItem =
+        new MsgHandlerFactoryRegistryItem(factory, threadpoolSize);
+    MsgHandlerFactoryRegistryItem prevItem = _hdlrFtyRegistry.putIfAbsent(type, newItem);
+    if (prevItem == null) {
+      ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize);
+      ExecutorService prevExecutor = _executorMap.putIfAbsent(type, newPool);
+      if (prevExecutor != null) {
+        LOG.warn("Skip creating a new thread pool for type: " + type + ", already existing pool: "
+            + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown());
+        newPool = null;
       }
       LOG.info("Registered message handler factory for type: " + type + ", poolSize: "
           + threadpoolSize + ", factory: " + factory + ", pool: " + _executorMap.get(type));
     } else {
-      LOG.warn("Fail to register message handler factory for type: " + type + ", poolSize: "
-          + threadpoolSize + ", factory: " + factory);
+      LOG.info("Skip register message handler factory for type: " + type + ", poolSize: "
+          + threadpoolSize + ", factory: " + factory + ", already existing factory: "
+          + prevItem.factory());
+      newItem = null;
     }
   }
 
@@ -161,7 +201,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /**
    * Find the executor service for the message. A message can have a per-statemodelfactory
    * executor service, or per-message type executor service.
-   **/
+   */
   ExecutorService findExecutorServiceForMsg(Message message) {
     ExecutorService executorService = _executorMap.get(message.getMsgType());
     if (message.getMsgType().equals(MessageType.STATE_TRANSITION.toString())) {
@@ -376,29 +416,39 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
    * @param type
    */
   void unregisterMessageHandlerFactory(String type) {
+    MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.remove(type); // null if not registered
     ExecutorService pool = _executorMap.remove(type);
-    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
 
-    LOG.info("Unregistering message handler factory for type: " + type + ", factory: " + handlerFty
-        + ", pool: " + pool);
+    LOG.info("Unregistering message handler factory for type: " + type + ", factory: "
+        + (item == null ? null : item.factory()) + ", pool: " + pool);
 
     if (pool != null) {
       shutdownAndAwaitTermination(pool);
     }
 
     // reset state-model
-    if (handlerFty != null) {
-      handlerFty.reset();
+    if (item != null) {
+      item.factory().reset();
     }
 
-    LOG.info("Unregistered message handler factory for type: " + type + ", factory: " + handlerFty
-        + ", pool: " + pool);
+    LOG.info("Unregistered message handler factory for type: " + type + ", factory: "
+        + (item == null ? null : item.factory()) + ", pool: " + pool);
   }
 
   void reset() {
     LOG.info("Reset HelixTaskExecutor");
-    for (String msgType : _executorMap.keySet()) {
-      unregisterMessageHandlerFactory(msgType);
+    for (String msgType : _hdlrFtyRegistry.keySet()) {
+      // don't un-register factories, just shutdown all executors
+      ExecutorService pool = _executorMap.remove(msgType);
+      if (pool != null) {
+        LOG.info("Reset exectuor for msgType: " + msgType + ", pool: " + pool);
+        shutdownAndAwaitTermination(pool);
+      }
+
+      MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
+      if (item.factory() != null) {
+        item.factory().reset();
+      }
     }
 
     // Log all tasks that fail to terminate
@@ -407,12 +457,29 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
     }
     _taskMap.clear();
+  }
 
+  void init() {
+    LOG.info("Init HelixTaskExecutor");
+    // Give every registered message type a thread pool if it does not currently have one.
+    // Iterate entries so a type unregistered concurrently cannot surface as a null item.
+    for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : _hdlrFtyRegistry.entrySet()) {
+      String msgType = entry.getKey();
+      ExecutorService newPool = Executors.newFixedThreadPool(entry.getValue().threadPoolSize());
+      ExecutorService prevPool = _executorMap.putIfAbsent(msgType, newPool);
+      if (prevPool != null) {
+        // Will happen if we register and call init
+        LOG.info("Skip init a new thread pool for type: " + msgType + ", already existing pool: "
+            + prevPool + ", isShutdown: " + prevPool.isShutdown());
+        newPool.shutdown(); // the existing pool stays in use; release the one just created
+      }
+    }
   }
 
   @Override
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
+
     // If FINALIZE notification comes, reset all handler factories
     // and terminate all the thread pools
     // TODO: see if we should have a separate notification call for resetting
@@ -421,6 +488,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       return;
     }
 
+    if (changeContext.getType() == Type.INIT) {
+      init();
+      // continue to process messages
+    }
+
     if (messages == null || messages.size() == 0) {
       LOG.info("No Messages to process");
       return;
@@ -562,7 +634,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public MessageHandler createMessageHandler(Message message, NotificationContext changeContext) {
     String msgType = message.getMsgType().toString();
 
-    MessageHandlerFactory handlerFactory = _handlerFactoryMap.get(msgType);
+    MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
+    MessageHandlerFactory handlerFactory = item.factory();
 
     // Fail to find a MessageHandlerFactory for the message
     // we will keep the message and the message will be handled when

http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d599b8b..937a28e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -626,16 +626,6 @@ public class Message extends HelixProperty {
     return partitionNames;
   }
 
-  // public AtomicInteger getGroupMsgCountDown()
-  // {
-  // return _groupMsgCountDown;
-  // }
-  //
-  // public void setGroupMsgCountDown(AtomicInteger countDown)
-  // {
-  // _groupMsgCountDown = countDown;
-  // }
-
   /**
    * Check if this message is targetted for a controller
    * @return true if this is a controller message, false otherwise

http://git-wip-us.apache.org/repos/asf/helix/blob/21f09efa/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
new file mode 100644
index 0000000..c911283
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkSessionExpiry.java
@@ -0,0 +1,188 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that custom message handler factories keep working after a ZK session expiry.
+ */
+public class TestZkSessionExpiry extends ZkUnitTestBase {
+  final static String DUMMY_MSG_TYPE = "DUMMY";
+
+  /** Records the id of every message it handles in the shared set. */
+  static class DummyMessageHandler extends MessageHandler {
+    final Set<String> _handledMsgSet;
+
+    public DummyMessageHandler(Message message, NotificationContext context,
+        Set<String> handledMsgSet) {
+      super(message, context);
+      _handledMsgSet = handledMsgSet;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException {
+      _handledMsgSet.add(_message.getId());
+      HelixTaskResult ret = new HelixTaskResult();
+      ret.setSuccess(true);
+      return ret;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      // Do nothing
+    }
+  }
+
+  /** Creates DUMMY-type handlers that all report into one shared set. */
+  static class DummyMessageHandlerFactory implements MessageHandlerFactory {
+    final Set<String> _handledMsgSet;
+
+    public DummyMessageHandlerFactory(Set<String> handledMsgSet) {
+      _handledMsgSet = handledMsgSet;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new DummyMessageHandler(message, context, _handledMsgSet);
+    }
+
+    @Override
+    public String getMessageType() {
+      return DUMMY_MSG_TYPE;
+    }
+
+    @Override
+    public void reset() {
+      // Do nothing
+    }
+  }
+
+  @Test
+  public void testMsgHdlrFtyReRegistration() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants; handlers write the set while this thread polls it, so synchronize it
+    Set<String> handledMsgSet = java.util.Collections.synchronizedSet(new HashSet<String>());
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].getMessagingService().registerMessageHandlerFactory(DUMMY_MSG_TYPE,
+          new DummyMessageHandlerFactory(handledMsgSet));
+      participants[i].syncStart();
+    }
+
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // trigger dummy message handler
+    checkDummyMsgHandler(participants[0], handledMsgSet);
+
+    // expire localhost_12918
+    ZkTestHelper.expireSession(participants[0].getZkClient());
+    result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // trigger dummy message handler again
+    checkDummyMsgHandler(participants[0], handledMsgSet);
+
+    // clean up
+    controller.syncStop();
+    for (int i = 0; i < n; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * trigger dummy message handler and verify it's invoked
+   * @param manager participant whose message queue receives the dummy message
+   * @param handledMsgSet set the dummy handlers add handled message ids to
+   * @throws Exception
+   */
+  private static void checkDummyMsgHandler(HelixManager manager,
+      final Set<String> handledMsgSet) throws Exception {
+    final Message aMsg = newMsg(manager.getInstanceName());
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    accessor.setProperty(keyBuilder.message(manager.getInstanceName(), aMsg.getId()), aMsg);
+    boolean result = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return handledMsgSet.contains(aMsg.getId());
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
+  }
+
+  private static Message newMsg(String tgtName) {
+    Message msg = new Message(DUMMY_MSG_TYPE, UUID.randomUUID().toString());
+    msg.setTgtSessionId("*");
+    msg.setTgtName(tgtName);
+    return msg;
+  }
+}