You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/22 23:52:31 UTC
[1/2] git commit: [HELIX-281] Prevent message callbacks from
indefinitely starving other callbacks
Updated Branches:
refs/heads/master 300d27edf -> 7fca871c1
[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5a4caff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5a4caff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5a4caff
Branch: refs/heads/master
Commit: d5a4caffdf6ed267237309cac9f70fb7ebc14be1
Parents: 180e5b4
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 8 17:18:35 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 14:45:22 2014 -0800
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 57 +++++++--
.../stages/ClusterEventBlockingQueue.java | 123 +++++++++++++++++++
.../stages/TestClusterEventBlockingQueue.java | 95 ++++++++++++++
.../helix/integration/TestHelixConnection.java | 1 +
.../helix/integration/TestSchedulerMessage.java | 12 +-
5 files changed, 274 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9fef2da..7b19574 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
@@ -46,6 +47,7 @@ import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.PipelineRegistry;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
import org.apache.helix.controller.stages.CompatibilityCheckStage;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ExternalViewComputeStage;
@@ -56,8 +58,8 @@ import org.apache.helix.controller.stages.PersistAssignmentStage;
import org.apache.helix.controller.stages.PersistContextStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.controller.stages.ResourceValidationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HealthStat;
@@ -96,6 +98,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
ClusterStatusMonitor _clusterStatusMonitor;
/**
+ * A queue for controller events and a thread that will consume it
+ */
+ private final ClusterEventBlockingQueue _eventQueue;
+ private final ClusterEventProcessor _eventThread;
+
+ /**
* The _paused flag is checked by function handleEvent(), while if the flag is set
* handleEvent() will be no-op. Other event handling logic keeps the same when the flag
* is set.
@@ -135,7 +143,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
List<ZNRecord> dummy = new ArrayList<ZNRecord>();
event.addAttribute("eventData", dummy);
// Should be able to process
- handleEvent(event);
+ _eventQueue.put(event);
}
}
@@ -217,6 +225,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
_registry = registry;
_lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
_lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+ _eventQueue = new ClusterEventBlockingQueue();
+ _eventThread = new ClusterEventProcessor();
+ _eventThread.start();
}
/**
@@ -273,6 +284,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
return;
}
+ logger.info("START: Invoking controller pipeline for event: " + event.getName());
+ long startTime = System.currentTimeMillis();
for (Pipeline pipeline : pipelines) {
try {
pipeline.handle(event);
@@ -283,6 +296,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
break;
}
}
+ long endTime = System.currentTimeMillis();
+ logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took "
+ + (endTime - startTime) + " ms");
}
// TODO since we read data in pipeline, we can get rid of reading from zookeeper in
@@ -296,7 +312,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
// event.addAttribute("helixmanager", changeContext.getManager());
// event.addAttribute("changeContext", changeContext);
// event.addAttribute("eventData", externalViewList);
- // // handleEvent(event);
+ // _eventQueue.put(event);
// logger.info("END: GenericClusterController.onExternalViewChange()");
}
@@ -309,7 +325,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("instanceName", instanceName);
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", statesInfo);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: GenericClusterController.onStateChange()");
}
@@ -333,7 +349,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("instanceName", instanceName);
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", messages);
- handleEvent(event);
+ _eventQueue.put(event);
if (_clusterStatusMonitor != null && messages != null) {
_clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
@@ -367,7 +383,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", liveInstances);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
}
@@ -393,13 +409,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("changeContext", changeContext);
event.addAttribute("eventData", idealStates);
- handleEvent(event);
+ _eventQueue.put(event);
if (changeContext.getType() != Type.FINALIZE) {
checkRebalancingTimer(changeContext.getManager(), idealStates);
}
- logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+ logger.info("END: GenericClusterController.onIdealStateChange()");
}
@Override
@@ -409,7 +425,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("eventData", configs);
- handleEvent(event);
+ _eventQueue.put(event);
logger.info("END: GenericClusterController.onConfigChange()");
}
@@ -452,7 +468,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
event.addAttribute("changeContext", changeContext);
event.addAttribute("helixmanager", changeContext.getManager());
event.addAttribute("eventData", pauseSignal);
- handleEvent(event);
+ _eventQueue.put(event);
} else {
_paused = false;
}
@@ -534,4 +550,25 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
}
+  /**
+   * Thread that takes events off _eventQueue one at a time and hands each one to handleEvent().
+   * It keeps looping until its own interrupt flag is set.
+   */
+  private class ClusterEventProcessor extends Thread {
+    @Override
+    public void run() {
+      logger.info("START ClusterEventProcessor thread");
+      while (true) {
+        if (isInterrupted()) {
+          // the flag was set by one of the handlers below or by another thread; stop consuming
+          break;
+        }
+        try {
+          // take() blocks until an event is available
+          handleEvent(_eventQueue.take());
+        } catch (InterruptedException interrupted) {
+          // set the interrupt flag again so the check at the top of the loop ends the thread
+          logger.warn("ClusterEventProcessor interrupted", interrupted);
+          interrupt();
+        } catch (ZkInterruptedException zkInterrupted) {
+          // treated the same way as a plain interrupt: flag the thread and let the loop exit
+          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", zkInterrupted);
+          interrupt();
+        } catch (Throwable failure) {
+          // any other failure is only logged; the thread moves on to the next event
+          logger.error("ClusterEventProcessor failed while running the controller pipeline", failure);
+        }
+      }
+      logger.info("END ClusterEventProcessor thread");
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
new file mode 100644
index 0000000..54c9ca2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents
+ * multiple events of the same type from flooding the controller and preventing progress from being
+ * made. This queue is unbounded. This class is meant to be a limited implementation of the
+ * {@link BlockingQueue} interface.
+ */
+public class ClusterEventBlockingQueue {
+  private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class);
+
+  // Most recent event put for each event name that currently has an entry in _eventQueue
+  private final Map<String, ClusterEvent> _eventMap;
+  // One entry per pending event name, in the order the names were first enqueued
+  private final Queue<ClusterEvent> _eventQueue;
+
+  /**
+   * Instantiate the queue
+   */
+  public ClusterEventBlockingQueue() {
+    _eventMap = Maps.newHashMap();
+    _eventQueue = Lists.newLinkedList();
+  }
+
+  /**
+   * Remove all events from the queue
+   */
+  public synchronized void clear() {
+    _eventMap.clear();
+    _eventQueue.clear();
+  }
+
+  /**
+   * Add a single event to the queue. If an event with the same name is already pending, the
+   * pending event keeps its position in the queue but is replaced by this newer event.
+   * @param event ClusterEvent event to add
+   */
+  public synchronized void put(ClusterEvent event) {
+    if (!_eventMap.containsKey(event.getName())) {
+      // only insert if there isn't a same-named event already present
+      boolean result = _eventQueue.offer(event);
+      if (!result) {
+        return;
+      }
+    }
+    // always overwrite in case this is a FINALIZE
+    _eventMap.put(event.getName(), event);
+    LOG.debug("Putting event " + event.getName());
+    LOG.debug("Event queue size: " + _eventQueue.size());
+    // wake up a thread blocked in take()
+    notify();
+  }
+
+  /**
+   * Remove an element from the front of the queue, blocking if none is available. This method
+   * will return the most recent event seen with the oldest enqueued event name.
+   * @return ClusterEvent at the front of the queue
+   * @throws InterruptedException if the wait for elements was interrupted
+   */
+  public synchronized ClusterEvent take() throws InterruptedException {
+    // loop rather than a single check so that a wakeup with nothing queued waits again
+    while (_eventQueue.isEmpty()) {
+      wait();
+    }
+    ClusterEvent queuedEvent = _eventQueue.poll();
+    if (queuedEvent != null) {
+      LOG.debug("Taking event " + queuedEvent.getName());
+      LOG.debug("Event queue size: " + _eventQueue.size());
+      // the queue entry only fixes the position; the map holds the latest event for that name
+      return _eventMap.remove(queuedEvent.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Get at the head of the queue without removing it
+   * @return ClusterEvent at the front of the queue, or null if none available
+   */
+  public synchronized ClusterEvent peek() {
+    ClusterEvent queuedEvent = _eventQueue.peek();
+    if (queuedEvent != null) {
+      return _eventMap.get(queuedEvent.getName());
+    }
+    return queuedEvent;
+  }
+
+  /**
+   * Get the queue size. Holds the same lock as put() and take() so that the value read is
+   * consistent with their updates.
+   * @return integer size of the queue
+   */
+  public synchronized int size() {
+    return _eventQueue.size();
+  }
+
+  /**
+   * Check if the queue is empty. Holds the same lock as put() and take() so that the value read
+   * is consistent with their updates.
+   * @return true if events are not present, false otherwise
+   */
+  public synchronized boolean isEmpty() {
+    return _eventQueue.isEmpty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
new file mode 100644
index 0000000..2dba7b6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in
+ * FIFO order.
+ */
+public class TestClusterEventBlockingQueue {
+  /**
+   * Put two distinctly named events plus a second event reusing the first name, then check that
+   * the queue size, peek() and take() reflect the coalescing and the original ordering.
+   */
+  @Test
+  public void testEventQueue() throws Exception {
+    // initialize the queue
+    ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
+
+    // add an event
+    ClusterEvent event1 = new ClusterEvent("event1");
+    queue.put(event1);
+    Assert.assertEquals(queue.size(), 1);
+
+    // add an event with a different name
+    ClusterEvent event2 = new ClusterEvent("event2");
+    queue.put(event2);
+    Assert.assertEquals(queue.size(), 2);
+
+    // add an event with the same name as event1 (should not change queue size)
+    ClusterEvent newEvent1 = new ClusterEvent("event1");
+    newEvent1.addAttribute("attr", 1);
+    queue.put(newEvent1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test peek
+    ClusterEvent peeked = queue.peek();
+    Assert.assertEquals(peeked.getName(), "event1");
+    Assert.assertEquals(peeked.getAttribute("attr"), 1);
+    Assert.assertEquals(queue.size(), 2);
+
+    ListeningExecutorService service =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    try {
+      // test take the head
+      ClusterEvent takenEvent1 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent1.getName(), "event1");
+      Assert.assertEquals(takenEvent1.getAttribute("attr"), 1);
+      Assert.assertEquals(queue.size(), 1);
+
+      // test take the tail
+      ClusterEvent takenEvent2 = safeTake(queue, service);
+      Assert.assertEquals(takenEvent2.getName(), "event2");
+      Assert.assertEquals(queue.size(), 0);
+    } finally {
+      // always stop the pool, even when an assertion or the timeout in safeTake() fails;
+      // otherwise a worker still waiting inside take() would be left behind
+      service.shutdownNow();
+    }
+  }
+
+  /**
+   * Run queue.take() on the given executor and wait a bounded time for its result.
+   * @param queue the queue to take from
+   * @param service executor on which the blocking take() is run
+   * @return the event returned by take()
+   * @throws TimeoutException if take() has not returned within 30 seconds
+   * @throws ExecutionException if take() itself threw
+   * @throws InterruptedException if the calling thread was interrupted while waiting
+   */
+  private ClusterEvent safeTake(final ClusterEventBlockingQueue queue,
+      final ListeningExecutorService service) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    // the take() in ClusterEventBlockingQueue will wait indefinitely
+    // for this test, stop waiting after 30 seconds
+    ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() {
+      @Override
+      public ClusterEvent call() throws InterruptedException {
+        return queue.take();
+      }
+    });
+    return future.get(30, TimeUnit.SECONDS);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index b415393..318ab66 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -142,6 +142,7 @@ public class TestHelixConnection extends ZkUnitTestBase {
StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
participant.startAsync();
+ Thread.sleep(1000);
// verify
final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);
http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 30f5807..6066859 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -38,10 +38,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -196,6 +196,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
public void testSchedulerMsgUsingQueue() throws Exception {
Logger.getRootLogger().setLevel(Level.INFO);
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -345,6 +346,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
int messageResultCount = 0;
for (int i = 0; i < 10; i++) {
+ Thread.sleep(1000);
ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
.equals("" + (_PARTITIONS * 3)));
@@ -422,6 +424,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void testSchedulerMsg2() throws Exception {
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -583,6 +586,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
@Test()
public void testSchedulerMsg3() throws Exception {
_factory._results.clear();
+ Thread.sleep(2000);
HelixManager manager = null;
for (int i = 0; i < NODE_NR; i++) {
_participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -690,7 +694,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
}
Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
-
boolean success = false;
for (int j = 0; j < 6; j++) {
int count = 0;
@@ -763,7 +766,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
String crString = sw.toString();
schedulerMessage.getRecord().setSimpleField("Criteria", crString);
- schedulerMessage.getRecord().setMapField("MessageTemplate", msgTemplate.getRecord().getSimpleFields());
+ schedulerMessage.getRecord().setMapField("MessageTemplate",
+ msgTemplate.getRecord().getSimpleFields());
schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
@@ -1029,4 +1033,4 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
}
-}
\ No newline at end of file
+}
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/helix
Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fca871c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fca871c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fca871c
Branch: refs/heads/master
Commit: 7fca871c1c43e473be6b7b37b5619f9f1ebdbd4c
Parents: d5a4caf 300d27e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 22 14:52:01 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 14:52:01 2014 -0800
----------------------------------------------------------------------
README.md | 19 +-
helix-core/pom.xml | 8 +-
.../controller/GenericHelixController.java | 13 +-
.../stages/ExternalViewComputeStage.java | 26 +-
.../DistClusterControllerElection.java | 8 +-
.../java/org/apache/helix/tools/ZkGrep.java | 641 +++++++++++++++++++
.../org/apache/helix/tools/ZkLogAnalyzer.java | 444 -------------
.../TestClusterStatusMonitorLifecycle.java | 250 ++++++++
hpost-review.sh | 2 +-
pom.xml | 27 +-
recipes/rabbitmq-consumer-group/README.md | 2 +-
.../src/site/markdown/Building.md | 4 +-
.../src/site/markdown/Quickstart.md | 8 +-
.../0.6.1-incubating/src/site/markdown/index.md | 2 +-
.../src/site/markdown/recipes/lock_manager.md | 4 +-
.../markdown/recipes/rabbitmq_consumer_group.md | 6 +-
.../recipes/rsync_replicated_file_store.md | 4 +-
.../site/markdown/recipes/service_discovery.md | 4 +-
.../site/markdown/recipes/task_dag_execution.md | 4 +-
.../src/site/markdown/tutorial_messaging.md | 2 +-
.../src/site/markdown/tutorial_propstore.md | 2 +-
website/0.6.1-incubating/src/site/site.xml | 10 +-
.../src/site/xdoc/download.xml.vm | 2 +-
.../src/site/markdown/Building.md | 4 +-
.../src/site/markdown/Quickstart.md | 8 +-
.../0.6.2-incubating/src/site/markdown/index.md | 2 +-
.../src/site/markdown/recipes/lock_manager.md | 4 +-
.../markdown/recipes/rabbitmq_consumer_group.md | 6 +-
.../recipes/rsync_replicated_file_store.md | 4 +-
.../site/markdown/recipes/service_discovery.md | 4 +-
.../site/markdown/recipes/task_dag_execution.md | 4 +-
.../src/site/markdown/tutorial_admin.md | 10 +-
.../src/site/markdown/tutorial_messaging.md | 2 +-
.../src/site/markdown/tutorial_propstore.md | 2 +-
.../src/site/markdown/tutorial_yaml.md | 2 +-
website/0.6.2-incubating/src/site/site.xml | 10 +-
.../src/site/xdoc/download.xml.vm | 2 +-
.../src/site/markdown/Building.md | 4 +-
.../src/site/markdown/Quickstart.md | 8 +-
.../0.7.0-incubating/src/site/markdown/index.md | 2 +-
.../src/site/markdown/recipes/lock_manager.md | 4 +-
.../markdown/recipes/rabbitmq_consumer_group.md | 6 +-
.../recipes/rsync_replicated_file_store.md | 4 +-
.../site/markdown/recipes/service_discovery.md | 4 +-
.../site/markdown/recipes/task_dag_execution.md | 4 +-
.../markdown/recipes/user_def_rebalancer.md | 16 +-
.../src/site/markdown/tutorial_accessors.md | 12 +-
.../src/site/markdown/tutorial_admin.md | 10 +-
.../src/site/markdown/tutorial_messaging.md | 2 +-
.../src/site/markdown/tutorial_propstore.md | 2 +-
.../markdown/tutorial_user_def_rebalancer.md | 4 +-
.../src/site/markdown/tutorial_yaml.md | 2 +-
website/0.7.0-incubating/src/site/site.xml | 10 +-
.../src/site/xdoc/download.xml.vm | 2 +-
website/pom.xml | 4 +-
website/src/site/apt/releasing.apt | 22 +-
website/src/site/markdown/index.md | 18 +-
.../src/site/markdown/involved/contribdocs.md | 4 +-
website/src/site/markdown/sources.md | 6 +-
website/src/site/site.xml | 6 +-
website/src/site/xdoc/download.xml.vm | 2 +-
website/trunk/src/site/markdown/Building.md | 4 +-
website/trunk/src/site/markdown/Quickstart.md | 8 +-
website/trunk/src/site/markdown/index.md | 2 +-
.../src/site/markdown/recipes/lock_manager.md | 4 +-
.../markdown/recipes/rabbitmq_consumer_group.md | 6 +-
.../recipes/rsync_replicated_file_store.md | 4 +-
.../site/markdown/recipes/service_discovery.md | 4 +-
.../site/markdown/recipes/task_dag_execution.md | 4 +-
.../markdown/recipes/user_def_rebalancer.md | 16 +-
.../src/site/markdown/tutorial_accessors.md | 12 +-
.../trunk/src/site/markdown/tutorial_admin.md | 10 +-
.../src/site/markdown/tutorial_messaging.md | 2 +-
.../src/site/markdown/tutorial_propstore.md | 2 +-
.../markdown/tutorial_user_def_rebalancer.md | 4 +-
.../trunk/src/site/markdown/tutorial_yaml.md | 2 +-
website/trunk/src/site/site.xml | 10 +-
website/trunk/src/site/xdoc/download.xml.vm | 2 +-
78 files changed, 1130 insertions(+), 680 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7fca871c/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7b19574,b15627a..28a5b06
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@@ -549,26 -528,12 +544,34 @@@ public class GenericHelixController imp
_lastSeenSessions.set(curSessions);
}
-
+
+ public void shutdownClusterStatusMonitor(String clusterName) {
+ if (_clusterStatusMonitor != null) {
+ logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
- _clusterStatusMonitor.reset();
- _clusterStatusMonitor = null;
++ _clusterStatusMonitor.reset();
++ _clusterStatusMonitor = null;
++ }
++ }
++
+ private class ClusterEventProcessor extends Thread {
+ @Override
+ public void run() {
+ logger.info("START ClusterEventProcessor thread");
+ while (!isInterrupted()) {
+ try {
+ ClusterEvent event = _eventQueue.take();
+ handleEvent(event);
+ } catch (InterruptedException e) {
+ logger.warn("ClusterEventProcessor interrupted", e);
+ interrupt();
+ } catch (ZkInterruptedException e) {
+ logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+ interrupt();
+ } catch (Throwable t) {
+ logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+ }
+ }
+ logger.info("END ClusterEventProcessor thread");
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7fca871c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 977b661,dddb0c0..a15e6b3
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@@ -113,19 -116,16 +116,18 @@@ public class ExternalViewComputeStage e
}
}
- // TODO fix this
// Update cluster status monitor mbean
- // ClusterStatusMonitor clusterStatusMonitor =
- // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
- // if (idealState != null) {
- // if (clusterStatusMonitor != null
- // && !idealState.getStateModelDefRef().equalsIgnoreCase(
- // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- // clusterStatusMonitor.onExternalViewChange(view,
- // cache._idealStateMap.get(view.getResourceName()));
- // }
- // }
- ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
- Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
- if (currentResource != null) {
- IdealState idealState = currentResource.getIdealState();
- if (clusterStatusMonitor != null &&
- !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
- clusterStatusMonitor.onExternalViewChange(view, idealState);
- }
- }
++ ClusterStatusMonitor clusterStatusMonitor =
++ (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
++ Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
++ if (currentResource != null) {
++ IdealState idealState = currentResource.getIdealState();
++ if (clusterStatusMonitor != null
++ && !idealState.getStateModelDefRef().equalsIgnoreCase(
++ DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
++ clusterStatusMonitor.onExternalViewChange(view, idealState);
++ }
++ }
// compare the new external view with current one, set only on different
ExternalView curExtView = curExtViews.get(resourceId.stringify());