You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/03/19 03:06:40 UTC

[2/2] git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server

[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0b1780d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0b1780d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0b1780d

Branch: refs/heads/helix-monitoring
Commit: c0b1780dc472ac3234dbbb6488015211c3e66ed2
Parents: db4c10a
Author: zzhang <zz...@uci.edu>
Authored: Tue Mar 18 19:06:19 2014 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Tue Mar 18 19:06:19 2014 -0700

----------------------------------------------------------------------
 .../helix/monitoring/MonitoringClient.java      |  12 +-
 .../helix/monitoring/MonitoringEvent.java       |  96 +++-
 .../org/apache/helix/MonitoringTestHelper.java  | 114 ----
 .../monitoring/RiemannMonitoringClient.java     | 555 -------------------
 .../helix/monitoring/riemann/ClientUtil.java    |  69 +++
 .../monitoring/riemann/RawRiemannClient.java    | 229 ++++++++
 .../riemann/RiemannClientWrapper.java           | 234 ++++++++
 helix-monitor-server/pom.xml                    |   5 -
 .../apache/helix/monitoring/RiemannAgent.java   | 137 -----
 .../monitoring/RiemannAgentStateModel.java      |  54 --
 .../RiemannAgentStateModelFactory.java          |  29 -
 .../helix/monitoring/RiemannAlertProxy.java     | 111 ----
 .../apache/helix/monitoring/RiemannConfigs.java | 116 ----
 .../monitoring/RiemannMonitoringServer.java     |  73 ---
 .../monitoring/riemann/HelixAlertMessenger.java | 112 ++++
 .../helix/monitoring/riemann/RiemannAgent.java  | 169 ++++++
 .../monitoring/riemann/RiemannConfigs.java      | 116 ++++
 .../riemann/RiemannMonitoringServer.java        |  74 +++
 .../helix/monitoring/IntegrationTest.java       | 206 -------
 .../helix/monitoring/MonitoringTestHelper.java  | 136 +++++
 .../monitoring/TestClientServerMonitoring.java  | 188 -------
 .../helix/monitoring/TestRiemannAgent.java      | 127 -----
 .../helix/monitoring/TestRiemannAlertProxy.java | 105 ----
 .../monitoring/TestRiemannMonitoringServer.java |  78 ---
 .../monitoring/riemann/IntegrationTest.java     | 199 +++++++
 .../riemann/TestClientServerMonitoring.java     | 181 ++++++
 .../riemann/TestHelixAlertMessenger.java        | 110 ++++
 .../monitoring/riemann/TestRiemannAgent.java    | 117 ++++
 .../riemann/TestRiemannClientWrapper.java       | 134 +++++
 .../riemann/TestRiemannMonitoringServer.java    |  82 +++
 30 files changed, 2047 insertions(+), 1921 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
index 743f8b4..a055354 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -21,8 +21,6 @@ package org.apache.helix.monitoring;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.helix.api.id.ResourceId;
-
 /**
  * Interface for a client that can register with a monitoring server and send events for monitoring
  */
@@ -40,30 +38,26 @@ public interface MonitoringClient {
 
   /**
    * Send an event
-   * @param resource
    * @param e the event
-   * @param batch true if this should be part of a batch operation
    * @return true if the event was sent (or queued for batching), false otherwise
    */
-  boolean send(ResourceId resource, MonitoringEvent e, boolean batch);
+  boolean send(MonitoringEvent e);
 
   /**
    * Send an event and flush any outstanding messages
-   * @param resource
    * @param e the event
    * @return true if events were successfully sent, false otherwise
    */
-  boolean sendAndFlush(ResourceId resource, MonitoringEvent e);
+  boolean sendAndFlush(MonitoringEvent e);
 
   /**
    * Schedule an operation to run
-   * @param resource
    * @param interval the frequency
    * @param delay the amount of time to wait before the first execution
    * @param unit the unit of time to use
    * @param r the code to run
    */
-  void every(ResourceId resource, long interval, long delay, TimeUnit unit, Runnable r);
+  void every(long interval, long delay, TimeUnit unit, Runnable r);
 
   /**
    * Check if there is a valid connection to a monitoring server

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 80006fb..2044a3a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -19,10 +19,14 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.Scope.ScopeType;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
@@ -32,6 +36,7 @@ import org.apache.helix.api.id.SpectatorId;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * A generic monitoring event based on Helix constructs. This is based on Riemann's EventDSL.
@@ -49,8 +54,10 @@ public class MonitoringEvent {
   private Float _floatMetric;
   private Double _doubleMetric;
   private Float _ttl;
-  private List<String> _tags;
-  private Map<String, String> _attributes;
+  private final List<String> _tags;
+  private final Map<String, String> _attributes;
+  private String _shardingStr;
+  private final Set<ScopeType> _shardingScopes;
 
   /**
    * Create an empty MonitoringEvent
@@ -70,6 +77,8 @@ public class MonitoringEvent {
     _ttl = null;
     _tags = Lists.newLinkedList();
     _attributes = Maps.newHashMap();
+    _shardingStr = null;
+    _shardingScopes = Sets.newHashSet();
   }
 
   /**
@@ -253,13 +262,69 @@ public class MonitoringEvent {
     return this;
   }
 
-  // below are a set of package-private getters for each of the fields
+  /**
+   * Set sharding key using string
+   * @param shardingStr
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent shardingString(String shardingStr) {
+    _shardingStr = shardingStr;
+    return this;
+  }
+
+  /**
+   * Set sharding key using scopes
+   * @param scopes
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent shardingScopes(ScopeType... scopes) {
+    _shardingScopes.clear();
+    _shardingScopes.addAll(Arrays.asList(scopes));
+    return this;
+  }
+
+  /**
+   * Return sharding key which is used by MonitoringClient to choose MonitoringServer
+   * @return sharding key
+   */
+  public String shardingKey() {
+    // if shardingStr exists, use shardingStr
+    if (_shardingStr != null) {
+      return _shardingStr;
+    }
+
+    // if shardingStr doesn't exist, use shardingScopes
+    if (_shardingScopes.isEmpty()) {
+      _shardingScopes.addAll(Arrays.asList(ScopeType.CLUSTER, ScopeType.RESOURCE));
+    }
 
-  String host() {
+    StringBuilder sb = new StringBuilder();
+    if (_shardingScopes.contains(ScopeType.CLUSTER)) {
+      sb.append(_clusterId == null ? "%" : _clusterId.stringify());
+    }
+    if (_shardingScopes.contains(ScopeType.RESOURCE)) {
+      sb.append("|");
+      sb.append(_resourceId == null ? "%" : _resourceId.stringify());
+    }
+    if (_shardingScopes.contains(ScopeType.PARTITION)) {
+      sb.append("|");
+      sb.append(_partitionId == null ? "%" : _partitionId.stringify());
+    }
+    if (_shardingScopes.contains(ScopeType.PARTICIPANT)) {
+      sb.append("|");
+      sb.append(_host == null ? "%" : _host);
+    }
+
+    return sb.toString();
+  }
+
+  // below are used for converting MonitoringEvent to Riemann EventDSL
+
+  public String host() {
     return _host;
   }
 
-  String service() {
+  public String service() {
     if (_clusterId == null) {
       _clusterId = ClusterId.from("%");
     }
@@ -269,42 +334,45 @@ public class MonitoringEvent {
     if (_partitionId == null) {
       _partitionId = PartitionId.from("%");
     }
+    if (_name == null) {
+      _name = "%";
+    }
     return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
   }
 
-  String eventState() {
+  public String eventState() {
     return _eventState;
   }
 
-  String description() {
+  public String description() {
     return _description;
   }
 
-  Long time() {
+  public Long time() {
     return _time;
   }
 
-  Long longMetric() {
+  public Long longMetric() {
     return _longMetric;
   }
 
-  Float floatMetric() {
+  public Float floatMetric() {
     return _floatMetric;
   }
 
-  Double doubleMetric() {
+  public Double doubleMetric() {
     return _doubleMetric;
   }
 
-  Float ttl() {
+  public Float ttl() {
     return _ttl;
   }
 
-  List<String> tags() {
+  public List<String> tags() {
     return _tags;
   }
 
-  Map<String, String> attributes() {
+  public Map<String, String> attributes() {
     return _attributes;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
deleted file mode 100644
index c17dab0..0000000
--- a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.I0Itec.zkclient.NetworkUtil;
-
-public class MonitoringTestHelper {
-  static final int MAX_PORT = 65535;
-
-  /**
-   * generate a default riemann.config
-   * @param riemannPort
-   * @return
-   */
-  public static String getRiemannConfigString(int riemannPort) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("(logging/init :file \"/dev/null\")\n\n")
-        .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n")
-        .append("(instrumentation {:interval 1})\n\n")
-        .append("; (udp-server :host \"0.0.0.0\")\n")
-        .append("; (ws-server :host \"0.0.0.0\")\n")
-        .append("; (repl-server :host \"0.0.0.0\")\n\n")
-        .append("(periodically-expire 1)\n\n")
-        .append(
-            "(let [index (default :ttl 3 (update-index (index)))]\n  (streams\n    (expired prn)\n    index))\n");
-
-    return sb.toString();
-  }
-
-  /**
-   * generate a test config for checking latency
-   * @param proxyPort
-   * @return
-   */
-  public static String getLatencyCheckConfigString(int proxyPort)
-  {
-    StringBuilder sb = new StringBuilder();
-    sb.append("(require 'riemann.config)\n")
-      .append("(require 'clj-http.client)\n\n")
-      .append("(defn parse-double\n  \"Convert a string into a double\"\n  ")
-      .append("[instr]\n  (Double/parseDouble instr))\n\n")
-      .append("(defn check-95th-latency\n  \"Check if the 95th percentile latency is within expectations\"\n  ")
-      .append("[e]\n  (let [latency (parse-double (:latency95 e))]\n    ")
-      .append("(if (> latency 1.0) \n      ; Report if the 95th percentile latency exceeds 1.0s\n      ")
-      .append("(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n      ")
-      .append("(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )\n        ")
-      .append("proxy-url (str \"http://localhost:\" " + proxyPort + " )]\n        ")
-      .append("(clj-http.client/post proxy-url {:body alert-name-str }))))))\n\n")
-      .append("(streams\n  (where\n    ; Only process services containing LatencyReport\n    ")
-      .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n    ")
-      .append("check-95th-latency))\n");
-    
-    return sb.toString();
-  }
-  
-  /**
-   * find an available tcp port
-   * @return
-   */
-  public static int availableTcpPort() {
-    ServerSocket ss = null;
-    try {
-      ss = new ServerSocket(0);
-      ss.setReuseAddress(true);
-      return ss.getLocalPort();
-    } catch (IOException e) {
-      // ok
-    } finally {
-      if (ss != null) {
-        try {
-          ss.close();
-        } catch (IOException e) {
-          // should not be thrown
-        }
-      }
-    }
-    return -1;
-  }
-
-  /**
-   * find the first available port starting from startPort inclusive
-   * @param startPort
-   * @return
-   */
-  public static int availableTcpPort(int startPort) {
-    int port = startPort;
-    for (; port <= MAX_PORT; port++) {
-      if (NetworkUtil.isPortFree(port))
-        break;
-    }
-
-    return port > MAX_PORT ? -1 : port;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
deleted file mode 100644
index 591bcb9..0000000
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ /dev/null
@@ -1,555 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.log4j.Logger;
-
-import com.aphyr.riemann.client.AbstractRiemannClient;
-import com.aphyr.riemann.client.EventDSL;
-import com.aphyr.riemann.client.RiemannBatchClient;
-import com.aphyr.riemann.client.RiemannClient;
-import com.aphyr.riemann.client.UnsupportedJVMException;
-import com.google.common.collect.Lists;
-
-/**
- * A Riemann-based monitoring client Thread safety note: connect and disconnect are serialized to
- * ensure that there is no attempt to connect or disconnect with an inconsistent state. The send
- * routines are not protected for performance reasons, and so a single send/flush may fail.
- */
-public class RiemannMonitoringClient implements MonitoringClient {
-  private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
-  public static final String DEFAULT_MONITORING_SERVICE_NAME = "MonitoringService";
-
-  /**
-   * Contains information about a RiemannClient inside a MonitoringClient
-   */
-  class MonitoringClientInfo {
-    /**
-     * host/port of riemann server to which this client connects
-     */
-    String _host;
-    int _port;
-
-    /**
-     * riemann client
-     */
-    RiemannClient _client;
-
-    /**
-     * batch rieman client, null if batch is not enabled
-     */
-    RiemannBatchClient _batchClient;
-
-    /**
-     * list of periodic tasks scheduled on this riemann client
-     */
-    final List<ScheduledItem> _scheduledItems;
-
-    public MonitoringClientInfo() {
-      _host = null;
-      _port = -1;
-      _client = null;
-      _batchClient = null;
-      _scheduledItems = Lists.newArrayList();
-    }
-
-  }
-
-  private int _batchSize;
-  private final ResourceId _monitoringServiceName;
-  private final ClusterId _monitoringCluster;
-  private int _monitoringServicePartitionNum;
-
-  private final HelixManager _spectator;
-  private final RoutingTableProvider _routingTableProvider;
-  private final Map<ResourceId, MonitoringClientInfo> _clientMap;
-
-  /**
-   * Create a non-batched monitoring client
-   * @param zkAddr
-   * @param monitoringClusterId
-   */
-  public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId) {
-    this(zkAddr, monitoringClusterId, ResourceId.from(DEFAULT_MONITORING_SERVICE_NAME), 1);
-  }
-
-  /**
-   * Create a monitoring client that supports batching
-   * @param clusterId
-   *          the cluster to monitor
-   * @param accessor
-   *          an accessor for the cluster
-   * @param batchSize
-   *          the number of events in a batch
-   * @throws Exception
-   */
-  public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId,
-      ResourceId monitoringServiceName, int batchSize) {
-    _batchSize = batchSize > 0 ? batchSize : 1;
-    _monitoringServiceName = monitoringServiceName;
-    _monitoringCluster = monitoringClusterId;
-    _monitoringServicePartitionNum = 0;
-    _clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
-
-    _spectator =
-        HelixManagerFactory.getZKHelixManager(monitoringClusterId.stringify(), null,
-            InstanceType.SPECTATOR, zkAddr);
-    _routingTableProvider = new RoutingTableProvider();
-  }
-
-  @Override
-  public void connect() throws Exception {
-    if (isConnected()) {
-      LOG.error("Already connected to Riemann!");
-      return;
-    }
-
-    // Connect spectator to the cluster being monitored
-    _spectator.connect();
-    _spectator.addExternalViewChangeListener(_routingTableProvider);
-
-    // Get partition number of monitoring service
-    HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
-    IdealState idealState =
-        accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
-    if (idealState == null) {
-      throw new IllegalArgumentException("Resource for MonitoringService: "
-          + _monitoringServiceName + " doesn't exist in cluster: " + _monitoringCluster);
-    }
-
-    _monitoringServicePartitionNum = idealState.getNumPartitions();
-
-    if (_monitoringServicePartitionNum <= 0) {
-      throw new IllegalArgumentException("Invalid partition number of MonitoringService: "
-          + _monitoringServiceName + " in cluster: " + _monitoringCluster + ", was "
-          + _monitoringServicePartitionNum);
-    }
-  }
-
-  @Override
-  public void disconnect() {
-    // disconnect internal riemann clients
-    for (ResourceId resource : _clientMap.keySet()) {
-      disconnectInternal(resource);
-    }
-
-    _spectator.disconnect();
-    _monitoringServicePartitionNum = 0;
-  }
-
-  @Override
-  public boolean isConnected() {
-    return _spectator.isConnected();
-  }
-
-  /**
-   * Flush a riemann client for a resource
-   * @param resource
-   * @return
-   */
-  private boolean flush(ResourceId resource) {
-    if (!isConnected()) {
-      LOG.error("Tried to flush a Riemann client that is not connected!");
-      return false;
-    }
-
-    AbstractRiemannClient c = getClient(resource, true);
-    if (c == null) {
-      LOG.warn("Fail to get riemann client for resource: " + resource);
-      return false;
-    }
-
-    try {
-      c.flush();
-      return true;
-    } catch (IOException e) {
-      LOG.error("Problem flushing the Riemann event queue for resource: " + resource, e);
-    }
-    return false;
-  }
-
-  @Override
-  public boolean flush() {
-    boolean succeed = true;
-    for (ResourceId resource : _clientMap.keySet()) {
-      succeed = succeed && flush(resource);
-    }
-
-    return succeed;
-  }
-
-  @Override
-  public boolean send(ResourceId resource, MonitoringEvent event, boolean batch) {
-    if (!isConnected()) {
-      LOG.error("Riemann connection must be active in order to send an event!");
-      return false;
-    }
-
-    if (!isConnected(resource)) {
-      connect(resource, null, event);
-    } else {
-      AbstractRiemannClient c = getClient(resource, batch);
-      convertEvent(c, event).send();
-    }
-
-    return true;
-  }
-
-  @Override
-  public boolean sendAndFlush(ResourceId resource, MonitoringEvent event) {
-
-    boolean sendResult = send(resource, event, true);
-    if (sendResult) {
-      return flush(resource);
-    }
-    return false;
-  }
-
-  /**
-   * Batch should be enabled for either all or none of riemann clients
-   */
-  @Override
-  public boolean isBatchingEnabled() {
-    return _batchSize > 1;
-  }
-
-  @Override
-  public int getBatchSize() {
-    return _batchSize;
-  }
-
-  /**
-   * Check if a riemann client for given resource is connected
-   * @param resource
-   * @return true if riemann client is connected, false otherwise
-   */
-  private boolean isConnected(ResourceId resource) {
-    if (!isConnected()) {
-      return false;
-    }
-
-    MonitoringClientInfo clientInfo = _clientMap.get(resource);
-    return clientInfo != null && clientInfo._client != null && clientInfo._client.isConnected();
-  }
-
-  @Override
-  public synchronized void every(ResourceId resource, long interval, long delay, TimeUnit unit,
-      Runnable r) {
-    if (!isConnected()) {
-      LOG.error("Riemann client must be connected in order to send events!");
-      return;
-    }
-
-    ScheduledItem scheduledItem = new ScheduledItem();
-    scheduledItem.interval = interval;
-    scheduledItem.delay = delay;
-    scheduledItem.unit = unit;
-    scheduledItem.r = r;
-
-    if (isConnected(resource)) {
-      MonitoringClientInfo clientInfo = _clientMap.get(resource);
-      clientInfo._scheduledItems.add(scheduledItem);
-      getClient(resource).every(interval, delay, unit, r);
-    } else {
-      connect(resource, scheduledItem, null);
-    }
-  }
-
-  /**
-   * Connect a riemann client to riemann server given a resource
-   * @param resource
-   * @param scheduledItem
-   * @param pendingEvent
-   */
-  private void connect(ResourceId resource, ScheduledItem scheduledItem,
-      MonitoringEvent pendingEvent) {
-    // Hash by resourceId
-    int partitionKey = resource.hashCode() % _monitoringServicePartitionNum;
-    List<InstanceConfig> instances =
-        _routingTableProvider.getInstances(_monitoringServiceName.stringify(),
-            _monitoringServiceName + "_" + partitionKey, "ONLINE");
-
-    if (instances.size() == 0) {
-      LOG.error("Riemann monitoring server for resource: " + resource + " at partitionKey: "
-          + partitionKey + " is not available");
-      return;
-    }
-
-    InstanceConfig instanceConfig = instances.get(0);
-    String host = instanceConfig.getHostName();
-    int port = Integer.parseInt(instanceConfig.getPort());
-
-    // Do the connect asynchronously as a tcp establishment could take time
-    doConnectAsync(resource, host, port, scheduledItem, pendingEvent);
-  }
-
-  /**
-   * Get a raw, non-batched Riemann client. WARNING: do not cache this, as it may be disconnected
-   * without notice
-   * @return RiemannClient
-   */
-  private RiemannClient getClient(ResourceId resource) {
-    MonitoringClientInfo clientInfo = _clientMap.get(resource);
-    return clientInfo == null ? null : clientInfo._client;
-  }
-
-  /**
-   * Get a batched Riemann client (if batching is supported) WARNING: do not cache this, as it may
-   * be disconnected without notice
-   * @return RiemannBatchClient
-   */
-  private RiemannBatchClient getBatchClient(ResourceId resource) {
-    MonitoringClientInfo clientInfo = _clientMap.get(resource);
-    return clientInfo == null ? null : clientInfo._batchClient;
-  }
-
-  /**
-   * Get a Riemann client WARNING: do not cache this, as it may be disconnected without notice
-   * @param batch
-   *          true if the client is preferred to support batching, false otherwise
-   * @return AbstractRiemannClient
-   */
-  private AbstractRiemannClient getClient(ResourceId resource, boolean batch) {
-    if (batch && isBatchingEnabled()) {
-      return getBatchClient(resource);
-    } else {
-      return getClient(resource);
-    }
-  }
-
-  /**
-   * Based on the contents of the leader node, connect to a Riemann server
-   * @param leader
-   *          node containing host/port
-   */
-  private void doConnectAsync(final ResourceId resource, final String host, final int port,
-      final ScheduledItem scheduledItem, final MonitoringEvent pendingEvent) {
-    new Thread() {
-      @Override
-      public void run() {
-        synchronized (RiemannMonitoringClient.this) {
-          if (resource != null && host != null && port != -1) {
-            connectInternal(resource, host, port, scheduledItem, pendingEvent);
-          } else {
-            LOG.error("Fail to doConnectAsync becaue of invalid arguments, resource: " + resource
-                + ", host: " + host + ", port: " + port);
-          }
-        }
-      }
-    }.start();
-  }
-
-  /**
-   * Establishment of a connection to a Riemann server
-   * @param resource
-   * @param host
-   * @param port
-   * @param scheduledItem
-   * @param pendingEvent
-   */
-  private synchronized void connectInternal(ResourceId resource, String host, int port,
-      ScheduledItem scheduledItem, MonitoringEvent pendingEvent) {
-    MonitoringClientInfo clientInfo = _clientMap.get(resource);
-    if (clientInfo != null && clientInfo._host.equals(host) && clientInfo._port == port
-        && clientInfo._client != null && clientInfo._client.isConnected()) {
-      LOG.info("Riemann client for resource: " + resource + " already connected on " + host + ":"
-          + port);
-
-      // We might have to reschedule tasks
-      if (scheduledItem != null) {
-        clientInfo._scheduledItems.add(scheduledItem);
-        clientInfo._client.every(scheduledItem.interval, scheduledItem.delay, scheduledItem.unit,
-            scheduledItem.r);
-      }
-
-      // Sending over pending event
-      if (pendingEvent != null) {
-        convertEvent(clientInfo._client, pendingEvent).send();
-      }
-
-      return;
-    }
-
-    // Disconnect from previous riemann server
-    disconnectInternal(resource);
-
-    // Connect to new riemann server
-    RiemannClient client = null;
-    RiemannBatchClient batchClient = null;
-    try {
-      client = RiemannClient.tcp(host, port);
-      client.connect();
-    } catch (IOException e) {
-      LOG.error("Error establishing a connection!", e);
-
-    }
-
-    if (client != null && getBatchSize() > 1) {
-      try {
-        batchClient = new RiemannBatchClient(_batchSize, client);
-      } catch (UnknownHostException e) {
-        _batchSize = 1;
-        LOG.error("Could not resolve host", e);
-      } catch (UnsupportedJVMException e) {
-        _batchSize = 1;
-        LOG.warn("Batching not enabled because of incompatible JVM", e);
-      }
-    }
-
-    if (clientInfo == null) {
-      clientInfo = new MonitoringClientInfo();
-    }
-
-    clientInfo._host = host;
-    clientInfo._port = port;
-    clientInfo._client = client;
-    clientInfo._batchClient = batchClient;
-    if (scheduledItem != null) {
-      clientInfo._scheduledItems.add(scheduledItem);
-    }
-    _clientMap.put(resource, clientInfo);
-
-    // We might have to reschedule tasks
-    for (ScheduledItem item : clientInfo._scheduledItems) {
-      client.every(item.interval, item.delay, item.unit, item.r);
-    }
-
-    // Send over pending event
-    if (pendingEvent != null) {
-      convertEvent(client, pendingEvent).send();
-    }
-  }
-
-  /**
-   * Teardown of a connection to a Riemann server
-   */
-  private synchronized void disconnectInternal(ResourceId resource) {
-    MonitoringClientInfo clientInfo = _clientMap.get(resource);
-    if (clientInfo == null) {
-      return;
-    }
-
-    RiemannBatchClient batchClient = clientInfo._batchClient;
-    RiemannClient client = clientInfo._client;
-
-    clientInfo._batchClient = null;
-    clientInfo._client = null;
-
-    try {
-      if (batchClient != null && batchClient.isConnected()) {
-        batchClient.scheduler().shutdown();
-        batchClient.disconnect();
-      } else if (client != null && client.isConnected()) {
-        client.scheduler().shutdown();
-        client.disconnect();
-      }
-    } catch (IOException e) {
-      LOG.error("Disconnection error", e);
-    }
-  }
-
-  /**
-   * Change a helix monitoring event into a Riemann event
-   * @param c Riemann client
-   * @param helixEvent helix event
-   * @return Riemann EventDSL
-   */
-  private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
-    EventDSL event = c.event();
-    if (helixEvent.host() != null) {
-      event.host(helixEvent.host());
-    }
-    if (helixEvent.service() != null) {
-      event.service(helixEvent.service());
-    }
-    if (helixEvent.eventState() != null) {
-      event.state(helixEvent.eventState());
-    }
-    if (helixEvent.description() != null) {
-      event.description(helixEvent.description());
-    }
-    if (helixEvent.time() != null) {
-      event.time(helixEvent.time());
-    }
-    if (helixEvent.ttl() != null) {
-      event.ttl(helixEvent.ttl());
-    }
-    if (helixEvent.longMetric() != null) {
-      event.metric(helixEvent.longMetric());
-    } else if (helixEvent.floatMetric() != null) {
-      event.metric(helixEvent.floatMetric());
-    } else if (helixEvent.doubleMetric() != null) {
-      event.metric(helixEvent.doubleMetric());
-    }
-    if (!helixEvent.tags().isEmpty()) {
-      event.tags(helixEvent.tags());
-    }
-    if (!helixEvent.attributes().isEmpty()) {
-      event.attributes.putAll(helixEvent.attributes());
-    }
-    return event;
-  }
-
-  /**
-   * Wrapper for a task that should be run to a schedule
-   */
-  private static class ScheduledItem {
-    long interval;
-    long delay;
-    TimeUnit unit;
-    Runnable r;
-
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof ScheduledItem) {
-        ScheduledItem that = (ScheduledItem) other;
-        return interval == that.interval && delay == that.delay && unit == that.unit && r == that.r;
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return String.format("interval: %d|delay: %d|timeunit: %s|runnable: %s", interval, delay,
-          unit.toString(), r.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
new file mode 100644
index 0000000..0c12ca3
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
@@ -0,0 +1,69 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.MonitoringEvent;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.EventDSL;
+
+public class ClientUtil {
+  /**
+   * Change a helix monitoring event into a Riemann event
+   * @param c Riemann client
+   * @param helixEvent helix event
+   * @return Riemann EventDSL
+   */
+  public static EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
+    EventDSL event = c.event();
+    if (helixEvent.host() != null) {
+      event.host(helixEvent.host());
+    }
+    if (helixEvent.service() != null) {
+      event.service(helixEvent.service());
+    }
+    if (helixEvent.eventState() != null) {
+      event.state(helixEvent.eventState());
+    }
+    if (helixEvent.description() != null) {
+      event.description(helixEvent.description());
+    }
+    if (helixEvent.time() != null) {
+      event.time(helixEvent.time());
+    }
+    if (helixEvent.ttl() != null) {
+      event.ttl(helixEvent.ttl());
+    }
+    if (helixEvent.longMetric() != null) {
+      event.metric(helixEvent.longMetric());
+    } else if (helixEvent.floatMetric() != null) {
+      event.metric(helixEvent.floatMetric());
+    } else if (helixEvent.doubleMetric() != null) {
+      event.metric(helixEvent.doubleMetric());
+    }
+    if (!helixEvent.tags().isEmpty()) {
+      event.tags(helixEvent.tags());
+    }
+    if (!helixEvent.attributes().isEmpty()) {
+      event.attributes.putAll(helixEvent.attributes());
+    }
+    return event;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
new file mode 100644
index 0000000..4fe5902
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
@@ -0,0 +1,229 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.RiemannBatchClient;
+import com.aphyr.riemann.client.RiemannClient;
+import com.aphyr.riemann.client.UnsupportedJVMException;
+
+/**
+ * Simple wrapper around RiemannClient that does auto reconnect
+ */
+class RawRiemannClient {
+  private static final Logger LOG = Logger.getLogger(RawRiemannClient.class);
+  private static final int HEARTBEAT_PERIOD = 10;
+  private static final int TIMEOUT_LIMIT = 3;
+
+  enum State {
+    DISCONNECTED,
+    CONNECTED,
+    RECONNECTING
+  }
+
+  private RiemannClient _rclient;
+  private RiemannBatchClient _brclient;
+  private volatile State _state = State.DISCONNECTED;
+  private final String _host;
+  private final int _port;
+  private int _batchSize;
+  private Thread _reconnectThread;
+
+  public RawRiemannClient(String host, int port) {
+    this(host, port, 1);
+  }
+
+  public RawRiemannClient(String host, int port, int batchSize) {
+    _host = host;
+    _port = port;
+    _batchSize = batchSize > 0 ? batchSize : 1;
+  }
+
+  private synchronized boolean doConnect() {
+    if (_state == State.CONNECTED) {
+      return true;
+    }
+
+    try {
+      _rclient = RiemannClient.tcp(_host, _port);
+      _rclient.connect();
+      if (_rclient != null && _batchSize > 1) {
+        try {
+          _brclient = new RiemannBatchClient(_batchSize, _rclient);
+        } catch (UnknownHostException e) {
+          _batchSize = 1;
+          LOG.error("Could not resolve host", e);
+        } catch (UnsupportedJVMException e) {
+          _batchSize = 1;
+          LOG.warn("Batching not enabled because of incompatible JVM", e);
+        }
+      }
+
+      Random random = new Random();
+      _rclient.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS,
+          new Runnable() {
+
+            @Override
+            public void run() {
+              try {
+                _rclient.event().service("heartbeat").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD)
+                    .sendWithAck();
+                _state = State.CONNECTED;
+              } catch (Exception e) {
+                LOG.error("Exception in send heatbeat to riemann server: " + _host + ":" + _port, e);
+                _state = State.RECONNECTING;
+              }
+            }
+          });
+      _state = State.CONNECTED;
+      return true;
+    } catch (IOException e) {
+      LOG.error("Fail to connect to riemann server: " + _host + ":" + _port);
+    }
+
+    return false;
+  }
+
+  /**
+   * Make a connection to Riemann server; if fails, start a thread for retrying
+   */
+  public synchronized void connect() {
+    boolean success = doConnect();
+    if (!success) {
+      _reconnectThread = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+          LOG.info("Start reconnect thread");
+          Random random = new Random();
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              boolean success = doConnect();
+              if (success) {
+                break;
+              }
+
+              TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt() % HEARTBEAT_PERIOD);
+
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Reconnect thread is interrupted");
+          } finally {
+            LOG.info("Terminate reconnect thread");
+          }
+        }
+      });
+
+      _reconnectThread.start();
+    }
+  }
+
+  /**
+   * Disconnect from Riemann server
+   */
+  public synchronized void disconnect() {
+    try {
+      if (_reconnectThread != null) {
+        _reconnectThread.interrupt();
+      }
+
+      if (_rclient != null) {
+        _rclient.scheduler().shutdown();
+        _rclient.disconnect();
+      }
+    } catch (IOException e) {
+      LOG.error("Fail to disconnect rclient for " + _host + ":" + _port, e);
+    }
+    _state = State.DISCONNECTED;
+  }
+
+  public boolean isConnected() {
+    return _state == State.CONNECTED;
+  }
+
+  private AbstractRiemannClient client() {
+    if (isBatchEnabled()) {
+      return _brclient;
+    } else {
+      return _rclient;
+    }
+  }
+
+  public boolean send(MonitoringEvent event) {
+    if (!isConnected()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fail to send event: " + event + " to " + _host + ":" + _port
+            + ", because state is not connected, was: " + _state);
+      }
+      return false;
+    }
+
+    try {
+      ClientUtil.convertEvent(client(), event).send();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Fail to send event: " + event + " to " + _host + ":" + _port, e);
+    }
+    return false;
+  }
+
+  public boolean flush() {
+    if (!isConnected()) {
+      return false;
+    }
+
+    try {
+      client().flush();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Problem flushing the Riemann event queue", e);
+    }
+    return false;
+  }
+
+  public boolean sendAndFlush(MonitoringEvent event) {
+    boolean success = send(event);
+    if (success) {
+      return flush();
+    }
+
+    return false;
+  }
+
+  public int getBatchSize() {
+    return _batchSize;
+  }
+
+  public boolean isBatchEnabled() {
+    return _batchSize > 1;
+  }
+
+  public State getState() {
+    return _state;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
new file mode 100644
index 0000000..bd6517f
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
@@ -0,0 +1,234 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.monitoring.MonitoringClient;
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrapper around a list of RawRiemannClients; if one client fails, try the next one in list
+ */
+public class RiemannClientWrapper implements MonitoringClient {
+  private static final Logger LOG = Logger.getLogger(RiemannClientWrapper.class);
+
+  /**
+   * A list of "host:port" addresses for Riemann servers
+   */
+  private final List<String> _rsHosts;
+  private boolean _isConnected;
+  private List<RawRiemannClient> _rclients;
+  private int _batchSize;
+
+  private ScheduledThreadPoolExecutor _pool;
+
+  public RiemannClientWrapper(List<String> rsHosts) {
+    this(rsHosts, 1);
+  }
+
+  public RiemannClientWrapper(List<String> rsHosts, int batchSize) {
+    _rsHosts = rsHosts;
+    Collections.sort(_rsHosts);
+    _batchSize = batchSize > 0 ? batchSize : 1;
+    _isConnected = false;
+  }
+
+  // Returns the pool for this client. Creates the pool on first use
+  private synchronized ScheduledThreadPoolExecutor pool() {
+    if (_pool == null) {
+      _pool = new ScheduledThreadPoolExecutor(1);
+      _pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+      _pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    }
+
+    return _pool;
+  }
+
+  @Override
+  public synchronized void connect() throws Exception {
+    if (_isConnected) {
+      LOG.warn("Client already connected");
+      return;
+    }
+
+    _rclients = new ArrayList<RawRiemannClient>();
+    for (String rsHost : _rsHosts) {
+      String[] splits = rsHost.split(":");
+
+      if (splits == null || splits.length != 2) {
+        throw new IllegalArgumentException("Invalid Riemann server: " + rsHost);
+      }
+
+      String host = splits[0];
+      int port = Integer.parseInt(splits[1]);
+
+      RawRiemannClient rclient = new RawRiemannClient(host, port, _batchSize);
+      rclient.connect();
+
+      /**
+       * If any Riemann client doesn't support batch, set it to 1
+       */
+      if (rclient.isConnected() && rclient.getBatchSize() == 1) {
+        _batchSize = 1;
+      }
+      _rclients.add(rclient);
+    }
+
+    _isConnected = true;
+  }
+
+  @Override
+  public synchronized void disconnect() {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client already disconnected");
+      return;
+    }
+
+    for (RawRiemannClient rclient : _rclients) {
+      rclient.disconnect();
+    }
+    _rclients = null;
+    _isConnected = false;
+  }
+
+  /**
+   * Get raw client based on event's sharding key
+   * @param event
+   * @return
+   */
+  private RawRiemannClient client(MonitoringEvent event) {
+    String shardingKey = event.shardingKey();
+    int baseIdx = shardingKey.hashCode() % _rsHosts.size();
+
+    // find the first rclient in CONNECTED state and send
+    for (int i = 0; i < _rsHosts.size(); i++) {
+      int idx = (baseIdx + i) % _rsHosts.size();
+      RawRiemannClient rclient = _rclients.get(idx);
+      if (rclient.isConnected()) {
+        return rclient;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean send(MonitoringEvent event) {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client is not connected. Fail to send event: " + event);
+      return false;
+    }
+
+    RawRiemannClient rclient = client(event);
+    if (rclient != null) {
+      return rclient.send(event);
+    }
+
+    LOG.error("Fail to send event: " + event + ", no rclient is available");
+    return false;
+  }
+
+  @Override
+  public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+    pool().scheduleAtFixedRate(r, delay, interval, unit);
+  }
+
+  @Override
+  public boolean sendAndFlush(MonitoringEvent event) {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client is not connected. Fail to send event: " + event);
+      return false;
+    }
+
+    RawRiemannClient rclient = client(event);
+    if (rclient != null) {
+      boolean success = rclient.send(event);
+      if (success) {
+        return rclient.flush();
+      }
+    }
+    LOG.error("Fail to send event: " + event + ", no rclient is available");
+    return false;
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _isConnected;
+  }
+
+  @Override
+  public boolean isBatchingEnabled() {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client is not connected");
+      return false;
+    }
+
+    /**
+     * Batch should be enabled for all or none raw clients
+     */
+    for (RawRiemannClient rclient : _rclients) {
+      if (rclient.isConnected()) {
+        return rclient.isBatchEnabled();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Return batch size if connected or 1 otherwise
+   */
+  @Override
+  public int getBatchSize() {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client is not connected");
+      return 1;
+    }
+
+    /**
+     * Batch size should be the same for all raw clients
+     */
+    for (RawRiemannClient rclient : _rclients) {
+      if (rclient.isConnected()) {
+        return rclient.getBatchSize();
+      }
+    }
+
+    return 1;
+  }
+
+  @Override
+  public boolean flush() {
+    if (!_isConnected || _rclients == null) {
+      LOG.warn("Client is not connected");
+      return false;
+    }
+
+    boolean success = true;
+    for (RawRiemannClient rclient : _rclients) {
+      success &= rclient.flush();
+    }
+    return success;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index 2cbddfd..041a390 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -54,11 +54,6 @@ under the License.
       <version>0.2.4</version>
     </dependency>
     <dependency>
-      <groupId>org.eclipse.jetty.aggregate</groupId>
-      <artifactId>jetty-all-server</artifactId>
-      <version>8.1.14.v20131031</version>
-    </dependency>
-    <dependency>
       <groupId>factual</groupId>
       <artifactId>clj-helix</artifactId>
       <version>0.1.0</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
deleted file mode 100644
index 61e3d6c..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.log4j.Logger;
-
-import com.aphyr.riemann.client.RiemannClient;
-
-public class RiemannAgent {
-  private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
-
-  static final String STATEMODEL_NAME = "OnlineOffline";
-
-  final Random _random;
-  final String _zkAddr;
-  final String _clusterName;
-  final String _instanceName;
-  final int _riemannPort;
-  final HelixManager _participant;
-  final RiemannClient _client;
-
-  public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
-    _random = new Random();
-    _zkAddr = zkAddr;
-    _clusterName = clusterName;
-    _instanceName =
-        String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
-    _riemannPort = riemannPort;
-    _participant =
-        HelixManagerFactory.getZKHelixManager(clusterName, _instanceName, InstanceType.PARTICIPANT,
-            zkAddr);
-    _client = RiemannClient.tcp("localhost", riemannPort);
-  }
-
-  public void start() throws Exception {
-    LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
-        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
-
-    // Wait until riemann port is connected
-    int timeout = 30 * 1000;
-    long startT = System.currentTimeMillis();
-    while ((System.currentTimeMillis() - startT) < timeout) {
-      try {
-        _client.connect();
-        _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
-        break;
-      } catch (IOException e) {
-        int sleep = _random.nextInt(3000) + 3000;
-        LOG.info("Wait " + sleep + "ms for riemann server to come up");
-        TimeUnit.MILLISECONDS.sleep(sleep);
-      }
-    }
-
-    if (!_client.isConnected()) {
-      String err =
-          "Fail to connect to reimann server on localhost:" + _riemannPort + " in " + timeout
-              + "ms";
-      LOG.error(err);
-      throw new RuntimeException(err);
-    }
-    LOG.info("RiemannAgent connected to local riemann server on port: " + _riemannPort);
-
-    // Start helix participant
-    _participant.getStateMachineEngine().registerStateModelFactory(STATEMODEL_NAME,
-        new RiemannAgentStateModelFactory());
-    _participant.connect();
-
-    // Monitor riemann server
-    _client.every(10, 0, TimeUnit.SECONDS, new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          // Send heartbeat metrics
-          _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
-        } catch (Exception e) {
-          LOG.error("Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
-              + _instanceName, e);
-          asyncShutdown();
-        }
-      }
-    });
-
-  }
-
-  /**
-   * Do shutdown asynchronously
-   */
-  void asyncShutdown() {
-    new Thread(new Runnable() {
-
-      @Override
-      public void run() {
-        shutdown();
-      }
-    }).start();
-  }
-
-  public void shutdown() {
-    LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
-        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
-
-    try {
-      _client.scheduler().shutdown();
-      _client.disconnect();
-    } catch (IOException e) {
-      LOG.error("Exception in disconnect riemann client", e);
-    }
-
-    _participant.disconnect();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
deleted file mode 100644
index 2e32cef..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.apache.log4j.Logger;
-
-@StateModelInfo(initialState = "OFFLINE", states = {
-    "DROPPED", "OFFLINE", "ONLINE"
-})
-public class RiemannAgentStateModel extends StateModel {
-  private static final Logger LOG = Logger.getLogger(RiemannAgentStateModel.class);
-
-  void logTransition(Message message) {
-    String toState = message.getToState();
-    String fromState = message.getFromState();
-    String resourceName = message.getResourceName();
-    String partittionName = message.getPartitionName();
-
-    LOG.info("Become " + toState + " from " + fromState + " for resource: " + resourceName
-        + ", partition: " + partittionName);
-  }
-
-  @Transition(to = "ONLINE", from = "OFFLINE")
-  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-    logTransition(message);
-  }
-
-  @Transition(to = "OFFLINE", from = "ONLINE")
-  public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
-    logTransition(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
deleted file mode 100644
index a5865ad..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-public class RiemannAgentStateModelFactory extends StateModelFactory<RiemannAgentStateModel> {
-  @Override
-  public RiemannAgentStateModel createNewStateModel(String partitionName) {
-    return new RiemannAgentStateModel();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
deleted file mode 100644
index 5fe8ebe..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.UUID;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.alert.AlertName;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-
-/**
- * Accept alerts from local riemann server and forward it to helix-controller
- */
-public class RiemannAlertProxy {
-  private static final Logger LOG = Logger.getLogger(RiemannAlertProxy.class);
-
-  class RiemannAlertProxyHandler extends AbstractHandler {
-    @Override
-    public void handle(String target, Request baseRequest, HttpServletRequest request,
-        HttpServletResponse response) throws IOException, ServletException {
-      // Read content-body
-      InputStream inputStream = request.getInputStream();
-      StringWriter writer = new StringWriter();
-      IOUtils.copy(inputStream, writer, Charset.defaultCharset().toString());
-      String alertNameStr = writer.toString();
-      LOG.info("Handling alert: " + alertNameStr);
-
-      // Send alert message to the controller of cluster being monitored
-      try {
-        AlertName alertName = AlertName.from(alertNameStr);
-        String clusterName = alertName.getScope().getClusterId().stringify();
-        HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
-        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-        Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
-        message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
-        message.setTgtSessionId("*");
-        message.setTgtName("controller");
-        accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
-      } catch (Exception e) {
-        LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
-      }
-
-      // return ok
-      response.setStatus(HttpServletResponse.SC_OK);
-      baseRequest.setHandled(true);
-    }
-  }
-
-  final int _proxyPort;
-  final Server _server;
-  final BaseDataAccessor<ZNRecord> _baseAccessor;
-  final AbstractHandler _handler;
-
-  public RiemannAlertProxy(int proxyPort, BaseDataAccessor<ZNRecord> baseAccessor) {
-    _proxyPort = proxyPort;
-    _server = new Server(proxyPort);
-    _baseAccessor = baseAccessor;
-    _handler = new RiemannAlertProxyHandler();
-  }
-
-  public void start() throws Exception {
-    LOG.info("Starting RiemannAlertProxy on port: " + _proxyPort);
-    _server.setHandler(_handler);
-    _server.start();
-
-  }
-
-  public void shutdown() {
-    try {
-      LOG.info("Stopping RiemannAlertProxy on port: " + _proxyPort);
-      _server.stop();
-    } catch (Exception e) {
-      LOG.error("Fail to stop RiemannAlertProxy", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
deleted file mode 100644
index 193b763..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.List;
-
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-/**
- * Riemann configs
- */
-public class RiemannConfigs {
-  private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
-  private static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
-  public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
-
-  private final String _configDir;
-  private final List<MonitoringConfig> _configs;
-
-  RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
-    _configDir = configDir;
-    _configs = configs;
-  }
-
-  /**
-   * persist configs to riemann config dir
-   */
-  public void persistConfigs() {
-    // create the directory
-    File dir = new File(_configDir);
-    if (!dir.exists()) {
-      dir.mkdir();
-    }
-
-    for (MonitoringConfig config : _configs) {
-      String configData = config.getConfig();
-      String fileName = _configDir + "/" + config.getId();
-      try {
-        PrintWriter writer = new PrintWriter(fileName);
-        writer.println(configData);
-        writer.close();
-
-        // make sure this is cleaned up eventually
-        File file = new File(fileName);
-        file.deleteOnExit();
-      } catch (FileNotFoundException e) {
-        LOG.error("Could not write " + config.getId(), e);
-      }
-    }
-  }
-
-  public String getConfigDir() {
-    return _configDir;
-  }
-
-  public static class Builder {
-    private final List<MonitoringConfig> _configs;
-    private final String _configDir;
-
-    /**
-     * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
-     */
-    public Builder() {
-      this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
-    }
-
-    public Builder(String configDir) {
-      _configDir = configDir;
-      _configs = Lists.newArrayList();
-    }
-
-    public Builder addConfig(MonitoringConfig monitoringConfig) {
-      _configs.add(monitoringConfig);
-      return this;
-    }
-
-    public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
-      _configs.addAll(monitoringConfigs);
-      return this;
-    }
-
-    public RiemannConfigs build() {
-      // Check default riemann config exists
-      for (MonitoringConfig config : _configs) {
-        if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) {
-          return new RiemannConfigs(_configDir, _configs);
-        }
-      }
-      throw new IllegalArgumentException("Missing default riemann config: "
-          + DEFAULT_RIEMANN_CONFIG);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
deleted file mode 100644
index d4f11a5..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-import clojure.lang.RT;
-import clojure.lang.Symbol;
-
-/**
- * A monitoring server implementation that uses Riemann
- */
-public class RiemannMonitoringServer implements MonitoringServer {
-  private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class);
-
-  private volatile boolean _isStarted;
-  private final RiemannConfigs _config;
-
-  /**
-   * Create a monitoring server
-   * @param config
-   */
-  public RiemannMonitoringServer(RiemannConfigs config) {
-    LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir());
-    _config = config;
-    config.persistConfigs();
-    _isStarted = false;
-  }
-
-  @Override
-  public synchronized void start() {
-    LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir());
-
-    // start Riemann
-    RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin"));
-    RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG));
-    RT.var("riemann.bin", "-main").invoke(_config.getConfigDir());
-    _isStarted = true;
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (!_isStarted) {
-      LOG.error("Tried to stop Riemann when not started!");
-      return;
-    }
-    LOG.info("Stopping Riemann server");
-    RT.var("riemann.config", "stop!").invoke();
-    _isStarted = false;
-  }
-
-  @Override
-  public boolean isStarted() {
-    return _isStarted;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
new file mode 100644
index 0000000..cbc209a
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
@@ -0,0 +1,112 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+/**
+ * Accept alerts from local Riemann server and forward it to helix-controller
+ */
+public class HelixAlertMessenger {
+  private static final Logger LOG = Logger.getLogger(HelixAlertMessenger.class);
+  private static final int DEFAULT_MAX_ALERT_COUNT = 1;
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MINUTES;
+
+  private final ZkClient _zkclient;
+  private final BaseDataAccessor<ZNRecord> _baseAccessor;
+
+  /**
+   * A queue that keeps track of timestamps (in millisecond) of last N alerts sent
+   */
+  private final Queue<Long> _queue;
+  private final int _maxAlertCount;
+  private final TimeUnit _timeUnit;
+
+  public HelixAlertMessenger(String zkHosts) {
+    this(zkHosts, DEFAULT_MAX_ALERT_COUNT, DEFAULT_TIME_UNIT);
+  }
+
+  public HelixAlertMessenger(String zkHosts, int maxAlertCount, TimeUnit timeUnit) {
+    _zkclient =
+        new ZkClient(zkHosts, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+    _queue = new LinkedList<Long>();
+    _maxAlertCount = maxAlertCount;
+    _timeUnit = timeUnit;
+  }
+
+  /**
+   * Send alert to helix controller; throttle if necessary
+   * not thread-safe
+   * @param alertNameStr
+   */
+  public void onAlert(String alertNameStr) {
+    LOG.info("Handling alert: " + alertNameStr);
+
+    // throttling
+    long now = System.currentTimeMillis();
+    if (_queue.size() >= _maxAlertCount) {
+      if (_queue.peek() + _timeUnit.toMillis(_maxAlertCount) > now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Throttling alert: " + alertNameStr);
+        }
+        return;
+      } else {
+        _queue.remove();
+      }
+    }
+
+    // Send alert message to the controller of cluster being monitored
+    try {
+      AlertName alertName = AlertName.from(alertNameStr);
+      String clusterName = alertName.getScope().getClusterId().stringify();
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
+      message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
+      message.setTgtSessionId("*");
+      message.setTgtName("controller");
+      accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
+
+      // record the timestamp
+      _queue.add(now);
+
+    } catch (Exception e) {
+      LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
new file mode 100644
index 0000000..3e7efca
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
@@ -0,0 +1,169 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.RiemannClient;
+
+/**
+ * Start a Helix participant that joins cluster and represents local Riemann server
+ */
+public class RiemannAgent {
+  private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
+  private static final int HEARTBEAT_PERIOD = 10;
+  private static final int TIMEOUT_LIMIT = 3;
+
+  private final String _zkAddr;
+  private final String _clusterName;
+  private final String _instanceName;
+  private final int _riemannPort;
+  private HelixManager _participant;
+  private final RiemannClient _client;
+  private Thread _reconnectThread;
+
+  public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+    _zkAddr = zkAddr;
+    _clusterName = clusterName;
+    _instanceName =
+        String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
+    _riemannPort = riemannPort;
+    _client = RiemannClient.tcp("localhost", riemannPort);
+  }
+
+  private synchronized boolean doStart() throws Exception {
+    try {
+      _client.connect();
+      _client.event().service("heartbeat").state("running").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD)
+          .sendWithAck();
+      LOG.info("RiemannAgent connected to local riemann server on localhost:" + _riemannPort);
+      _participant =
+          HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+              InstanceType.PARTICIPANT, _zkAddr);
+      _participant.connect();
+
+      // Monitoring Riemann server
+      Random random = new Random();
+      _client.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS,
+          new Runnable() {
+
+            @Override
+            public void run() {
+              try {
+                // Send heartbeat metrics
+                _client.event().service("heartbeat").state("running")
+                    .ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD).sendWithAck();
+                if (_participant == null) {
+                  _participant =
+                      HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+                          InstanceType.PARTICIPANT, _zkAddr);
+                  _participant.connect();
+                }
+              } catch (Exception e) {
+                LOG.error(
+                    "Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
+                        + _instanceName, e);
+
+                if (_participant != null) {
+                  _participant.disconnect();
+                  _participant = null;
+                }
+              }
+            }
+          });
+
+      return true;
+    } catch (IOException e) {
+      LOG.error("Fail to connect to Riemann server on localhost:" + _riemannPort);
+    }
+    return false;
+  }
+
+  /**
+   * Try connect local Riemann server; if fails, start a thread to retry async
+   * @throws Exception
+   */
+  public synchronized void start() throws Exception {
+    LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    boolean success = doStart();
+    if (!success) {
+      _reconnectThread = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+          LOG.info("Start reconnect thread");
+          Random random = new Random();
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              boolean success = doStart();
+              if (success) {
+                break;
+              }
+
+              TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt() % HEARTBEAT_PERIOD);
+
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Reconnect thread is interrupted");
+          } catch (Exception e) {
+            LOG.error("Fail to start RiemannAgent", e);
+          } finally {
+            LOG.info("Terminate reconnect thread");
+          }
+
+        }
+      });
+      _reconnectThread.start();
+    }
+
+  }
+
+  public synchronized void shutdown() {
+    LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    if (_reconnectThread != null) {
+      _reconnectThread.interrupt();
+      _reconnectThread = null;
+    }
+
+    try {
+      _client.scheduler().shutdown();
+      _client.disconnect();
+    } catch (IOException e) {
+      LOG.error("Exception in disconnect riemann client", e);
+    }
+
+    if (_participant != null) {
+      _participant.disconnect();
+      _participant = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
new file mode 100644
index 0000000..43b957e
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
@@ -0,0 +1,116 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Riemann configs
+ */
+public class RiemannConfigs {
+  private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
+  public static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
+  public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
+
+  private final String _configDir;
+  private final List<MonitoringConfig> _configs;
+
+  RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
+    _configDir = configDir;
+    _configs = configs;
+  }
+
+  /**
+   * persist configs to riemann config dir
+   */
+  public void persistConfigs() {
+    // create the directory
+    File dir = new File(_configDir);
+    if (!dir.exists()) {
+      dir.mkdir();
+    }
+
+    for (MonitoringConfig config : _configs) {
+      String configData = config.getConfig();
+      String fileName = _configDir + "/" + config.getId();
+      try {
+        PrintWriter writer = new PrintWriter(fileName);
+        writer.println(configData);
+        writer.close();
+
+        // make sure this is cleaned up eventually
+        File file = new File(fileName);
+        file.deleteOnExit();
+      } catch (FileNotFoundException e) {
+        LOG.error("Could not write " + config.getId(), e);
+      }
+    }
+  }
+
+  public String getConfigDir() {
+    return _configDir;
+  }
+
+  public static class Builder {
+    private final List<MonitoringConfig> _configs;
+    private final String _configDir;
+
+    /**
+     * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
+     */
+    public Builder() {
+      this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
+    }
+
+    public Builder(String configDir) {
+      _configDir = configDir;
+      _configs = Lists.newArrayList();
+    }
+
+    public Builder addConfig(MonitoringConfig monitoringConfig) {
+      _configs.add(monitoringConfig);
+      return this;
+    }
+
+    public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
+      _configs.addAll(monitoringConfigs);
+      return this;
+    }
+
+    public RiemannConfigs build() {
+      // Check default riemann config exists
+      for (MonitoringConfig config : _configs) {
+        if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) {
+          return new RiemannConfigs(_configDir, _configs);
+        }
+      }
+      throw new IllegalArgumentException("Missing default riemann config: "
+          + DEFAULT_RIEMANN_CONFIG);
+    }
+  }
+}