You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/23 00:18:28 UTC

git commit: [HELIX-281] Prevent message callbacks from indefinitely starving other callbacks

Updated Branches:
  refs/heads/helix-0.6.2-release f678f79f3 -> c92428023


[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9242802
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9242802
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9242802

Branch: refs/heads/helix-0.6.2-release
Commit: c92428023a6b8456c0e0ecce0649e61ea2575863
Parents: f678f79
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 8 17:18:35 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 15:18:02 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  67 +++++++---
 .../stages/ClusterEventBlockingQueue.java       | 123 +++++++++++++++++++
 .../stages/TestClusterEventBlockingQueue.java   |  95 ++++++++++++++
 .../helix/integration/TestSchedulerMessage.java |   6 +-
 4 files changed, 276 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3b522e6..7e28399 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
@@ -39,13 +40,14 @@ import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.LiveInstanceChangeListener;
 import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
@@ -55,8 +57,8 @@ import org.apache.helix.controller.stages.MessageThrottleStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.RebalanceIdealStateStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -95,6 +97,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
+   * A queue for controller events and a thread that will consume it
+   */
+  private final ClusterEventBlockingQueue _eventQueue;
+  private final ClusterEventProcessor _eventThread;
+
+  /**
    * The _paused flag is checked by function handleEvent(), while if the flag is set
    * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
    * is set.
@@ -134,7 +142,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       List<ZNRecord> dummy = new ArrayList<ZNRecord>();
       event.addAttribute("eventData", dummy);
       // Should be able to process
-      handleEvent(event);
+      _eventQueue.put(event);
     }
   }
 
@@ -226,6 +234,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
     _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+    _eventQueue = new ClusterEventBlockingQueue();
+    _eventThread = new ClusterEventProcessor();
+    _eventThread.start();
   }
 
   /**
@@ -277,6 +288,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       return;
     }
 
+    logger.info("START: Invoking controller pipeline for event: " + event.getName());
+    long startTime = System.currentTimeMillis();
     for (Pipeline pipeline : pipelines) {
       try {
         pipeline.handle(event);
@@ -287,6 +300,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
         break;
       }
     }
+    long endTime = System.currentTimeMillis();
+    logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took "
+        + (endTime - startTime) + " ms");
   }
 
   // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
@@ -300,7 +316,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     // event.addAttribute("helixmanager", changeContext.getManager());
     // event.addAttribute("changeContext", changeContext);
     // event.addAttribute("eventData", externalViewList);
-    // // handleEvent(event);
+    // _eventQueue.put(event);
     // logger.info("END: GenericClusterController.onExternalViewChange()");
   }
 
@@ -313,7 +329,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", statesInfo);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onStateChange()");
   }
 
@@ -337,7 +353,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", messages);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (_clusterStatusMonitor != null && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
@@ -371,7 +387,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", liveInstances);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
   }
 
@@ -397,13 +413,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", idealStates);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (changeContext.getType() != Type.FINALIZE) {
       checkRebalancingTimer(changeContext.getManager(), idealStates);
     }
 
-    logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+    logger.info("END: GenericClusterController.onIdealStateChange()");
   }
 
   @Override
@@ -413,7 +429,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("eventData", configs);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onConfigChange()");
   }
 
@@ -456,7 +472,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
         event.addAttribute("changeContext", changeContext);
         event.addAttribute("helixmanager", changeContext.getManager());
         event.addAttribute("eventData", pauseSignal);
-        handleEvent(event);
+        _eventQueue.put(event);
       } else {
         _paused = false;
       }
@@ -537,11 +553,34 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _lastSeenSessions.set(curSessions);
 
   }
+
   public void shutdownClusterStatusMonitor(String clusterName) {
-     if (_clusterStatusMonitor != null) {
+    if (_clusterStatusMonitor != null) { // no-op once a previous call has reset and cleared it
       logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
-       _clusterStatusMonitor.reset();
-       _clusterStatusMonitor = null;
+      _clusterStatusMonitor.reset();
+      _clusterStatusMonitor = null; // drop the reference so repeated calls skip the reset
+    }
+  }
+
+  private class ClusterEventProcessor extends Thread { // runs handleEvent() for each queued event
+    @Override
+    public void run() {
+      logger.info("START ClusterEventProcessor thread");
+      while (!isInterrupted()) { // NOTE(review): confirm controller shutdown interrupts this
+        try {
+          ClusterEvent event = _eventQueue.take(); // blocks until an event has been put
+          handleEvent(event);
+        } catch (InterruptedException e) {
+          logger.warn("ClusterEventProcessor interrupted", e);
+          interrupt(); // the thrown exception cleared the flag; restore it so the loop exits
+        } catch (ZkInterruptedException e) {
+          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+          interrupt(); // treat a ZK interrupt like a thread interrupt and stop consuming
+        } catch (Throwable t) { // keep consuming after a failed run; note this also swallows Errors
+          logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+        }
+      }
+      logger.info("END ClusterEventProcessor thread");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
new file mode 100644
index 0000000..54c9ca2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents
+ * multiple events of the same type from flooding the controller and preventing progress from being
+ * made: events are coalesced by name while names keep FIFO order. The queue is unbounded and all
+ * public methods synchronize on this instance. It is a limited analogue of {@link BlockingQueue}.
+ */
+public class ClusterEventBlockingQueue {
+  private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class);
+  // newest event for each name currently waiting in _eventQueue
+  private final Map<String, ClusterEvent> _eventMap;
+  // one entry per waiting name in FIFO order; entries may be stale, _eventMap holds the payloads
+  private final Queue<ClusterEvent> _eventQueue;
+
+  /**
+   * Instantiate the queue
+   */
+  public ClusterEventBlockingQueue() {
+    _eventMap = Maps.newHashMap();
+    _eventQueue = Lists.newLinkedList();
+  }
+
+  /**
+   * Remove all events from the queue
+   */
+  public synchronized void clear() {
+    _eventMap.clear();
+    _eventQueue.clear();
+  }
+
+  /**
+   * Add an event; a same-named queued event keeps its queue position but is replaced by this one
+   * @param event ClusterEvent event to add
+   */
+  public synchronized void put(ClusterEvent event) {
+    if (!_eventMap.containsKey(event.getName())) {
+      // only insert if there isn't a same-named event already present
+      boolean result = _eventQueue.offer(event);
+      if (!result) {
+        return;
+      }
+    }
+    // always overwrite in case this is a FINALIZE
+    _eventMap.put(event.getName(), event);
+    LOG.debug("Putting event " + event.getName());
+    LOG.debug("Event queue size: " + _eventQueue.size());
+    notify();
+  }
+
+  /**
+   * Remove an element from the front of the queue, blocking if none is available. This method
+   * will return the most recent event seen with the oldest enqueued event name.
+   * @return ClusterEvent at the front of the queue
+   * @throws InterruptedException if the wait for elements was interrupted
+   */
+  public synchronized ClusterEvent take() throws InterruptedException {
+    while (_eventQueue.isEmpty()) {
+      wait();
+    }
+    // non-empty here: the emptiness check above ran while holding this monitor
+    ClusterEvent queuedEvent = _eventQueue.poll();
+    LOG.debug("Taking event " + queuedEvent.getName());
+    LOG.debug("Event queue size: " + _eventQueue.size());
+    return _eventMap.remove(queuedEvent.getName());
+  }
+
+  /**
+   * Get the head of the queue without removing it
+   * @return newest ClusterEvent for the name at the front of the queue, or null if empty
+   */
+  public synchronized ClusterEvent peek() {
+    ClusterEvent queuedEvent = _eventQueue.peek();
+    if (queuedEvent != null) {
+      return _eventMap.get(queuedEvent.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Get the queue size, i.e. the number of distinct event names waiting to be taken
+   * @return integer size of the queue
+   */
+  public synchronized int size() {
+    return _eventQueue.size();
+  }
+
+  /**
+   * Check if the queue is empty
+   * @return true if events are not present, false otherwise
+   */
+  public synchronized boolean isEmpty() {
+    return _eventQueue.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
new file mode 100644
index 0000000..2dba7b6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/** Verifies that {@link ClusterEventBlockingQueue} coalesces same-named events in FIFO order. */
+public class TestClusterEventBlockingQueue {
+  @Test
+  public void testEventQueue() throws Exception {
+    // initialize the queue
+    ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
+
+    // add an event
+    ClusterEvent event1 = new ClusterEvent("event1");
+    queue.put(event1);
+    Assert.assertEquals(queue.size(), 1);
+
+    // add an event with a different name
+    ClusterEvent event2 = new ClusterEvent("event2");
+    queue.put(event2);
+    Assert.assertEquals(queue.size(), 2);
+
+    // add an event with the same name as event1 (should not change queue size)
+    ClusterEvent newEvent1 = new ClusterEvent("event1");
+    newEvent1.addAttribute("attr", 1);
+    queue.put(newEvent1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test peek
+    ClusterEvent peeked = queue.peek();
+    Assert.assertEquals(peeked.getName(), "event1");
+    Assert.assertEquals(peeked.getAttribute("attr"), 1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test take the head
+    ListeningExecutorService service =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    try {
+      ClusterEvent takenEvent1 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent1.getName(), "event1");
+      Assert.assertEquals(takenEvent1.getAttribute("attr"), 1);
+      Assert.assertEquals(queue.size(), 1);
+
+      // test take the tail
+      ClusterEvent takenEvent2 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent2.getName(), "event2");
+      Assert.assertEquals(queue.size(), 0);
+    } finally {
+      // always release the pool; shutdownNow() also interrupts a take() stuck after a timeout
+      service.shutdownNow();
+    }
+  }
+
+  private ClusterEvent safeTake(final ClusterEventBlockingQueue queue,
+      final ListeningExecutorService service) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    // take() blocks indefinitely when empty, so stop waiting after 30 seconds in this test
+    ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() {
+      @Override
+      public ClusterEvent call() throws InterruptedException {
+        return queue.take();
+      }
+    });
+    return future.get(30, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index bf851cc..49fd98c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -37,9 +37,9 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -192,6 +192,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   public void TestSchedulerMsgUsingQueue() throws Exception {
     Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -341,6 +342,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
     int messageResultCount = 0;
     for (int i = 0; i < 10; i++) {
+      Thread.sleep(1000);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
           .equals("" + (_PARTITIONS * 3)));
@@ -418,6 +420,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void TestSchedulerMsg2() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -579,6 +582,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void TestSchedulerMsg3() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(