You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/23 00:18:28 UTC
git commit: [HELIX-281] Prevent message callbacks from indefinitely
starving other callbacks
Updated Branches:
refs/heads/helix-0.6.2-release f678f79f3 -> c92428023
[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9242802
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9242802
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9242802
Branch: refs/heads/helix-0.6.2-release
Commit: c92428023a6b8456c0e0ecce0649e61ea2575863
Parents: f678f79
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 8 17:18:35 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 15:18:02 2014 -0800
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 67 +++++++---
.../stages/ClusterEventBlockingQueue.java | 123 +++++++++++++++++++
.../stages/TestClusterEventBlockingQueue.java | 95 ++++++++++++++
.../helix/integration/TestSchedulerMessage.java | 6 +-
4 files changed, 276 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 3b522e6..7e28399 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
@@ -39,13 +40,14 @@ import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
-import org.apache.helix.ZNRecord;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
@@ -55,8 +57,8 @@ import org.apache.helix.controller.stages.MessageThrottleStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.RebalanceIdealStateStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -95,6 +97,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
ClusterStatusMonitor _clusterStatusMonitor;
/**
+ * A queue for controller events and a thread that will consume it
+ */
+ private final ClusterEventBlockingQueue _eventQueue;
+ private final ClusterEventProcessor _eventThread;
+
+ /**
* The _paused flag is checked by function handleEvent(), while if the flag is set
* handleEvent() will be no-op. Other event handling logic keeps the same when the flag
* is set.
@@ -134,7 +142,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
List<ZNRecord> dummy = new ArrayList<ZNRecord>();
event.addAttribute("eventData", dummy);
// Should be able to process
- handleEvent(event);
+ _eventQueue.put(event);
}
}
@@ -226,6 +234,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_registry = registry;
_lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
_lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+ _eventQueue = new ClusterEventBlockingQueue();
+ _eventThread = new ClusterEventProcessor();
+ _eventThread.start();
}
/**
@@ -277,6 +288,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
return;
}
+ logger.info("START: Invoking controller pipeline for event: " + event.getName());
+ long startTime = System.currentTimeMillis();
for (Pipeline pipeline : pipelines) {
try {
pipeline.handle(event);
@@ -287,6 +300,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
break;
}
}
+ long endTime = System.currentTimeMillis();
+ logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took "
+ + (endTime - startTime) + " ms");
}
// TODO since we read data in pipeline, we can get rid of reading from zookeeper in
@@ -300,7 +316,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// event.addAttribute("helixmanager", changeContext.getManager());
// event.addAttribute("changeContext", changeContext);
// event.addAttribute("eventData", externalViewList);
- // // handleEvent(event);
+ // _eventQueue.put(event);
// logger.info("END: GenericClusterController.onExternalViewChange()");
}
@@ -313,7 +329,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("instanceName", instanceName);
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", statesInfo);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: GenericClusterController.onStateChange()");
}
@@ -337,7 +353,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("instanceName", instanceName);
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", messages);
- handleEvent(event);
+ _eventQueue.put(event);
if (_clusterStatusMonitor != null && messages != null) {
_clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
@@ -371,7 +387,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", liveInstances);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
}
@@ -397,13 +413,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", idealStates);
- handleEvent(event);
+ _eventQueue.put(event);
if (changeContext.getType() != Type.FINALIZE) {
checkRebalancingTimer(changeContext.getManager(), idealStates);
}
- logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+ logger.info("END: GenericClusterController.onIdealStateChange()");
}
@Override
@@ -413,7 +429,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("eventData", configs);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: GenericClusterController.onConfigChange()");
}
@@ -456,7 +472,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("eventData", pauseSignal);
- handleEvent(event);
+ _eventQueue.put(event);
} else {
_paused = false;
}
@@ -537,11 +553,34 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_lastSeenSessions.set(curSessions);
}
+
public void shutdownClusterStatusMonitor(String clusterName) {
- if (_clusterStatusMonitor != null) {
+ if (_clusterStatusMonitor != null) {
logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
- _clusterStatusMonitor.reset();
- _clusterStatusMonitor = null;
+ _clusterStatusMonitor.reset();
+ _clusterStatusMonitor = null;
+ }
+ }
+
+ private class ClusterEventProcessor extends Thread {
+ @Override
+ public void run() {
+ logger.info("START ClusterEventProcessor thread");
+ while (!isInterrupted()) {
+ try {
+ ClusterEvent event = _eventQueue.take();
+ handleEvent(event);
+ } catch (InterruptedException e) {
+ logger.warn("ClusterEventProcessor interrupted", e);
+ interrupt();
+ } catch (ZkInterruptedException e) {
+ logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+ interrupt();
+ } catch (Throwable t) {
+ logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+ }
+ }
+ logger.info("END ClusterEventProcessor thread");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
new file mode 100644
index 0000000..54c9ca2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents
+ * multiple events of the same type from flooding the controller and preventing progress from being
+ * made. This queue has no capacity limit. This class is meant to be a limited implementation of the
+ * {@link BlockingQueue} interface.
+ */
+public class ClusterEventBlockingQueue {
+ private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class);
+ private final Map<String, ClusterEvent> _eventMap;
+ private final Queue<ClusterEvent> _eventQueue;
+
+ /**
+ * Instantiate the queue
+ */
+ public ClusterEventBlockingQueue() {
+ _eventMap = Maps.newHashMap();
+ _eventQueue = Lists.newLinkedList();
+ }
+
+ /**
+ * Remove all events from the queue
+ */
+ public synchronized void clear() {
+ _eventMap.clear();
+ _eventQueue.clear();
+ }
+
+ /**
+ * Add a single event to the queue, overwriting events with the same name
+ * @param event ClusterEvent event to add
+ */
+ public synchronized void put(ClusterEvent event) {
+ if (!_eventMap.containsKey(event.getName())) {
+ // only insert if there isn't a same-named event already present
+ boolean result = _eventQueue.offer(event);
+ if (!result) {
+ return;
+ }
+ }
+ // always overwrite in case this is a FINALIZE
+ _eventMap.put(event.getName(), event);
+ LOG.debug("Putting event " + event.getName());
+ LOG.debug("Event queue size: " + _eventQueue.size());
+ notify();
+ }
+
+ /**
+ * Remove an element from the front of the queue, blocking if none is available. This method
+ * will return the most recent event seen with the oldest enqueued event name.
+ * @return ClusterEvent at the front of the queue
+ * @throws InterruptedException if the wait for elements was interrupted
+ */
+ public synchronized ClusterEvent take() throws InterruptedException {
+ while (_eventQueue.isEmpty()) {
+ wait();
+ }
+ ClusterEvent queuedEvent = _eventQueue.poll();
+ if (queuedEvent != null) {
+ LOG.debug("Taking event " + queuedEvent.getName());
+ LOG.debug("Event queue size: " + _eventQueue.size());
+ return _eventMap.remove(queuedEvent.getName());
+ }
+ return null;
+ }
+
+ /**
+ * Get the event at the head of the queue without removing it
+ * @return ClusterEvent at the front of the queue, or null if none available
+ */
+ public synchronized ClusterEvent peek() {
+ ClusterEvent queuedEvent = _eventQueue.peek();
+ if (queuedEvent != null) {
+ return _eventMap.get(queuedEvent.getName());
+ }
+ return queuedEvent;
+ }
+
+ /**
+ * Get the queue size
+ * @return integer size of the queue
+ */
+ public int size() {
+ return _eventQueue.size();
+ }
+
+ /**
+ * Check if the queue is empty
+ * @return true if events are not present, false otherwise
+ */
+ public boolean isEmpty() {
+ return _eventQueue.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
new file mode 100644
index 0000000..2dba7b6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in
+ * FIFO order.
+ */
+public class TestClusterEventBlockingQueue {
+ @Test
+ public void testEventQueue() throws Exception {
+ // initialize the queue
+ ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
+
+ // add an event
+ ClusterEvent event1 = new ClusterEvent("event1");
+ queue.put(event1);
+ Assert.assertEquals(queue.size(), 1);
+
+ // add an event with a different name
+ ClusterEvent event2 = new ClusterEvent("event2");
+ queue.put(event2);
+ Assert.assertEquals(queue.size(), 2);
+
+ // add an event with the same name as event1 (should not change queue size)
+ ClusterEvent newEvent1 = new ClusterEvent("event1");
+ newEvent1.addAttribute("attr", 1);
+ queue.put(newEvent1);
+ Assert.assertEquals(queue.size(), 2);
+
+ // test peek
+ ClusterEvent peeked = queue.peek();
+ Assert.assertEquals(peeked.getName(), "event1");
+ Assert.assertEquals(peeked.getAttribute("attr"), 1);
+ Assert.assertEquals(queue.size(), 2);
+
+ // test take the head
+ ListeningExecutorService service =
+ MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ ClusterEvent takenEvent1 = safeTake(queue, service);
+ Assert.assertEquals(takenEvent1.getName(), "event1");
+ Assert.assertEquals(takenEvent1.getAttribute("attr"), 1);
+ Assert.assertEquals(queue.size(), 1);
+
+ // test take the tail
+ ClusterEvent takenEvent2 = safeTake(queue, service);
+ Assert.assertEquals(takenEvent2.getName(), "event2");
+ Assert.assertEquals(queue.size(), 0);
+ }
+
+ private ClusterEvent safeTake(final ClusterEventBlockingQueue queue,
+ final ListeningExecutorService service) throws InterruptedException, ExecutionException,
+ TimeoutException {
+ // the take() in ClusterEventBlockingQueue will wait indefinitely
+ // for this test, stop waiting after 30 seconds
+ ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() {
+ @Override
+ public ClusterEvent call() throws InterruptedException {
+ return queue.take();
+ }
+ });
+ ClusterEvent event = future.get(30, TimeUnit.SECONDS);
+ return event;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9242802/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index bf851cc..49fd98c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -37,9 +37,9 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -192,6 +192,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
public void TestSchedulerMsgUsingQueue() throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -341,6 +342,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
int messageResultCount = 0;
for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
.equals("" + (_PARTITIONS * 3)));
@@ -418,6 +420,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void TestSchedulerMsg2() throws Exception {
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -579,6 +582,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void TestSchedulerMsg3() throws Exception {
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(