You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/22 23:52:31 UTC

[1/2] git commit: [HELIX-281] Prevent message callbacks from indefinitely starving other callbacks

Updated Branches:
  refs/heads/master 300d27edf -> 7fca871c1


[HELIX-281] Prevent message callbacks from indefinitely starving other callbacks


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5a4caff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5a4caff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5a4caff

Branch: refs/heads/master
Commit: d5a4caffdf6ed267237309cac9f70fb7ebc14be1
Parents: 180e5b4
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 8 17:18:35 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 14:45:22 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  57 +++++++--
 .../stages/ClusterEventBlockingQueue.java       | 123 +++++++++++++++++++
 .../stages/TestClusterEventBlockingQueue.java   |  95 ++++++++++++++
 .../helix/integration/TestHelixConnection.java  |   1 +
 .../helix/integration/TestSchedulerMessage.java |  12 +-
 5 files changed, 274 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9fef2da..7b19574 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
@@ -46,6 +47,7 @@ import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ExternalViewComputeStage;
@@ -56,8 +58,8 @@ import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.controller.stages.PersistContextStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.controller.stages.ResourceValidationStage;
+import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HealthStat;
@@ -96,6 +98,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
+   * A queue for controller events and a thread that will consume it
+   */
+  private final ClusterEventBlockingQueue _eventQueue;
+  private final ClusterEventProcessor _eventThread;
+
+  /**
    * The _paused flag is checked by function handleEvent(), while if the flag is set
    * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
    * is set.
@@ -135,7 +143,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       List<ZNRecord> dummy = new ArrayList<ZNRecord>();
       event.addAttribute("eventData", dummy);
       // Should be able to process
-      handleEvent(event);
+      _eventQueue.put(event);
     }
   }
 
@@ -217,6 +225,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _registry = registry;
     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
     _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
+    _eventQueue = new ClusterEventBlockingQueue();
+    _eventThread = new ClusterEventProcessor();
+    _eventThread.start();
   }
 
   /**
@@ -273,6 +284,8 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       return;
     }
 
+    logger.info("START: Invoking controller pipeline for event: " + event.getName());
+    long startTime = System.currentTimeMillis();
     for (Pipeline pipeline : pipelines) {
       try {
         pipeline.handle(event);
@@ -283,6 +296,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
         break;
       }
     }
+    long endTime = System.currentTimeMillis();
+    logger.info("END: Invoking controller pipeline for event: " + event.getName() + ", took "
+        + (endTime - startTime) + " ms");
   }
 
   // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
@@ -296,7 +312,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     // event.addAttribute("helixmanager", changeContext.getManager());
     // event.addAttribute("changeContext", changeContext);
     // event.addAttribute("eventData", externalViewList);
-    // // handleEvent(event);
+    // _eventQueue.put(event);
     // logger.info("END: GenericClusterController.onExternalViewChange()");
   }
 
@@ -309,7 +325,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", statesInfo);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onStateChange()");
   }
 
@@ -333,7 +349,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("instanceName", instanceName);
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", messages);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (_clusterStatusMonitor != null && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
@@ -367,7 +383,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", liveInstances);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
   }
 
@@ -393,13 +409,13 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("eventData", idealStates);
-    handleEvent(event);
+    _eventQueue.put(event);
 
     if (changeContext.getType() != Type.FINALIZE) {
       checkRebalancingTimer(changeContext.getManager(), idealStates);
     }
 
-    logger.info("END: Generic GenericClusterController.onIdealStateChange()");
+    logger.info("END: GenericClusterController.onIdealStateChange()");
   }
 
   @Override
@@ -409,7 +425,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("eventData", configs);
-    handleEvent(event);
+    _eventQueue.put(event);
     logger.info("END: GenericClusterController.onConfigChange()");
   }
 
@@ -452,7 +468,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
         event.addAttribute("changeContext", changeContext);
         event.addAttribute("helixmanager", changeContext.getManager());
         event.addAttribute("eventData", pauseSignal);
-        handleEvent(event);
+        _eventQueue.put(event);
       } else {
         _paused = false;
       }
@@ -534,4 +550,25 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
   }
 
+  private class ClusterEventProcessor extends Thread { // drains _eventQueue into handleEvent()
+    @Override
+    public void run() {
+      logger.info("START ClusterEventProcessor thread");
+      while (!isInterrupted()) { // interrupt flag is checked before each take()
+        try {
+          ClusterEvent event = _eventQueue.take(); // blocks while the queue is empty
+          handleEvent(event);
+        } catch (InterruptedException e) {
+          logger.warn("ClusterEventProcessor interrupted", e);
+          interrupt(); // restore the flag cleared by the exception so the loop exits
+        } catch (ZkInterruptedException e) {
+          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+          interrupt(); // treat a ZK-level interrupt as a stop request, like the case above
+        } catch (Throwable t) { // log and continue with the next event
+          logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+        }
+      }
+      logger.info("END ClusterEventProcessor thread");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
new file mode 100644
index 0000000..54c9ca2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A blocking queue of ClusterEvent objects to be used by the controller pipeline. Events with the
+ * same name are coalesced: the newest one replaces the queued one but keeps its position, so a
+ * flood of same-named events cannot prevent progress on other events. The queue is unbounded and
+ * its public methods are synchronized. This is a limited analogue of {@link BlockingQueue}.
+ */
+public class ClusterEventBlockingQueue {
+  private static final Logger LOG = Logger.getLogger(ClusterEventBlockingQueue.class);
+  private final Map<String, ClusterEvent> _eventMap;
+  private final Queue<ClusterEvent> _eventQueue;
+
+  /**
+   * Create an empty queue
+   */
+  public ClusterEventBlockingQueue() {
+    _eventMap = Maps.newHashMap();
+    _eventQueue = Lists.newLinkedList();
+  }
+
+  /**
+   * Remove all events from the queue
+   */
+  public synchronized void clear() {
+    _eventMap.clear();
+    _eventQueue.clear();
+  }
+
+  /**
+   * Add an event; a queued event with the same name is replaced but keeps its queue position
+   * @param event ClusterEvent event to add
+   */
+  public synchronized void put(ClusterEvent event) {
+    if (!_eventMap.containsKey(event.getName())) {
+      // only insert if there isn't a same-named event already present
+      boolean result = _eventQueue.offer(event);
+      if (!result) {
+        return;
+      }
+    }
+    // always overwrite in case this is a FINALIZE
+    _eventMap.put(event.getName(), event);
+    LOG.debug("Putting event " + event.getName());
+    LOG.debug("Event queue size: " + _eventQueue.size());
+    notify(); // wake one thread blocked in take(), if any
+  }
+
+  /**
+   * Remove an element from the front of the queue, blocking if none is available. This method
+   * will return the most recent event seen with the oldest enqueued event name.
+   * @return ClusterEvent at the front of the queue
+   * @throws InterruptedException if the wait for elements was interrupted
+   */
+  public synchronized ClusterEvent take() throws InterruptedException {
+    while (_eventQueue.isEmpty()) {
+      wait();
+    }
+    ClusterEvent queuedEvent = _eventQueue.poll();
+    if (queuedEvent != null) {
+      LOG.debug("Taking event " + queuedEvent.getName());
+      LOG.debug("Event queue size: " + _eventQueue.size());
+      return _eventMap.remove(queuedEvent.getName());
+    }
+    return null; // not reached: the wait loop above guarantees a non-empty queue
+  }
+
+  /**
+   * Get the event at the head of the queue without removing it
+   * @return ClusterEvent at the front of the queue, or null if none available
+   */
+  public synchronized ClusterEvent peek() {
+    ClusterEvent queuedEvent = _eventQueue.peek();
+    if (queuedEvent != null) {
+      return _eventMap.get(queuedEvent.getName());
+    }
+    return null;
+  }
+
+  /**
+   * Get the number of distinct event names waiting in the queue
+   * @return integer size of the queue
+   */
+  public synchronized int size() {
+    return _eventQueue.size();
+  }
+
+  /**
+   * Check if the queue is empty
+   * @return true if events are not present, false otherwise
+   */
+  public synchronized boolean isEmpty() {
+    return _eventQueue.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
new file mode 100644
index 0000000..2dba7b6
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java
@@ -0,0 +1,95 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Test {@link ClusterEventBlockingQueue} to ensure that it coalesces events while keeping them in
+ * FIFO order.
+ */
+public class TestClusterEventBlockingQueue {
+  @Test
+  public void testEventQueue() throws Exception {
+    // initialize the queue
+    ClusterEventBlockingQueue queue = new ClusterEventBlockingQueue();
+
+    // add an event
+    ClusterEvent event1 = new ClusterEvent("event1");
+    queue.put(event1);
+    Assert.assertEquals(queue.size(), 1);
+
+    // add an event with a different name
+    ClusterEvent event2 = new ClusterEvent("event2");
+    queue.put(event2);
+    Assert.assertEquals(queue.size(), 2);
+
+    // add an event with the same name as event1 (should not change queue size)
+    ClusterEvent newEvent1 = new ClusterEvent("event1");
+    newEvent1.addAttribute("attr", 1);
+    queue.put(newEvent1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test peek
+    ClusterEvent peeked = queue.peek();
+    Assert.assertEquals(peeked.getName(), "event1");
+    Assert.assertEquals(peeked.getAttribute("attr"), 1);
+    Assert.assertEquals(queue.size(), 2);
+
+    // test take the head
+    ListeningExecutorService service =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    ClusterEvent takenEvent1 = safeTake(queue, service);
+    Assert.assertEquals(takenEvent1.getName(), "event1");
+    Assert.assertEquals(takenEvent1.getAttribute("attr"), 1);
+    Assert.assertEquals(queue.size(), 1);
+
+    // test take the tail
+    ClusterEvent takenEvent2 = safeTake(queue, service);
+    Assert.assertEquals(takenEvent2.getName(), "event2");
+    Assert.assertEquals(queue.size(), 0);
+    service.shutdownNow(); // release the pool thread(s) started by safeTake()
+  }
+
+  private ClusterEvent safeTake(final ClusterEventBlockingQueue queue,
+      final ListeningExecutorService service) throws InterruptedException, ExecutionException,
+      TimeoutException {
+    // the take() in ClusterEventBlockingQueue will wait indefinitely
+    // for this test, stop waiting after 30 seconds
+    ListenableFuture<ClusterEvent> future = service.submit(new Callable<ClusterEvent>() {
+      @Override
+      public ClusterEvent call() throws InterruptedException {
+        return queue.take();
+      }
+    });
+    return future.get(30, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
index b415393..318ab66 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixConnection.java
@@ -142,6 +142,7 @@ public class TestHelixConnection extends ZkUnitTestBase {
         StateModelDefId.from("MasterSlave"), new MockStateModelFactory());
 
     participant.startAsync();
+    Thread.sleep(1000);
 
     // verify
     final HelixDataAccessor accessor = connection.createDataAccessor(clusterId);

http://git-wip-us.apache.org/repos/asf/helix/blob/d5a4caff/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 30f5807..6066859 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -38,10 +38,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskResult;
@@ -196,6 +196,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   public void testSchedulerMsgUsingQueue() throws Exception {
     Logger.getRootLogger().setLevel(Level.INFO);
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -345,6 +346,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
 
     int messageResultCount = 0;
     for (int i = 0; i < 10; i++) {
+      Thread.sleep(1000);
       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord();
       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount")
           .equals("" + (_PARTITIONS * 3)));
@@ -422,6 +424,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void testSchedulerMsg2() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -583,6 +586,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
   @Test()
   public void testSchedulerMsg3() throws Exception {
     _factory._results.clear();
+    Thread.sleep(2000);
     HelixManager manager = null;
     for (int i = 0; i < NODE_NR; i++) {
       _participants[i].getMessagingService().registerMessageHandlerFactory(
@@ -690,7 +694,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       }
       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
 
-
       boolean success = false;
       for (int j = 0; j < 6; j++) {
         int count = 0;
@@ -763,7 +766,8 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
     String crString = sw.toString();
 
     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
-    schedulerMessage.getRecord().setMapField("MessageTemplate", msgTemplate.getRecord().getSimpleFields());
+    schedulerMessage.getRecord().setMapField("MessageTemplate",
+        msgTemplate.getRecord().getSimpleFields());
     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
 
@@ -1029,4 +1033,4 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
         ConstraintType.MESSAGE_CONSTRAINT, "constraint1");
 
   }
-}
\ No newline at end of file
+}


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix

Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/helix


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fca871c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fca871c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fca871c

Branch: refs/heads/master
Commit: 7fca871c1c43e473be6b7b37b5619f9f1ebdbd4c
Parents: d5a4caf 300d27e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Jan 22 14:52:01 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Jan 22 14:52:01 2014 -0800

----------------------------------------------------------------------
 README.md                                       |  19 +-
 helix-core/pom.xml                              |   8 +-
 .../controller/GenericHelixController.java      |  13 +-
 .../stages/ExternalViewComputeStage.java        |  26 +-
 .../DistClusterControllerElection.java          |   8 +-
 .../java/org/apache/helix/tools/ZkGrep.java     | 641 +++++++++++++++++++
 .../org/apache/helix/tools/ZkLogAnalyzer.java   | 444 -------------
 .../TestClusterStatusMonitorLifecycle.java      | 250 ++++++++
 hpost-review.sh                                 |   2 +-
 pom.xml                                         |  27 +-
 recipes/rabbitmq-consumer-group/README.md       |   2 +-
 .../src/site/markdown/Building.md               |   4 +-
 .../src/site/markdown/Quickstart.md             |   8 +-
 .../0.6.1-incubating/src/site/markdown/index.md |   2 +-
 .../src/site/markdown/recipes/lock_manager.md   |   4 +-
 .../markdown/recipes/rabbitmq_consumer_group.md |   6 +-
 .../recipes/rsync_replicated_file_store.md      |   4 +-
 .../site/markdown/recipes/service_discovery.md  |   4 +-
 .../site/markdown/recipes/task_dag_execution.md |   4 +-
 .../src/site/markdown/tutorial_messaging.md     |   2 +-
 .../src/site/markdown/tutorial_propstore.md     |   2 +-
 website/0.6.1-incubating/src/site/site.xml      |  10 +-
 .../src/site/xdoc/download.xml.vm               |   2 +-
 .../src/site/markdown/Building.md               |   4 +-
 .../src/site/markdown/Quickstart.md             |   8 +-
 .../0.6.2-incubating/src/site/markdown/index.md |   2 +-
 .../src/site/markdown/recipes/lock_manager.md   |   4 +-
 .../markdown/recipes/rabbitmq_consumer_group.md |   6 +-
 .../recipes/rsync_replicated_file_store.md      |   4 +-
 .../site/markdown/recipes/service_discovery.md  |   4 +-
 .../site/markdown/recipes/task_dag_execution.md |   4 +-
 .../src/site/markdown/tutorial_admin.md         |  10 +-
 .../src/site/markdown/tutorial_messaging.md     |   2 +-
 .../src/site/markdown/tutorial_propstore.md     |   2 +-
 .../src/site/markdown/tutorial_yaml.md          |   2 +-
 website/0.6.2-incubating/src/site/site.xml      |  10 +-
 .../src/site/xdoc/download.xml.vm               |   2 +-
 .../src/site/markdown/Building.md               |   4 +-
 .../src/site/markdown/Quickstart.md             |   8 +-
 .../0.7.0-incubating/src/site/markdown/index.md |   2 +-
 .../src/site/markdown/recipes/lock_manager.md   |   4 +-
 .../markdown/recipes/rabbitmq_consumer_group.md |   6 +-
 .../recipes/rsync_replicated_file_store.md      |   4 +-
 .../site/markdown/recipes/service_discovery.md  |   4 +-
 .../site/markdown/recipes/task_dag_execution.md |   4 +-
 .../markdown/recipes/user_def_rebalancer.md     |  16 +-
 .../src/site/markdown/tutorial_accessors.md     |  12 +-
 .../src/site/markdown/tutorial_admin.md         |  10 +-
 .../src/site/markdown/tutorial_messaging.md     |   2 +-
 .../src/site/markdown/tutorial_propstore.md     |   2 +-
 .../markdown/tutorial_user_def_rebalancer.md    |   4 +-
 .../src/site/markdown/tutorial_yaml.md          |   2 +-
 website/0.7.0-incubating/src/site/site.xml      |  10 +-
 .../src/site/xdoc/download.xml.vm               |   2 +-
 website/pom.xml                                 |   4 +-
 website/src/site/apt/releasing.apt              |  22 +-
 website/src/site/markdown/index.md              |  18 +-
 .../src/site/markdown/involved/contribdocs.md   |   4 +-
 website/src/site/markdown/sources.md            |   6 +-
 website/src/site/site.xml                       |   6 +-
 website/src/site/xdoc/download.xml.vm           |   2 +-
 website/trunk/src/site/markdown/Building.md     |   4 +-
 website/trunk/src/site/markdown/Quickstart.md   |   8 +-
 website/trunk/src/site/markdown/index.md        |   2 +-
 .../src/site/markdown/recipes/lock_manager.md   |   4 +-
 .../markdown/recipes/rabbitmq_consumer_group.md |   6 +-
 .../recipes/rsync_replicated_file_store.md      |   4 +-
 .../site/markdown/recipes/service_discovery.md  |   4 +-
 .../site/markdown/recipes/task_dag_execution.md |   4 +-
 .../markdown/recipes/user_def_rebalancer.md     |  16 +-
 .../src/site/markdown/tutorial_accessors.md     |  12 +-
 .../trunk/src/site/markdown/tutorial_admin.md   |  10 +-
 .../src/site/markdown/tutorial_messaging.md     |   2 +-
 .../src/site/markdown/tutorial_propstore.md     |   2 +-
 .../markdown/tutorial_user_def_rebalancer.md    |   4 +-
 .../trunk/src/site/markdown/tutorial_yaml.md    |   2 +-
 website/trunk/src/site/site.xml                 |  10 +-
 website/trunk/src/site/xdoc/download.xml.vm     |   2 +-
 78 files changed, 1130 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7fca871c/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7b19574,b15627a..28a5b06
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@@ -549,26 -528,12 +544,34 @@@ public class GenericHelixController imp
      _lastSeenSessions.set(curSessions);
  
    }
 -  
 +
+   public void shutdownClusterStatusMonitor(String clusterName) {
+     if (_clusterStatusMonitor != null) {
+       logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
 -       _clusterStatusMonitor.reset();
 -       _clusterStatusMonitor = null;
++      _clusterStatusMonitor.reset();
++      _clusterStatusMonitor = null;
++    }
++  }
++
 +  private class ClusterEventProcessor extends Thread { // drains _eventQueue into handleEvent()
 +    @Override
 +    public void run() {
 +      logger.info("START ClusterEventProcessor thread");
 +      while (!isInterrupted()) { // interrupt flag is checked before each take()
 +        try {
 +          ClusterEvent event = _eventQueue.take(); // blocks while the queue is empty
 +          handleEvent(event);
 +        } catch (InterruptedException e) {
 +          logger.warn("ClusterEventProcessor interrupted", e);
 +          interrupt(); // restore the flag cleared by the exception so the loop exits
 +        } catch (ZkInterruptedException e) {
 +          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
 +          interrupt(); // treat a ZK-level interrupt as a stop request, like the case above
 +        } catch (Throwable t) { // log and continue with the next event
 +          logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
 +        }
 +      }
 +      logger.info("END ClusterEventProcessor thread");
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/helix/blob/7fca871c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --cc helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 977b661,dddb0c0..a15e6b3
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@@ -113,19 -116,16 +116,18 @@@ public class ExternalViewComputeStage e
          }
        }
  
-       // TODO fix this
        // Update cluster status monitor mbean
-       // ClusterStatusMonitor clusterStatusMonitor =
-       // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-       // IdealState idealState = cache._idealStateMap.get(view.getResourceName());
-       // if (idealState != null) {
-       // if (clusterStatusMonitor != null
-       // && !idealState.getStateModelDefRef().equalsIgnoreCase(
-       // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-       // clusterStatusMonitor.onExternalViewChange(view,
-       // cache._idealStateMap.get(view.getResourceName()));
-       // }
-       // }
 -       ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
 -       Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
 -       if (currentResource != null) {
 -         IdealState idealState = currentResource.getIdealState();
 -         if (clusterStatusMonitor != null && 
 -             !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
 -         clusterStatusMonitor.onExternalViewChange(view, idealState);
 -         }
 -       }
++      ClusterStatusMonitor clusterStatusMonitor =
++          (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
++      Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
++      if (currentResource != null) {
++        IdealState idealState = currentResource.getIdealState();
++        if (clusterStatusMonitor != null
++            && !idealState.getStateModelDefRef().equalsIgnoreCase(
++                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
++          clusterStatusMonitor.onExternalViewChange(view, idealState);
++        }
++      }
  
        // compare the new external view with current one, set only on different
        ExternalView curExtView = curExtViews.get(resourceId.stringify());