You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/02/27 01:52:51 UTC

[1/2] [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455

Repository: helix
Updated Branches:
  refs/heads/helix-monitoring b182690f8 -> 3505beadb


http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
index 8b7f839..588bc35 100644
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -19,204 +19,170 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.net.InetAddress;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import org.apache.helix.MonitoringTestHelper;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Leader;
 import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
 
 public class TestClientServerMonitoring extends ZkUnitTestBase {
   @Test
   public void testMonitoring() throws Exception {
-    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTICIPANTS = 0;
     final int NUM_PARTITIONS = 8;
-    final int NUM_REPLICAS = 2;
+    final int NUM_REPLICAS = 1;
 
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
-    // Set up cluster
+    // Set up monitoring cluster
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
-        "TestDB", // resource name prefix
+        "MonitoringService", // resource name prefix
         1, // resources
         NUM_PARTITIONS, // partitions per resource
         NUM_PARTICIPANTS, // number of nodes
         NUM_REPLICAS, // replicas
-        "MasterSlave", // pick a built-in state model
+        "OnlineOffline", // pick a built-in state model
         RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
         true); // do rebalance
 
-    // start participants
-    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
-    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
-      participants[i] =
-          new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
-      participants[i].syncStart();
-    }
-    HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
-    // set a custom monitoring config
-    MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
-    monitoringConfig.setConfig(getMonitoringConfigString());
-    accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
-
-    // start controller
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Start controller
     ClusterControllerManager controller =
         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
-        .getHostName()));
     controller.syncStart();
 
-    // make sure the leader has registered and is showing the server port
-    Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
-    Assert.assertNotNull(leader);
-    Assert.assertNotEquals(leader.getMonitoringPort(), -1);
-    Assert.assertNotNull(leader.getMonitoringHost());
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+    server.start();
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Check external-view
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Connect monitoring client
+    final RiemannMonitoringClient client =
+        new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
+            ResourceId.from("MonitoringService0"), 1);
+    client.connect();
 
-    // run the spectator
-    spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+    final RiemannClient rclient = RiemannClient.tcp("localhost", port);
+    rclient.connect();
 
-    // stop participants
-    for (MockParticipantManager participant : participants) {
-      participant.syncStop();
-    }
+    // Test RiemannMonitoringClient#send()
+    MonitoringEvent event = new MonitoringEvent().tag("test").ttl(5);
+    client.send(ResourceId.from("TestDB"), event, false);
 
-    // stop controller
-    controller.syncStop();
-  }
+    // Check monitoring server has received the event with tag="test"
+    result = TestHelper.verify(new TestHelper.Verifier() {
 
-  private String getMonitoringConfigString() {
-    StringBuilder sb =
-        new StringBuilder()
-            .append("(defn parse-int\r\n")
-            .append(
-                "  \"Convert a string to an integer\"\r\n  [instr]\r\n  (Integer/parseInt instr))\r\n\r\n")
-            .append("(defn parse-double\r\n  \"Convert a string into a double\"\r\n  [instr]\r\n")
-            .append("  (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n")
-            .append(
-                "  \"Check if the event should trigger an alarm based on failure rate\"\r\n  [e]\r\n")
-            .append(
-                "  (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n")
-            .append(
-                "    (if (> writeCount 0)\r\n      (let [ratio (double (/ failedCount writeCount))]\r\n")
-            .append("        (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n")
-            .append(
-                "          (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n")
-            .append(
-                "(defn check-95th-latency\r\n  \"Check if the 95th percentile latency is within expectations\"\r\n")
-            .append("  [e]\r\n  (let [latency (parse-double (:latency95 e))]\r\n")
-            .append(
-                "    (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n")
-            .append(
-                "      (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n")
-            .append("(streams\r\n  (where\r\n    (service #\".*LatencyReport.*\")")
-            .append(
-                " ; Only process services containing LatencyReport\r\n    check-failure-rate\r\n")
-            .append("    check-95th-latency))");
-    return sb.toString();
-  }
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test\"");
+        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test"));
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
 
-  private void spectate(final String clusterName, final String resourceName, final int numPartitions)
-      throws Exception {
-    final Random random = new Random();
-    final ClusterId clusterId = ClusterId.from(clusterName);
-    final ResourceId resourceId = ResourceId.from(resourceName);
+    // Test RiemannMonitoringClient#sendAndFlush()
+    MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(5);
+    client.sendAndFlush(ResourceId.from("TestDB2"), event2);
 
-    // Connect to Helix
-    final HelixManager manager =
-        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
-    manager.connect();
+    // Check monitoring server has received the event with tag="test2"
+    result = TestHelper.verify(new TestHelper.Verifier() {
 
-    // Attach a monitoring client to this connection
-    final MonitoringClient client =
-        new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
-    client.connect();
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test2\"");
+        return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test2"));
+      }
+    }, 5 * 1000);
+    Assert.assertTrue(result);
 
-    // Start spectating
-    final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
-    manager.addExternalViewChangeListener(routingTableProvider);
+    // Test RiemannMonitoringClient#every()
+    client.every(ResourceId.from("TestDB3"), 1, 0, TimeUnit.SECONDS, new Runnable() {
 
-    // Send some metrics
-    client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
       @Override
       public void run() {
-        Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
-        Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
-        Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
-        for (int i = 0; i < numPartitions; i++) {
-          // Figure out who hosts what
-          PartitionId partitionId = PartitionId.from(resourceId, i + "");
-          List<InstanceConfig> instances =
-              routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
-          if (instances.size() < 1) {
-            continue;
-          }
-
-          // Normally you would get these attributes by using a CallTracker
-          ParticipantId participantId = instances.get(0).getParticipantId();
-          int writeCount = random.nextInt(1000) + 10;
-          if (!writeCounts.containsKey(participantId)) {
-            writeCounts.put(participantId, writeCount);
-          } else {
-            writeCounts.put(participantId, writeCounts.get(participantId) + writeCount);
-          }
-          int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master
-          if (!failedCounts.containsKey(participantId)) {
-            failedCounts.put(participantId, failedCount);
-          } else {
-            failedCounts.put(participantId, failedCounts.get(participantId) + failedCount);
-          }
-          double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
-          latency95Map.put(participantId, latency);
-        }
-
-        // Send everything grouped by participant
-        for (ParticipantId participantId : writeCounts.keySet()) {
-          Map<String, String> attributes = Maps.newHashMap();
-          attributes.put("writeCount", writeCounts.get(participantId) + "");
-          attributes.put("failedCount", failedCounts.get(participantId) + "");
-          attributes.put("latency95", latency95Map.get(participantId) + "");
-
-          // Send an event with a ttl long enough to span the send interval
-          MonitoringEvent e =
-              new MonitoringEvent().cluster(clusterId).resource(resourceId)
-                  .participant(participantId).name("LatencyReport").attributes(attributes)
-                  .eventState("update").ttl(10.0f);
-          client.send(e, false);
-        }
+        MonitoringEvent event3 =
+            new MonitoringEvent().tag("test3").resource(ResourceId.from("db" + System.currentTimeMillis())).ttl(5);
+        client.send(ResourceId.from("TestDB3"), event3, false);
       }
     });
-    Thread.sleep(60000);
+
+    // Check monitoring server has received more than 2 events with tag="test3"
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<Event> events = rclient.query("tagged \"test3\"");
+        return (events.size() > 2) && (events.get(0).getTagsCount() == 1)
+            && (events.get(0).getTags(0).equals("test3"));
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result);
+
+    // Stop client
     client.disconnect();
-    manager.disconnect();
+    rclient.disconnect();
+
+    // Stop controller
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
new file mode 100644
index 0000000..100c28c
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
@@ -0,0 +1,129 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.RiemannConfigs.Builder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRiemannAgent extends ZkUnitTestBase {
+  @Test
+  public void testStartAndStop() throws Exception {
+    final int NUM_PARTICIPANTS = 0;
+    final int NUM_PARTITIONS = 4;
+    final int NUM_REPLICAS = 1;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // start controller
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Start monitoring server
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+    server.start();
+
+    // Start Riemann agent
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+    agent.start();
+
+    // Check live-instance
+    final HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Check external-view
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Stop monitoring server
+    server.stop();
+
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+        return liveInstances != null && liveInstances.size() == 0;
+      }
+    }, 15 * 1000);
+    Assert.assertTrue(result, "RiemannAgent should be disconnected if RiemannServer is stopped");
+
+    // Stop controller
+    controller.syncStop();
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
new file mode 100644
index 0000000..f7f45f9
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
@@ -0,0 +1,79 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MonitoringConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.client.RiemannClient;
+import com.google.common.collect.Lists;
+
+public class TestRiemannMonitoringServer {
+
+  @Test
+  public void testBasic() throws IOException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    int port = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+    RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+    RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+
+    // Check server starts
+    server.start();
+    Assert.assertTrue(server.isStarted());
+
+    RiemannClient rclient = null;
+    try {
+      rclient = RiemannClient.tcp("localhost", port);
+      rclient.connect();
+    } catch (IOException e) {
+      Assert.fail("Riemann server should start on port: " + port);
+    }
+
+    // Check server stops
+    Assert.assertNotNull(rclient);
+    rclient.disconnect();
+    server.stop();
+    Assert.assertFalse(server.isStarted());
+
+    try {
+      rclient = RiemannClient.tcp("localhost", port);
+      rclient.connect();
+      Assert.fail("Riemann server should be stopped on port: " + port);
+    } catch (IOException e) {
+      // ok
+    }
+
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+}


[2/2] git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455

Posted by zz...@apache.org.
[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3505bead
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3505bead
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3505bead

Branch: refs/heads/helix-monitoring
Commit: 3505beadbc0f0b0f5cf3e804e23e0ee1ca6c2fb2
Parents: b182690
Author: zzhang <zz...@apache.org>
Authored: Wed Feb 26 16:52:26 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Wed Feb 26 16:52:26 2014 -0800

----------------------------------------------------------------------
 .../org/apache/helix/HelixAutoController.java   |   4 +-
 .../java/org/apache/helix/HelixController.java  |   4 +-
 .../java/org/apache/helix/HelixManager.java     |   4 +-
 .../manager/zk/HelixConnectionAdaptor.java      |  36 --
 .../apache/helix/manager/zk/ZKHelixManager.java |  28 +-
 .../helix/manager/zk/ZkHelixAutoController.java |  10 -
 .../helix/manager/zk/ZkHelixController.java     |  26 +-
 .../helix/manager/zk/ZkHelixLeaderElection.java |   6 +-
 .../helix/monitoring/MonitoringClient.java      |  16 +-
 .../helix/monitoring/MonitoringServer.java      |  19 -
 .../helix/monitoring/MonitoringServerOwner.java |  37 --
 .../src/test/java/org/apache/helix/Mocks.java   |  12 -
 .../org/apache/helix/MonitoringTestHelper.java  | 114 +++++
 .../controller/stages/DummyClusterManager.java  |  11 -
 .../helix/participant/MockZKHelixManager.java   |  12 -
 .../monitoring/RiemannMonitoringClient.java     | 428 +++++++++++++------
 helix-monitor-server/pom.xml                    |   5 +
 .../apache/helix/monitoring/RiemannAgent.java   | 123 ++++++
 .../monitoring/RiemannAgentStateModel.java      |  54 +++
 .../RiemannAgentStateModelFactory.java          |  29 ++
 .../helix/monitoring/RiemannAlertProxy.java     | 111 +++++
 .../apache/helix/monitoring/RiemannConfigs.java | 116 +++++
 .../monitoring/RiemannMonitoringServer.java     | 133 +-----
 .../helix/monitoring/BasicMonitoringTest.java   |  93 ----
 .../helix/monitoring/IntegrationTest.java       | 206 +++++++++
 .../monitoring/TestClientServerMonitoring.java  | 254 +++++------
 .../helix/monitoring/TestRiemannAgent.java      | 129 ++++++
 .../monitoring/TestRiemannMonitoringServer.java |  79 ++++
 28 files changed, 1402 insertions(+), 697 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
index fdab2a6..04cbf2d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -20,14 +20,12 @@ package org.apache.helix;
  */
 
 import org.apache.helix.api.id.ControllerId;
-import org.apache.helix.monitoring.MonitoringServerOwner;
 import org.apache.helix.participant.StateMachineEngine;
 
 /**
  * Autonomous controller
  */
-public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener,
-    MonitoringServerOwner {
+public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener {
   /**
    * get controller id
    * @return controller id

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
index 098fd96..9565cfb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -20,10 +20,8 @@ package org.apache.helix;
  */
 
 import org.apache.helix.api.id.ControllerId;
-import org.apache.helix.monitoring.MonitoringServerOwner;
 
-public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener,
-    MonitoringServerOwner {
+public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {
 
   /**
    * get controller id

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 17c94e5..75e9196 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -25,7 +25,6 @@ import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.monitoring.MonitoringServerOwner;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.spectator.RoutingTableProvider;
@@ -42,7 +41,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
  *    // ROLE can be participant, spectator or a controller<br/>
  * manager.connect();
  * manager.addSOMEListener();
- * manager.start()
  * After start is invoked the subsequent interactions will be via listener onChange callbacks
  * There will be 3 scenarios for onChange callback, which can be determined using NotificationContext.type
  * INIT -> will be invoked the first time the listener is added
@@ -56,7 +54,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
  * @see RoutingTableProvider RoutingTableProvider for spectator
  * @see GenericHelixController RoutingTableProvider for controller
  */
-public interface HelixManager extends MonitoringServerOwner {
+public interface HelixManager {
 
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN =
       ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN;

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index 65a192a..a2f6d9e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -315,40 +315,4 @@ public class HelixConnectionAdaptor implements HelixManager {
     _connection.addControllerMessageListener(_role, listener, _clusterId);
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer monitoringServer) {
-    switch (_role.getType()) {
-    case CONTROLLER:
-      HelixController controller = (HelixController) _role;
-      controller.registerMonitoringServer(monitoringServer);
-      break;
-    case CONTROLLER_PARTICIPANT:
-      HelixAutoController autoController = (HelixAutoController) _role;
-      autoController.registerMonitoringServer(monitoringServer);
-      break;
-    default:
-      LOG.error("A non-controller cannot own a monitoring server!");
-      break;
-    }
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    MonitoringServer server = null;
-    switch (_role.getType()) {
-    case CONTROLLER:
-      HelixController controller = (HelixController) _role;
-      server = controller.getMonitoringServer();
-      break;
-    case CONTROLLER_PARTICIPANT:
-      HelixAutoController autoController = (HelixAutoController) _role;
-      server = autoController.getMonitoringServer();
-      break;
-    default:
-      LOG.error("Cannot get a monitoring server for a non-controller (" + _role.getType() + ")!");
-      break;
-    }
-    return server;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 83eba53..b7b3381 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -66,6 +66,7 @@ import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.Leader;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MonitoringConfig;
 import org.apache.helix.monitoring.MonitoringServer;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.participant.HelixStateMachineEngine;
@@ -106,7 +107,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private ConfigAccessor _configAccessor;
   private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
   protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
-  private MonitoringServer _monitoringServer;
 
   private volatile String _sessionId;
 
@@ -214,8 +214,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     _sessionTimeout =
         getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
 
-    _monitoringServer = null;
-
     /**
      * instance type specific init
      */
@@ -548,11 +546,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
        */
       _messagingService.getExecutor().shutdown();
 
-      // stop monitoring
-      if (_monitoringServer != null && _monitoringServer.isStarted()) {
-        _monitoringServer.stop();
-      }
-
       // TODO reset user defined handlers only
       resetHandlers();
 
@@ -867,16 +860,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     initHandlers(_handlers);
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer monitoringServer) {
-    _monitoringServer = monitoringServer;
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    return _monitoringServer;
-  }
-
   void handleNewSessionAsParticipant() throws Exception {
     /**
      * auto-join
@@ -907,15 +890,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   void handleNewSessionAsController() {
-    // get the monitoring service up
-    if (_monitoringServer != null) {
-      if (_monitoringServer.isStarted()) {
-        _monitoringServer.stop();
-      }
-      _monitoringServer.addConfigs(_dataAccessor);
-      _monitoringServer.start();
-    }
-
     // get the leader election process going
     if (_leaderElectionHandler != null) {
       _leaderElectionHandler.init();

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index 3967ed9..d17192f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -131,14 +131,4 @@ public class ZkHelixAutoController implements HelixAutoController {
     return _controller.isLeader();
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer server) {
-    _controller.registerMonitoringServer(server);
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    return _controller.getMonitoringServer();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 9da59b9..75d15b9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -42,6 +42,7 @@ import org.apache.helix.healthcheck.HealthStatsAggregator;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
 import org.apache.helix.monitoring.MonitoringServer;
 import org.apache.helix.monitoring.StatusDumpTask;
 import org.apache.log4j.Logger;
@@ -59,7 +60,6 @@ public class ZkHelixController implements HelixController {
   final HelixDataAccessor _accessor;
   final HelixManager _manager;
   final ZkHelixLeaderElection _leaderElection;
-  MonitoringServer _monitoringServer;
 
   public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId,
       ControllerId controllerId) {
@@ -79,7 +79,6 @@ public class ZkHelixController implements HelixController {
     _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(_manager)));
     _timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
 
-    _monitoringServer = null;
   }
 
   void startTimerTasks() {
@@ -118,11 +117,6 @@ public class ZkHelixController implements HelixController {
      */
     _connection.resetHandlers(this);
 
-    // stop the monitoring service if it's running
-    if (_monitoringServer != null && _monitoringServer.isStarted()) {
-      _monitoringServer.stop();
-    }
-
   }
 
   void init() {
@@ -135,14 +129,6 @@ public class ZkHelixController implements HelixController {
     }
 
     /**
-     * start the monitoring service if it exists; this must happen before leader election
-     */
-    if (_monitoringServer != null) {
-      _monitoringServer.addConfigs(_accessor);
-      _monitoringServer.start();
-    }
-
-    /**
      * leader-election listener should be reset/init before all other controller listeners;
      * it's ok to add a listener multiple times, since we check existence in
      * ZkHelixConnection#addXXXListner()
@@ -213,16 +199,6 @@ public class ZkHelixController implements HelixController {
     return false;
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer server) {
-    _monitoringServer = server;
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    return _monitoringServer;
-  }
-
   void addListenersToController(GenericHelixController pipeline) {
     try {
       /**

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 3c5b3db..c69d1a3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -159,11 +159,7 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
       } else {
         LOG.warn("ZKPropertyTransferServer instance is null");
       }
-      MonitoringServer server = manager.getMonitoringServer();
-      if (server != null) {
-        leader.setMonitoringHost(server.getHost());
-        leader.setMonitoringPort(server.getPort());
-      }
+
       boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
       if (success) {
         return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
index a794eab..743f8b4 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -21,14 +21,17 @@ package org.apache.helix.monitoring;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.api.id.ResourceId;
+
 /**
  * Interface for a client that can register with a monitoring server and send events for monitoring
  */
 public interface MonitoringClient {
   /**
    * Connect. May be asynchronous.
+   * @throws Exception
    */
-  void connect();
+  void connect() throws Exception;
 
   /**
    * Disconnect synchronously.
@@ -37,27 +40,30 @@ public interface MonitoringClient {
 
   /**
    * Send an event
+   * @param resource
    * @param e the event
    * @param batch true if this should be part of a batch operation
    * @return true if the event was sent (or queued for batching), false otherwise
    */
-  boolean send(MonitoringEvent e, boolean batch);
+  boolean send(ResourceId resource, MonitoringEvent e, boolean batch);
 
   /**
    * Send an event and flush any outstanding messages
+   * @param resource
    * @param e the event
    * @return true if events were successfully sent, false otherwise
    */
-  boolean sendAndFlush(MonitoringEvent e);
+  boolean sendAndFlush(ResourceId resource, MonitoringEvent e);
 
   /**
    * Schedule an operation to run
+   * @param resource
    * @param interval the frequency
    * @param delay the amount of time to wait before the first execution
    * @param unit the unit of time to use
    * @param r the code to run
    */
-  void every(long interval, long delay, TimeUnit unit, Runnable r);
+  void every(ResourceId resource, long interval, long delay, TimeUnit unit, Runnable r);
 
   /**
    * Check if there is a valid connection to a monitoring server
@@ -79,7 +85,7 @@ public interface MonitoringClient {
 
   /**
    * Flush all outstanding events
-   * @return true if the events were flushed, false otherwise
+   * @return true if all events were flushed, false otherwise
    */
   boolean flush();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
index 3168bb2..b8c5a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
@@ -19,8 +19,6 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import org.apache.helix.HelixDataAccessor;
-
 /**
  * Generic interface for a monitoring service that should be attached to a controller.
  */
@@ -36,26 +34,9 @@ public interface MonitoringServer {
   public void stop();
 
   /**
-   * Add a collection of configuration files
-   * @param accessor HelixDataAccessor that can reach Helix's backing store
-   */
-  public void addConfigs(HelixDataAccessor accessor);
-
-  /**
    * Check if the service has been started
    * @return true if started, false otherwise
    */
   public boolean isStarted();
 
-  /**
-   * Get the host of the service
-   * @return String hostname
-   */
-  public String getHost();
-
-  /**
-   * Get the port of the service
-   * @return integer port
-   */
-  public int getPort();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
deleted file mode 100644
index 8e00c54..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Interface to implement for Helix components that can manage a monitoring server
- */
-public interface MonitoringServerOwner {
-  /**
-   * Attach a monitoring server to this controller
-   * @param server MonitoringServer implementation
-   */
-  void registerMonitoringServer(MonitoringServer server);
-
-  /**
-   * Get the monitoring service, if any, registered with this connection
-   * @return a MonitoringService, or null if none
-   */
-  MonitoringServer getMonitoringServer();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 87d4e68..d170d06 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -479,18 +479,6 @@ public class Mocks {
 
     }
 
-    @Override
-    public void registerMonitoringServer(MonitoringServer monitoringServer) {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public MonitoringServer getMonitoringServer() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
   }
 
   public static class MockAccessor implements HelixDataAccessor // DataAccessor

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
new file mode 100644
index 0000000..c17dab0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
@@ -0,0 +1,114 @@
+package org.apache.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.I0Itec.zkclient.NetworkUtil;
+
+public class MonitoringTestHelper {
+  static final int MAX_PORT = 65535;
+
+  /**
+   * generate a default riemann.config
+   * @param riemannPort the port the Riemann tcp-server listens on
+   * @return the contents of the config as a string
+   */
+  public static String getRiemannConfigString(int riemannPort) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(logging/init :file \"/dev/null\")\n\n")
+        .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n")
+        .append("(instrumentation {:interval 1})\n\n")
+        .append("; (udp-server :host \"0.0.0.0\")\n")
+        .append("; (ws-server :host \"0.0.0.0\")\n")
+        .append("; (repl-server :host \"0.0.0.0\")\n\n")
+        .append("(periodically-expire 1)\n\n")
+        .append(
+            "(let [index (default :ttl 3 (update-index (index)))]\n  (streams\n    (expired prn)\n    index))\n");
+
+    return sb.toString();
+  }
+
+  /**
+   * generate a test config for checking latency
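+   * @param proxyPort the localhost port to which latency alerts are posted over HTTP
+   * @return the contents of the config as a string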
+   */
+  public static String getLatencyCheckConfigString(int proxyPort)
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(require 'riemann.config)\n")
+      .append("(require 'clj-http.client)\n\n")
+      .append("(defn parse-double\n  \"Convert a string into a double\"\n  ")
+      .append("[instr]\n  (Double/parseDouble instr))\n\n")
+      .append("(defn check-95th-latency\n  \"Check if the 95th percentile latency is within expectations\"\n  ")
+      .append("[e]\n  (let [latency (parse-double (:latency95 e))]\n    ")
+      .append("(if (> latency 1.0) \n      ; Report if the 95th percentile latency exceeds 1.0s\n      ")
+      .append("(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n      ")
+      .append("(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )\n        ")
+      .append("proxy-url (str \"http://localhost:\" " + proxyPort + " )]\n        ")
+      .append("(clj-http.client/post proxy-url {:body alert-name-str }))))))\n\n")
+      .append("(streams\n  (where\n    ; Only process services containing LatencyReport\n    ")
+      .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n    ")
+      .append("check-95th-latency))\n");
+    
+    return sb.toString();
+  }
+  
+  /**
+   * find an available tcp port
+   * @return an automatically allocated free port, or -1 if no socket could be opened
+   */
+  public static int availableTcpPort() {
+    ServerSocket ss = null;
+    try {
+      ss = new ServerSocket(0);
+      ss.setReuseAddress(true);
+      return ss.getLocalPort();
+    } catch (IOException e) {
+      // ok
+    } finally {
+      if (ss != null) {
+        try {
+          ss.close();
+        } catch (IOException e) {
+          // should not be thrown
+        }
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * find the first available port starting from startPort inclusive
+   * @param startPort the first port to try
+   * @return the first free port in [startPort, 65535], or -1 if none is free
+   */
+  public static int availableTcpPort(int startPort) {
+    int port = startPort;
+    for (; port <= MAX_PORT; port++) {
+      if (NetworkUtil.isPortFree(port))
+        break;
+    }
+
+    return port > MAX_PORT ? -1 : port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index e07f0b5..f73b368 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -262,15 +262,4 @@ public class DummyClusterManager implements HelixManager {
 
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer monitoringServer) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 0b8395e..6ac0fa3 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -268,16 +268,4 @@ public class MockZKHelixManager implements HelixManager {
 
   }
 
-  @Override
-  public void registerMonitoringServer(MonitoringServer monitoringServer) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public MonitoringServer getMonitoringServer() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 20b0825..e6c8b2d 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -22,14 +22,19 @@ package org.apache.helix.monitoring;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.model.Leader;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.log4j.Logger;
 
 import com.aphyr.riemann.client.AbstractRiemannClient;
@@ -40,113 +45,192 @@ import com.aphyr.riemann.client.UnsupportedJVMException;
 import com.google.common.collect.Lists;
 
 /**
- * A Riemann-based monitoring client
- * Thread safety note: connect and disconnect are serialized to ensure that there
- * is no attempt to connect or disconnect with an inconsistent state. The send routines are not
- * protected for performance reasons, and so a single send/flush may fail.
+ * A Riemann-based monitoring client. Thread safety note: connect and disconnect are serialized to
+ * ensure that there is no attempt to connect or disconnect with an inconsistent state. The send
+ * routines are not protected for performance reasons, and so a single send/flush may fail.
  */
 public class RiemannMonitoringClient implements MonitoringClient {
   private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
-  private String _host;
-  private int _port;
+  public static final String DEFAULT_MONITORING_SERVICE_NAME = "MonitoringService";
+
+  /**
+   * Contains information about a RiemannClient inside a MonitoringClient
+   */
+  class MonitoringClientInfo {
+    /**
+     * host/port of riemann server to which this client connects
+     */
+    String _host;
+    int _port;
+
+    /**
+     * riemann client
+     */
+    RiemannClient _client;
+
+    /**
+     * batch riemann client, null if batch is not enabled
+     */
+    RiemannBatchClient _batchClient;
+
+    /**
+     * list of periodic tasks scheduled on this riemann client
+     */
+    final List<ScheduledItem> _scheduledItems;
+
+    public MonitoringClientInfo() {
+      _host = null;
+      _port = -1;
+      _client = null;
+      _batchClient = null;
+      _scheduledItems = Lists.newArrayList();
+    }
+
+  }
+
   private int _batchSize;
-  private RiemannClient _client;
-  private RiemannBatchClient _batchClient;
-  private List<ScheduledItem> _scheduledItems;
-  private HelixDataAccessor _accessor;
-  private IZkDataListener _leaderListener;
+  private final ResourceId _monitoringServiceName;
+  private int _monitoringServicePartitionNum;
+
+  private final HelixManager _spectator;
+  private final RoutingTableProvider _routingTableProvider;
+  private final Map<ResourceId, MonitoringClientInfo> _clientMap;
 
   /**
    * Create a non-batched monitoring client
-   * @param clusterId the cluster to monitor
-   * @param accessor an accessor for the cluster
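+   * @param zkAddr the ZooKeeper address used to connect the spectator
+   * @param monitoringClusterId the cluster that hosts the monitoring service resource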
    */
-  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor) {
-    this(clusterId, accessor, 1);
+  public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId) {
+    this(zkAddr, monitoringClusterId, ResourceId.from(DEFAULT_MONITORING_SERVICE_NAME), 1);
   }
 
   /**
    * Create a monitoring client that supports batching
-   * @param clusterId the cluster to monitor
-   * @param accessor an accessor for the cluster
-   * @param batchSize the number of events in a batch
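+   * @param zkAddr
+   *          the ZooKeeper address used to connect the spectator
+   * @param monitoringClusterId
+   *          the cluster that hosts the monitoring service resource
+   * @param monitoringServiceName
+   *          the resource id of the monitoring service
+   * @param batchSize the number of events in a batch (values below 1 are treated as 1)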
    */
-  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor, int batchSize) {
-    _host = null;
-    _port = -1;
+  public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId,
+      ResourceId monitoringServiceName, int batchSize) {
     _batchSize = batchSize > 0 ? batchSize : 1;
-    _client = null;
-    _batchClient = null;
-    _accessor = accessor;
-    _scheduledItems = Lists.newLinkedList();
-    _leaderListener = getLeaderListener();
+    _monitoringServiceName = monitoringServiceName;
+    _monitoringServicePartitionNum = 0;
+    _clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
+
+    _spectator =
+        HelixManagerFactory.getZKHelixManager(monitoringClusterId.stringify(), null,
+            InstanceType.SPECTATOR, zkAddr);
+    _routingTableProvider = new RoutingTableProvider();
   }
 
   @Override
-  public void connect() {
+  public void connect() throws Exception {
     if (isConnected()) {
       LOG.error("Already connected to Riemann!");
       return;
     }
 
-    // watch for changes
-    changeLeaderSubscription(true);
+    // Connect spectator to the cluster being monitored
+    _spectator.connect();
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
 
-    // do the connect asynchronously as a tcp establishment could take time
-    Leader leader = _accessor.getProperty(_accessor.keyBuilder().controllerLeader());
-    doConnectAsync(leader);
+    // Get partition number of monitoring service
+    HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
+    IdealState idealState =
+        accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
+    _monitoringServicePartitionNum = idealState.getNumPartitions();
   }
 
   @Override
   public void disconnect() {
-    changeLeaderSubscription(false);
-    disconnectInternal();
+    // disconnect internal riemann clients
+    for (ResourceId resource : _clientMap.keySet()) {
+      disconnectInternal(resource);
+    }
+
+    _spectator.disconnect();
+    _monitoringServicePartitionNum = 0;
   }
 
   @Override
   public boolean isConnected() {
-    return _client != null && _client.isConnected();
+    return _spectator.isConnected();
   }
 
-  @Override
-  public boolean flush() {
+  /**
+   * Flush a riemann client for a resource
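+   * @param resource the resource whose riemann client is flushed
+   * @return true if flushed; false if not connected, no client was found, or the flush failed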
+   */
+  private boolean flush(ResourceId resource) {
     if (!isConnected()) {
       LOG.error("Tried to flush a Riemann client that is not connected!");
       return false;
     }
-    AbstractRiemannClient c = getClient(true);
+
+    AbstractRiemannClient c = getClient(resource, true);
+    if (c == null) {
+      LOG.warn("Fail to get riemann client for resource: " + resource);
+      return false;
+    }
+
     try {
       c.flush();
       return true;
     } catch (IOException e) {
-      LOG.error("Problem flushing the Riemann event queue!", e);
+      LOG.error("Problem flushing the Riemann event queue for resource: " + resource, e);
     }
     return false;
   }
 
   @Override
-  public boolean send(MonitoringEvent event, boolean batch) {
+  public boolean flush() {
+    boolean succeed = true;
+    for (ResourceId resource : _clientMap.keySet()) {
+      // flush(resource) goes first so one failure does not short-circuit past other clients
+      succeed = flush(resource) && succeed;
+    }
+    return succeed;
+  }
+
+  @Override
+  public boolean send(ResourceId resource, MonitoringEvent event, boolean batch) {
     if (!isConnected()) {
       LOG.error("Riemann connection must be active in order to send an event!");
       return false;
     }
-    AbstractRiemannClient c = getClient(batch);
-    convertEvent(c, event).send();
+
+    if (!isConnected(resource)) {
+      connect(resource, null, event);
+    } else {
+      AbstractRiemannClient c = getClient(resource, batch);
+      convertEvent(c, event).send();
+    }
+
     return true;
   }
 
   @Override
-  public boolean sendAndFlush(MonitoringEvent event) {
-    boolean sendResult = send(event, true);
+  public boolean sendAndFlush(ResourceId resource, MonitoringEvent event) {
+
+    boolean sendResult = send(resource, event, true);
     if (sendResult) {
-      return flush();
+      return flush(resource);
     }
     return false;
   }
 
+  /**
+   * Batch should be enabled for either all or none of riemann clients
+   */
   @Override
   public boolean isBatchingEnabled() {
-    return _batchClient != null && _batchClient.isConnected();
+    return _batchSize > 1;
   }
 
   @Override
@@ -154,68 +238,121 @@ public class RiemannMonitoringClient implements MonitoringClient {
     return _batchSize;
   }
 
+  /**
+   * Check if a riemann client for given resource is connected
+   * @param resource
+   * @return true if riemann client is connected, false otherwise
+   */
+  private boolean isConnected(ResourceId resource) {
+    if (!isConnected()) {
+      return false;
+    }
+
+    MonitoringClientInfo clientInfo = _clientMap.get(resource);
+    return clientInfo != null && clientInfo._client != null && clientInfo._client.isConnected();
+  }
+
   @Override
-  public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+  public synchronized void every(ResourceId resource, long interval, long delay, TimeUnit unit,
+      Runnable r) {
+    if (!isConnected()) {
+      LOG.error("Riemann client must be connected in order to send events!");
+      return;
+    }
+
     ScheduledItem scheduledItem = new ScheduledItem();
     scheduledItem.interval = interval;
     scheduledItem.delay = delay;
     scheduledItem.unit = unit;
     scheduledItem.r = r;
-    _scheduledItems.add(scheduledItem);
-    if (isConnected()) {
-      getClient().every(interval, delay, unit, r);
+
+    if (isConnected(resource)) {
+      MonitoringClientInfo clientInfo = _clientMap.get(resource);
+      clientInfo._scheduledItems.add(scheduledItem);
+      getClient(resource).every(interval, delay, unit, r);
+    } else {
+      connect(resource, scheduledItem, null);
+    }
+  }
+
+  /**
+   * Connect a riemann client to riemann server given a resource
+   * @param resource
+   * @param scheduledItem
+   * @param pendingEvent
+   */
+  private void connect(ResourceId resource, ScheduledItem scheduledItem,
+      MonitoringEvent pendingEvent) {
+    // Hash by resourceId
+    int partitionKey = resource.hashCode() % _monitoringServicePartitionNum;
+    List<InstanceConfig> instances =
+        _routingTableProvider.getInstances(_monitoringServiceName.stringify(),
+            _monitoringServiceName + "_" + partitionKey, "ONLINE");
+
+    if (instances.size() == 0) {
+      LOG.error("Riemann monitoring server for resource: " + resource + " at partitionKey: "
+          + partitionKey + " is not available");
+      return;
     }
+
+    InstanceConfig instanceConfig = instances.get(0);
+    String host = instanceConfig.getHostName();
+    int port = Integer.parseInt(instanceConfig.getPort());
+
+    // Connect asynchronously, as establishing a TCP connection could take time
+    doConnectAsync(resource, host, port, scheduledItem, pendingEvent);
   }
 
   /**
-   * Get a raw, non-batched Riemann client.
-   * WARNING: do not cache this, as it may be disconnected without notice
+   * Get a raw, non-batched Riemann client. WARNING: do not cache this, as it may be disconnected
+   * without notice
    * @return RiemannClient
    */
-  private RiemannClient getClient() {
-    return _client;
+  private RiemannClient getClient(ResourceId resource) {
+    MonitoringClientInfo clientInfo = _clientMap.get(resource);
+    return clientInfo == null ? null : clientInfo._client;
   }
 
   /**
-   * Get a batched Riemann client (if batching is supported)
-   * WARNING: do not cache this, as it may be disconnected without notice
+   * Get a batched Riemann client (if batching is supported). WARNING: do not cache this, as it
+   * may be disconnected without notice
    * @return RiemannBatchClient
    */
-  private RiemannBatchClient getBatchClient() {
-    return _batchClient;
+  private RiemannBatchClient getBatchClient(ResourceId resource) {
+    MonitoringClientInfo clientInfo = _clientMap.get(resource);
+    return clientInfo == null ? null : clientInfo._batchClient;
   }
 
   /**
-   * Get a Riemann client
-   * WARNING: do not cache this, as it may be disconnected without notice
-   * @param batch true if the client is preferred to support batching, false otherwise
+   * Get a Riemann client. WARNING: do not cache this, as it may be disconnected without notice
+   * @param batch
+   *          true if the client is preferred to support batching, false otherwise
    * @return AbstractRiemannClient
    */
-  private AbstractRiemannClient getClient(boolean batch) {
+  private AbstractRiemannClient getClient(ResourceId resource, boolean batch) {
     if (batch && isBatchingEnabled()) {
-      return getBatchClient();
+      return getBatchClient(resource);
     } else {
-      return getClient();
+      return getClient(resource);
     }
   }
 
   /**
    * Based on the contents of the leader node, connect to a Riemann server
-   * @param leader node containing host/port
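+   * @param resource resource whose client is connected to host:port; scheduledItem and
+   *          pendingEvent (either may be null) are passed through to connectInternal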
    */
-  private void doConnectAsync(final Leader leader) {
+  private void doConnectAsync(final ResourceId resource, final String host, final int port,
+      final ScheduledItem scheduledItem, final MonitoringEvent pendingEvent) {
     new Thread() {
       @Override
       public void run() {
         synchronized (RiemannMonitoringClient.this) {
-          // only connect if the leader is available; otherwise it will be picked up by the callback
-          if (leader != null) {
-            _host = leader.getMonitoringHost();
-            _port = leader.getMonitoringPort();
-          }
-          // connect if there's a valid host and port
-          if (_host != null && _port != -1) {
-            connectInternal(_host, _port);
+          if (resource != null && host != null && port != -1) {
+            connectInternal(resource, host, port, scheduledItem, pendingEvent);
+          } else {
+            LOG.error("Fail to doConnectAsync becaue of invalid arguments, resource: " + resource
+                + ", host: " + host + ", port: " + port);
           }
         }
       }
@@ -224,24 +361,52 @@ public class RiemannMonitoringClient implements MonitoringClient {
 
   /**
    * Establishment of a connection to a Riemann server
-   * @param host monitoring server hostname
-   * @param port monitoring server port
+   * @param resource
+   * @param host
+   * @param port
+   * @param scheduledItem
+   * @param pendingEvent
    */
-  private synchronized void connectInternal(String host, int port) {
-    disconnectInternal();
-    try {
-      _client = RiemannClient.tcp(host, port);
-      _client.connect();
-      // we might have to reschedule tasks
-      for (ScheduledItem item : _scheduledItems) {
-        _client.every(item.interval, item.delay, item.unit, item.r);
+  private synchronized void connectInternal(ResourceId resource, String host, int port,
+      ScheduledItem scheduledItem, MonitoringEvent pendingEvent) {
+    MonitoringClientInfo clientInfo = _clientMap.get(resource);
+    if (clientInfo != null && clientInfo._host.equals(host) && clientInfo._port == port
+        && clientInfo._client != null && clientInfo._client.isConnected()) {
+      LOG.info("Riemann client for resource: " + resource + " already connected on " + host + ":"
+          + port);
+
+      // We might have to reschedule tasks
+      if (scheduledItem != null) {
+        clientInfo._scheduledItems.add(scheduledItem);
+        clientInfo._client.every(scheduledItem.interval, scheduledItem.delay, scheduledItem.unit,
+            scheduledItem.r);
       }
+
+      // Sending over pending event
+      if (pendingEvent != null) {
+        convertEvent(clientInfo._client, pendingEvent).send();
+      }
+
+      return;
+    }
+
+    // Disconnect from previous riemann server
+    disconnectInternal(resource);
+
+    // Connect to new riemann server
+    RiemannClient client = null;
+    RiemannBatchClient batchClient = null;
+    try {
+      client = RiemannClient.tcp(host, port);
+      client.connect();
     } catch (IOException e) {
       LOG.error("Error establishing a connection!", e);
+
     }
-    if (_client != null && getBatchSize() > 1) {
+
+    if (client != null && getBatchSize() > 1) {
       try {
-        _batchClient = new RiemannBatchClient(_batchSize, _client);
+        batchClient = new RiemannBatchClient(_batchSize, client);
       } catch (UnknownHostException e) {
         _batchSize = 1;
         LOG.error("Could not resolve host", e);
@@ -250,60 +415,61 @@ public class RiemannMonitoringClient implements MonitoringClient {
         LOG.warn("Batching not enabled because of incompatible JVM", e);
       }
     }
-  }
 
-  /**
-   * Teardown of a connection to a Riemann server
-   */
-  private synchronized void disconnectInternal() {
-    try {
-      if (_batchClient != null && _batchClient.isConnected()) {
-        _batchClient.disconnect();
-      } else if (_client != null && _client.isConnected()) {
-        _client.disconnect();
-      }
-    } catch (IOException e) {
-      LOG.error("Disconnection error", e);
+    if (clientInfo == null) {
+      clientInfo = new MonitoringClientInfo();
     }
-    _batchClient = null;
-    _client = null;
-  }
 
-  /**
-   * Change the subscription status to the Leader node
-   * @param subscribe true to subscribe, false to unsubscribe
-   */
-  private void changeLeaderSubscription(boolean subscribe) {
-    String leaderPath = _accessor.keyBuilder().controllerLeader().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    if (subscribe) {
-      baseAccessor.subscribeDataChanges(leaderPath, _leaderListener);
-    } else {
-      baseAccessor.unsubscribeDataChanges(leaderPath, _leaderListener);
+    clientInfo._host = host;
+    clientInfo._port = port;
+    clientInfo._client = client;
+    clientInfo._batchClient = batchClient;
+    if (scheduledItem != null) {
+      clientInfo._scheduledItems.add(scheduledItem);
+    }
+    _clientMap.put(resource, clientInfo);
+
+    // We might have to reschedule tasks
+    for (ScheduledItem item : clientInfo._scheduledItems) {
+      client.every(item.interval, item.delay, item.unit, item.r);
+    }
+
+    // Send over pending event
+    if (pendingEvent != null) {
+      convertEvent(client, pendingEvent).send();
     }
   }
 
   /**
-   * Get callbacks for when the leader changes
-   * @return implemented IZkDataListener
+   * Teardown of a connection to a Riemann server
    */
-  private IZkDataListener getLeaderListener() {
-    return new IZkDataListener() {
-      @Override
-      public void handleDataChange(String dataPath, Object data) throws Exception {
-        Leader leader = new Leader((ZNRecord) data);
-        doConnectAsync(leader);
-      }
+  private synchronized void disconnectInternal(ResourceId resource) {
+    MonitoringClientInfo clientInfo = _clientMap.get(resource);
+    if (clientInfo == null) {
+      return;
+    }
 
-      @Override
-      public void handleDataDeleted(String dataPath) throws Exception {
-        disconnectInternal();
+    RiemannBatchClient batchClient = clientInfo._batchClient;
+    RiemannClient client = clientInfo._client;
+
+    clientInfo._batchClient = null;
+    clientInfo._client = null;
+
+    try {
+      if (batchClient != null && batchClient.isConnected()) {
+        batchClient.scheduler().shutdown();
+        batchClient.disconnect();
+      } else if (client != null && client.isConnected()) {
+        client.scheduler().shutdown();
+        client.disconnect();
       }
-    };
+    } catch (IOException e) {
+      LOG.error("Disconnection error", e);
+    }
   }
 
   /**
-   * Change a helix event into a Riemann event
+   * Change a helix monitoring event into a Riemann event
    * @param c Riemann client
    * @param helixEvent helix event
    * @return Riemann EventDSL

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index 041a390..2cbddfd 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -54,6 +54,11 @@ under the License.
       <version>0.2.4</version>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>8.1.14.v20131031</version>
+    </dependency>
+    <dependency>
       <groupId>factual</groupId>
       <artifactId>clj-helix</artifactId>
       <version>0.1.0</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
new file mode 100644
index 0000000..474d838
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
@@ -0,0 +1,123 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.RiemannClient;
+
+public class RiemannAgent {
+  private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
+
+  static final String STATEMODEL_NAME = "OnlineOffline";
+
+  final Random _random;
+  final String _zkAddr;
+  final String _clusterName;
+  final String _instanceName;
+  final int _riemannPort;
+  final HelixManager _participant;
+  final RiemannClient _client;
+
+  RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+    _random = new Random();
+    _zkAddr = zkAddr;
+    _clusterName = clusterName;
+    _instanceName =
+        String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
+    _riemannPort = riemannPort;
+    _participant =
+        HelixManagerFactory.getZKHelixManager(clusterName, _instanceName, InstanceType.PARTICIPANT,
+            zkAddr);
+    _client = RiemannClient.tcp("localhost", riemannPort);
+  }
+
+  public void start() throws Exception {
+    LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    // Wait until riemann port is connected
+    int timeout = 30 * 1000;
+    long startT = System.currentTimeMillis();
+    while ((System.currentTimeMillis() - startT) < timeout) {
+      try {
+        _client.connect();
+        break;
+      } catch (IOException e) {
+        int sleep = _random.nextInt(3000) + 3000;
+        LOG.info("Wait " + sleep + "ms for riemann server to come up");
+        TimeUnit.MILLISECONDS.sleep(sleep);
+      }
+    }
+
+    if (!_client.isConnected()) {
+      String err =
+          "Fail to connect to reimann server on localhost:" + _riemannPort + " in " + timeout
+              + "ms";
+      LOG.error(err);
+      throw new RuntimeException(err);
+    }
+    LOG.info("RiemannAgent connected to local riemann server on port: " + _riemannPort);
+
+    // Start helix participant
+    _participant.getStateMachineEngine().registerStateModelFactory(STATEMODEL_NAME,
+        new RiemannAgentStateModelFactory());
+    _participant.connect();
+
+    // Monitor riemann server
+    _client.every(10, 0, TimeUnit.SECONDS, new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          // send heartbeat metrics
+          _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
+        } catch (Exception e) {
+          LOG.error("Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
+              + _instanceName, e);
+          shutdown();
+        }
+      }
+    });
+
+  }
+
+  public void shutdown() {
+    LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    try {
+      _client.scheduler().shutdown();
+      _client.disconnect();
+    } catch (IOException e) {
+      LOG.error("Exception in disconnect riemann client", e);
+    }
+
+    _participant.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
new file mode 100644
index 0000000..2e32cef
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
@@ -0,0 +1,54 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+@StateModelInfo(initialState = "OFFLINE", states = {
+    "DROPPED", "OFFLINE", "ONLINE"
+})
+public class RiemannAgentStateModel extends StateModel {
+  private static final Logger LOG = Logger.getLogger(RiemannAgentStateModel.class);
+
+  void logTransition(Message message) {
+    String toState = message.getToState();
+    String fromState = message.getFromState();
+    String resourceName = message.getResourceName();
+    String partittionName = message.getPartitionName();
+
+    LOG.info("Become " + toState + " from " + fromState + " for resource: " + resourceName
+        + ", partition: " + partittionName);
+  }
+
+  @Transition(to = "ONLINE", from = "OFFLINE")
+  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+    logTransition(message);
+  }
+
+  @Transition(to = "OFFLINE", from = "ONLINE")
+  public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+    logTransition(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
new file mode 100644
index 0000000..a5865ad
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
@@ -0,0 +1,29 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+public class RiemannAgentStateModelFactory extends StateModelFactory<RiemannAgentStateModel> {
+  @Override
+  public RiemannAgentStateModel createNewStateModel(String partitionName) {
+    return new RiemannAgentStateModel();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
new file mode 100644
index 0000000..5fe8ebe
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
@@ -0,0 +1,111 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * Accept alerts from the local riemann server and forward them to the helix-controller
+ */
+public class RiemannAlertProxy {
+  private static final Logger LOG = Logger.getLogger(RiemannAlertProxy.class);
+
+  class RiemannAlertProxyHandler extends AbstractHandler {
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request,
+        HttpServletResponse response) throws IOException, ServletException {
+      // Read content-body
+      InputStream inputStream = request.getInputStream();
+      StringWriter writer = new StringWriter();
+      IOUtils.copy(inputStream, writer, Charset.defaultCharset().toString());
+      String alertNameStr = writer.toString();
+      LOG.info("Handling alert: " + alertNameStr);
+
+      // Send alert message to the controller of cluster being monitored
+      try {
+        AlertName alertName = AlertName.from(alertNameStr);
+        String clusterName = alertName.getScope().getClusterId().stringify();
+        HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+        Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
+        message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
+        message.setTgtSessionId("*");
+        message.setTgtName("controller");
+        accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
+      } catch (Exception e) {
+        LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
+      }
+
+      // return ok
+      response.setStatus(HttpServletResponse.SC_OK);
+      baseRequest.setHandled(true);
+    }
+  }
+
+  final int _proxyPort;
+  final Server _server;
+  final BaseDataAccessor<ZNRecord> _baseAccessor;
+  final AbstractHandler _handler;
+
+  public RiemannAlertProxy(int proxyPort, BaseDataAccessor<ZNRecord> baseAccessor) {
+    _proxyPort = proxyPort;
+    _server = new Server(proxyPort);
+    _baseAccessor = baseAccessor;
+    _handler = new RiemannAlertProxyHandler();
+  }
+
+  public void start() throws Exception {
+    LOG.info("Starting RiemannAlertProxy on port: " + _proxyPort);
+    _server.setHandler(_handler);
+    _server.start();
+
+  }
+
+  public void shutdown() {
+    try {
+      LOG.info("Stopping RiemannAlertProxy on port: " + _proxyPort);
+      _server.stop();
+    } catch (Exception e) {
+      LOG.error("Fail to stop RiemannAlertProxy", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
new file mode 100644
index 0000000..193b763
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
@@ -0,0 +1,116 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Riemann configs
+ */
+public class RiemannConfigs {
+  private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
+  private static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
+  public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
+
+  private final String _configDir;
+  private final List<MonitoringConfig> _configs;
+
+  RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
+    _configDir = configDir;
+    _configs = configs;
+  }
+
+  /**
+   * persist configs to riemann config dir
+   */
+  public void persistConfigs() {
+    // create the directory
+    File dir = new File(_configDir);
+    if (!dir.exists()) {
+      dir.mkdir();
+    }
+
+    for (MonitoringConfig config : _configs) {
+      String configData = config.getConfig();
+      String fileName = _configDir + "/" + config.getId();
+      try {
+        PrintWriter writer = new PrintWriter(fileName);
+        writer.println(configData);
+        writer.close();
+
+        // make sure this is cleaned up eventually
+        File file = new File(fileName);
+        file.deleteOnExit();
+      } catch (FileNotFoundException e) {
+        LOG.error("Could not write " + config.getId(), e);
+      }
+    }
+  }
+
+  public String getConfigDir() {
+    return _configDir;
+  }
+
+  public static class Builder {
+    private final List<MonitoringConfig> _configs;
+    private final String _configDir;
+
+    /**
+     * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
+     */
+    public Builder() {
+      this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
+    }
+
+    public Builder(String configDir) {
+      _configDir = configDir;
+      _configs = Lists.newArrayList();
+    }
+
+    public Builder addConfig(MonitoringConfig monitoringConfig) {
+      _configs.add(monitoringConfig);
+      return this;
+    }
+
+    public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
+      _configs.addAll(monitoringConfigs);
+      return this;
+    }
+
+    public RiemannConfigs build() {
+      // Check default riemann config exists
+      for (MonitoringConfig config : _configs) {
+        if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) {
+          return new RiemannConfigs(_configDir, _configs);
+        }
+      }
+      throw new IllegalArgumentException("Missing default riemann config: "
+          + DEFAULT_RIEMANN_CONFIG);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
index 36719aa..d4f11a5 100644
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
@@ -19,25 +19,6 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.util.ZKClientPool;
 import org.apache.log4j.Logger;
 
 import clojure.lang.RT;
@@ -48,62 +29,29 @@ import clojure.lang.Symbol;
  */
 public class RiemannMonitoringServer implements MonitoringServer {
   private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class);
-  private static final String DEFAULT_CONFIG_DIR = "/tmp/riemannconfigs";
-  private final String _host;
-  private boolean _isStarted;
-  private boolean _configsAdded;
-  private String _configDir;
 
-  /**
-   * Create a monitoring server. Configs will be placed in "/tmp/riemannconfigs".
-   */
-  public RiemannMonitoringServer(String host) {
-    this(DEFAULT_CONFIG_DIR, host);
-  }
+  private volatile boolean _isStarted;
+  private final RiemannConfigs _config;
 
   /**
-   * Create a monitoring server.
-   * @param configDir Directory to use for storing configs
-   * @param host Hostname where the server lives (i.e. the hostname of this machine)
+   * Create a monitoring server
+   * @param config Riemann configs to run with; persisted via persistConfigs() on construction
    */
-  public RiemannMonitoringServer(String configDir, String host) {
-    _configDir = configDir;
+  public RiemannMonitoringServer(RiemannConfigs config) {
+    LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir());
+    _config = config;
+    config.persistConfigs();
     _isStarted = false;
-    _configsAdded = false;
-    _host = host;
   }
 
   @Override
   public synchronized void start() {
-    // get the config file
-    URL url = Thread.currentThread().getContextClassLoader().getResource("riemann.config");
-    if (url == null) {
-      LOG.error("Riemann config file does not exist!");
-      return;
-    }
-    String path = url.getPath();
-    LOG.info("Riemann config file is at: " + path);
-
-    // register config files
-    if (_configsAdded) {
-      try {
-        File srcFile = new File(path);
-        File file = File.createTempFile("riemann", "config");
-        file.deleteOnExit();
-        FileUtils.copyFile(srcFile, file);
-        PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file, true)));
-        out.println("\n(include \"" + _configDir + "\")\n");
-        out.close();
-        path = file.getAbsolutePath();
-      } catch (IOException e) {
-        LOG.error("Could not add configs!");
-      }
-    }
+    LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir());
 
     // start Riemann
     RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin"));
-    RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.config"));
-    RT.var("riemann.bin", "-main").invoke(path);
+    RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG));
+    RT.var("riemann.bin", "-main").invoke(_config.getConfigDir());
     _isStarted = true;
   }
 
@@ -113,70 +61,13 @@ public class RiemannMonitoringServer implements MonitoringServer {
       LOG.error("Tried to stop Riemann when not started!");
       return;
     }
+    LOG.info("Stopping Riemann server");
     RT.var("riemann.config", "stop!").invoke();
     _isStarted = false;
   }
 
   @Override
-  public synchronized void addConfigs(HelixDataAccessor accessor) {
-    // create the directory
-    File dir = new File(_configDir);
-    if (!dir.exists()) {
-      dir.mkdir();
-    }
-
-    // persist ZK-based configs
-    if (accessor != null) {
-      List<MonitoringConfig> configs =
-          accessor.getChildValues(accessor.keyBuilder().monitoringConfigs());
-      for (MonitoringConfig config : configs) {
-        String configData = config.getConfig();
-        String fileName = _configDir + "/" + config.getId();
-        try {
-          PrintWriter writer = new PrintWriter(fileName);
-          writer.println(configData);
-          writer.close();
-
-          // make sure this is cleaned up eventually
-          File file = new File(fileName);
-          file.deleteOnExit();
-        } catch (FileNotFoundException e) {
-          LOG.error("Could not write " + config.getId(), e);
-        }
-      }
-    }
-
-    // restart if started
-    if (_isStarted) {
-      stop();
-      start();
-    }
-    _configsAdded = true;
-  }
-
-  @Override
   public boolean isStarted() {
     return _isStarted;
   }
-
-  @Override
-  public String getHost() {
-    return _host;
-  }
-
-  @Override
-  public int getPort() {
-    return 5555;
-  }
-
-  public static void main(String[] args) throws InterruptedException, UnknownHostException {
-    RiemannMonitoringServer service =
-        new RiemannMonitoringServer(InetAddress.getLocalHost().getHostName());
-    BaseDataAccessor<ZNRecord> baseAccessor =
-        new ZkBaseDataAccessor<ZNRecord>(ZKClientPool.getZkClient("eat1-app87.corp:2181"));
-    HelixDataAccessor accessor = new ZKHelixDataAccessor("perf-test-cluster", baseAccessor);
-    service.addConfigs(accessor);
-    service.start();
-    Thread.currentThread().join();
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
deleted file mode 100644
index 6680f00..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.Date;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Leader;
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-public class BasicMonitoringTest extends ZkUnitTestBase {
-  @Test
-  public void testStartAndStop() throws Exception {
-    final int NUM_PARTICIPANTS = 10;
-    final int NUM_PARTITIONS = 4;
-    final int NUM_REPLICAS = 2;
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
-    // Set up cluster
-    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
-        "localhost", // participant name prefix
-        "TestDB", // resource name prefix
-        1, // resources
-        NUM_PARTITIONS, // partitions per resource
-        NUM_PARTICIPANTS, // number of nodes
-        NUM_REPLICAS, // replicas
-        "MasterSlave", // pick a built-in state model
-        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
-        true); // do rebalance
-
-    // start controller
-    ClusterControllerManager controller =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
-        .getHostName()));
-    controller.syncStart();
-
-    // make sure the leader has registered and is showing the server port
-    HelixDataAccessor accessor = controller.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
-    Assert.assertNotNull(leader);
-    Assert.assertNotEquals(leader.getMonitoringPort(), -1);
-    Assert.assertNotNull(leader.getMonitoringHost());
-
-    // stop controller
-    controller.syncStop();
-
-    // start controller without monitoring
-    ClusterControllerManager rawController =
-        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
-    rawController.syncStart();
-
-    // make sure the leader has registered, but has no monitoring port
-    accessor = rawController.getHelixDataAccessor();
-    keyBuilder = accessor.keyBuilder();
-    leader = accessor.getProperty(keyBuilder.controllerLeader());
-    Assert.assertNotNull(leader);
-    Assert.assertEquals(leader.getMonitoringPort(), -1);
-    Assert.assertNull(leader.getMonitoringHost());
-
-    // stop controller
-    rawController.syncStop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
new file mode 100644
index 0000000..9d28dc3
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
@@ -0,0 +1,206 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.alert.AlertAction;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.DefaultAlertMsgHandlerFactory;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.AlertConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class IntegrationTest extends ZkUnitTestBase {
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster: a single 8-partition resource, no pre-registered nodes
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        0, // number of nodes
+        1, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable participant auto-join on the cluster, which was set up with zero nodes
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Start controller for the monitoring cluster
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    // Start the Riemann alert proxy on a port picked by MonitoringTestHelper
+    int proxyPort = MonitoringTestHelper.availableTcpPort();
+    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    RiemannAlertProxy proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
+    proxy.start();
+
+    // Build the two monitoring configs: the default riemann config and a latency check
+    int riemannPort = MonitoringTestHelper.availableTcpPort();
+    MonitoringConfig riemannConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+    riemannConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(riemannPort));
+
+    MonitoringConfig latencyCheckConfig = new MonitoringConfig("check_latency_config.clj");
+    latencyCheckConfig.setConfig(MonitoringTestHelper.getLatencyCheckConfigString(proxyPort));
+
+    // Publish both monitoring configs to zk
+    accessor.setProperty(keyBuilder.monitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG),
+        riemannConfig);
+    accessor.setProperty(keyBuilder.monitoringConfig("check_latency_config.clj"),
+        latencyCheckConfig);
+
+    RiemannConfigs.Builder riemannConfigBuilder =
+        new RiemannConfigs.Builder().addConfigs(Lists.newArrayList(riemannConfig,
+            latencyCheckConfig));
+    RiemannMonitoringServer server = new RiemannMonitoringServer(riemannConfigBuilder.build());
+    server.start();
+
+    // Start Riemann agent against the monitoring cluster and the riemann port
+    RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, riemannPort);
+    agent.start();
+
+    // Expect exactly one live instance (presumably the agent that just started)
+    List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+    Assert.assertNotNull(liveInstances);
+    Assert.assertEquals(liveInstances.size(), 1);
+
+    // Check external-view
+    boolean result =
+        ClusterStateVerifier
+            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+                clusterName));
+    Assert.assertTrue(result);
+
+    // Setup mock storage cluster to be monitored
+    String storageClusterName = clusterName + "_storage";
+    TestHelper.setupCluster(storageClusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        2, // number of nodes
+        1, // replicas
+        "MasterSlave", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Add alert config: latency95 larger than 1000 => enableInstance {cluster} {node} false
+    AlertConfig alertConfig = new AlertConfig("default");
+    AlertName alertName =
+        new AlertName.Builder().cluster(ClusterId.from(storageClusterName)).metric("latency95")
+            .largerThan("1000").build();
+    AlertAction alertAction =
+        new AlertAction.Builder().cmd("enableInstance").args("{cluster}", "{node}", "false")
+            .build();
+    alertConfig.putConfig(alertName, alertAction);
+    final HelixDataAccessor storageAccessor =
+        new ZKHelixDataAccessor(storageClusterName, baseAccessor);
+    final PropertyKey.Builder storageKeyBuilder = storageAccessor.keyBuilder();
+    storageAccessor.setProperty(storageKeyBuilder.alertConfig("default"), alertConfig);
+
+    // Start storage-cluster controller with the alert message handler registered
+    ClusterControllerManager storageController =
+        new ClusterControllerManager(ZK_ADDR, storageClusterName, "controller");
+    MessageHandlerFactory fty = new DefaultAlertMsgHandlerFactory();
+    storageController.getMessagingService()
+        .registerMessageHandlerFactory(fty.getMessageType(), fty);
+    storageController.syncStart();
+
+    // Check localhost_12918 is enabled before any event is sent
+    InstanceConfig instanceConfig =
+        storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+    Assert.assertTrue(instanceConfig.getInstanceEnabled());
+
+    // Connect monitoring client to the MonitoringService0 resource of the monitoring cluster
+    final RiemannMonitoringClient rclient =
+        new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
+            ResourceId.from("MonitoringService0"), 1);
+    rclient.connect();
+
+    MonitoringEvent event =
+        new MonitoringEvent().participant(ParticipantId.from("localhost_12918"))
+            .name("LatencyReport").attribute("latency95", "" + 2)
+            .attribute("cluster", storageClusterName);
+    rclient.send(ResourceId.from("TestDB0"), event, false);
+
+    // localhost_12918 must get disabled. NOTE(review): sent latency95=2 vs alert >1000; confirm
+    result = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        InstanceConfig instanceConfig =
+            storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+        return instanceConfig.getInstanceEnabled() == false;
+      }
+    }, 10 * 1000);
+    Assert.assertTrue(result, "localhost_12918 should be disabled");
+
+    // Cleanup. NOTE(review): skipped if an assertion above fails; consider try/finally
+    rclient.disconnect();
+    storageController.syncStop();
+    controller.syncStop();
+
+    agent.shutdown();
+    server.stop();
+    proxy.shutdown();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}