You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/18 03:02:42 UTC

[1/2] git commit: [HELIX-317] Create a monitoring client API

Updated Branches:
  refs/heads/helix-monitoring 77f14b4a4 -> b1df294ba


[HELIX-317] Create a monitoring client API


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/902e6fa2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/902e6fa2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/902e6fa2

Branch: refs/heads/helix-monitoring
Commit: 902e6fa220ba9d1148fc70b4f39b52430758f719
Parents: 77f14b4
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 10 13:35:32 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 10 13:35:32 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixConnection.java  |   1 -
 .../helix/manager/zk/ZkHelixConnection.java     |   4 +
 .../helix/monitoring/MonitoringClient.java      |  85 +++++
 .../helix/monitoring/MonitoringClientOwner.java |  37 ++
 .../helix/monitoring/MonitoringEvent.java       | 298 +++++++++++++++
 helix-monitor-client/DISCLAIMER                 |  15 +
 helix-monitor-client/LICENSE                    | 273 ++++++++++++++
 helix-monitor-client/NOTICE                     |  30 ++
 helix-monitor-client/pom.xml                    | 113 ++++++
 helix-monitor-client/src/assemble/assembly.xml  |  60 +++
 .../src/main/config/log4j.properties            |  31 ++
 .../monitoring/RiemannMonitoringClient.java     | 376 +++++++++++++++++++
 helix-monitor-client/src/test/conf/testng.xml   |  27 ++
 pom.xml                                         |   1 +
 14 files changed, 1350 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
index 0e674d2..ca98767 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -25,7 +25,6 @@ import org.apache.helix.api.accessor.ResourceAccessor;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.store.HelixPropertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 1bdc54c..d0aa2c7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -69,6 +69,7 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.MonitoringClient;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
@@ -105,6 +106,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
    * helix version#
    */
   final String _version;
+  
+  private MonitoringClient _monitoringClient;
 
   public ZkHelixConnection(String zkAddr) {
     _zkAddr = zkAddr;
@@ -127,6 +130,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
     _properties = new HelixManagerProperties("cluster-manager-version.properties");
     _version = _properties.getVersion();
 
+    _monitoringClient = null;
   }
 
   private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
new file mode 100644
index 0000000..a794eab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -0,0 +1,85 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Interface for a client that can register with a monitoring server and send events for monitoring
+ */
+public interface MonitoringClient {
+  /**
+   * Connect to the monitoring server. Implementations may connect asynchronously.
+   */
+  void connect();
+
+  /**
+   * Disconnect from the monitoring server synchronously.
+   */
+  void disconnect();
+
+  /**
+   * Send a single event
+   * @param e the event to send
+   * @param batch true if this event should be sent as part of a batch operation
+   * @return true if the event was sent (or queued for batching), false otherwise
+   */
+  boolean send(MonitoringEvent e, boolean batch);
+
+  /**
+   * Send an event and then flush any outstanding messages
+   * @param e the event to send
+   * @return true if the events were successfully sent, false otherwise
+   */
+  boolean sendAndFlush(MonitoringEvent e);
+
+  /**
+   * Schedule an operation to run repeatedly
+   * @param interval how often to run the operation, expressed in {@code unit}
+   * @param delay the amount of time to wait before the first execution, expressed in {@code unit}
+   * @param unit the unit of time that applies to both interval and delay
+   * @param r the code to run on each execution
+   */
+  void every(long interval, long delay, TimeUnit unit, Runnable r);
+
+  /**
+   * Check if there is a valid connection to a monitoring server
+   * @return true if connected, false otherwise
+   */
+  boolean isConnected();
+
+  /**
+   * Check if events are being batched before they are sent
+   * @return true if batching is enabled, false otherwise
+   */
+  boolean isBatchingEnabled();
+
+  /**
+   * Get the number of events that are sent together as one batch
+   * @return the batch size, or 1 if batching is not used
+   */
+  int getBatchSize();
+
+  /**
+   * Flush all outstanding events
+   * @return true if the events were flushed, false otherwise
+   */
+  boolean flush();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
new file mode 100644
index 0000000..c623ab1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
@@ -0,0 +1,37 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface MonitoringClientOwner {
+
+  /**
+   * Register a monitoring client that can be used to report statistics.
+   * Registration connects the supplied client. If an existing client is connected, it will be
+   * disconnected.
+   * @param monitoringClient a reference to an already instantiated client
+   */
+  void registerMonitoringClient(MonitoringClient monitoringClient);
+
+  /**
+   * Get the currently registered monitoring client
+   * @return the registered MonitoringClient, or null if none is registered
+   */
+  MonitoringClient getMonitoringClient();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
new file mode 100644
index 0000000..3735589
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -0,0 +1,298 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SpectatorId;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A generic monitoring event based on Helix constructs. This is based on Riemann's EventDSL.
+ */
+public class MonitoringEvent {
+  private ClusterId _clusterId;
+  private ResourceId _resourceId;
+  private PartitionId _partitionId;
+  private String _host;
+  private String _eventState;
+  private String _description;
+  private Long _time;
+  private Long _longMetric;
+  private Float _floatMetric;
+  private Double _doubleMetric;
+  private Float _ttl;
+  private List<String> _tags;
+  private Map<String, String> _attributes;
+
+  /**
+   * Create an empty MonitoringEvent
+   */
+  public MonitoringEvent() {
+    _clusterId = null;
+    _host = null;
+    _resourceId = null;
+    _partitionId = null;
+    _eventState = null;
+    _description = null;
+    _time = null;
+    _longMetric = null;
+    _floatMetric = null;
+    _doubleMetric = null;
+    _ttl = null;
+    _tags = Lists.newLinkedList();
+    _attributes = Maps.newHashMap();
+  }
+
+  /**
+   * Set the cluster this event corresponds to
+   * @param clusterId the cluster id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent cluster(ClusterId clusterId) {
+    _clusterId = clusterId;
+    return this;
+  }
+
+  /**
+   * Set the participant this event corresponds to
+   * @param participantId the participant id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent participant(ParticipantId participantId) {
+    _host = participantId.stringify();
+    return this;
+  }
+
+  /**
+   * Set the spectator this event corresponds to
+   * @param spectatorId the spectator id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent spectator(SpectatorId spectatorId) {
+    _host = spectatorId.stringify();
+    return this;
+  }
+
+  /**
+   * Set the controller this event corresponds to
+   * @param controllerId the controller id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent controller(ControllerId controllerId) {
+    _host = controllerId.stringify();
+    return this;
+  }
+
+  /**
+   * Set the resource this event corresponds to
+   * @param resourceId the resource id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent resource(ResourceId resourceId) {
+    _resourceId = resourceId;
+    return this;
+  }
+
+  /**
+   * Set the partition this event corresponds to
+   * @param partitionId the partition id
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent partition(PartitionId partitionId) {
+    _partitionId = partitionId;
+    return this;
+  }
+
+  /**
+   * Set the state of the metric
+   * @param eventState the event state (e.g. "OK", "Failing", etc)
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent eventState(String eventState) {
+    _eventState = eventState;
+    return this;
+  }
+
+  /**
+   * Give the event a description
+   * @param description descriptive text
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent description(String description) {
+    _description = description;
+    return this;
+  }
+
+  /**
+   * Set the time that the event occurred
+   * @param time long UNIX timestamp
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent time(long time) {
+    _time = time;
+    return this;
+  }
+
+  /**
+   * Give the event a long metric
+   * @param metric the metric (the measured quantity)
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent metric(long metric) {
+    _longMetric = metric;
+    return this;
+  }
+
+  /**
+   * Give the event a float metric
+   * @param metric the metric (the measured quantity)
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent metric(float metric) {
+    _floatMetric = metric;
+    return this;
+  }
+
+  /**
+   * Give the event a double metric
+   * @param metric the metric (the measured quantity)
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent metric(double metric) {
+    _doubleMetric = metric;
+    return this;
+  }
+
+  /**
+   * Give the time before the event will expire
+   * @param ttl time to live
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent ttl(float ttl) {
+    _ttl = ttl;
+    return this;
+  }
+
+  /**
+   * Add a tag to the event
+   * @param tag arbitrary string
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent tag(String tag) {
+    _tags.add(tag);
+    return this;
+  }
+
+  /**
+   * Add multiple tags to an event
+   * @param tags a collection of tags
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent tags(Collection<String> tags) {
+    _tags.addAll(tags);
+    return this;
+  }
+
+  /**
+   * Add an attribute (a key-value pair)
+   * @param name the attribute name
+   * @param value the attribute value
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent attribute(String name, String value) {
+    _attributes.put(name, value);
+    return this;
+  }
+
+  /**
+   * Add multiple attributes
+   * @param attributes map of attribute name to value
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent attributes(Map<String, String> attributes) {
+    _attributes.putAll(attributes);
+    return this;
+  }
+
+  // below are a set of package-private getters for each of the fields
+
+  String host() {
+    return _host;
+  }
+
+  /**
+   * Compose the service name as "cluster|resource|partition", using "%" for any unset id.
+   * The defaults are local to this call, so reading the service never modifies the event.
+   * @return the composed service name
+   */
+  String service() {
+    // fall back to the "%" placeholder id without overwriting the unset fields
+    ClusterId clusterId = (_clusterId != null) ? _clusterId : ClusterId.from("%");
+    ResourceId resourceId = (_resourceId != null) ? _resourceId : ResourceId.from("%");
+    PartitionId partitionId = (_partitionId != null) ? _partitionId : PartitionId.from("%");
+    return String.format("%s|%s|%s", clusterId, resourceId, partitionId);
+  }
+
+  String eventState() {
+    return _eventState;
+  }
+
+  String description() {
+    return _description;
+  }
+
+  Long time() {
+    return _time;
+  }
+
+  Long longMetric() {
+    return _longMetric;
+  }
+
+  Float floatMetric() {
+    return _floatMetric;
+  }
+
+  Double doubleMetric() {
+    return _doubleMetric;
+  }
+
+  Float ttl() {
+    return _ttl;
+  }
+
+  List<String> tags() {
+    return _tags;
+  }
+
+  Map<String, String> attributes() {
+    return _attributes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/DISCLAIMER
----------------------------------------------------------------------
diff --git a/helix-monitor-client/DISCLAIMER b/helix-monitor-client/DISCLAIMER
new file mode 100644
index 0000000..2001d31
--- /dev/null
+++ b/helix-monitor-client/DISCLAIMER
@@ -0,0 +1,15 @@
+Apache Helix is an effort undergoing incubation at the Apache Software 
+Foundation (ASF), sponsored by the Apache Incubator PMC. 
+
+Incubation is required of all newly accepted projects until a further review 
+indicates that the infrastructure, communications, and decision making process 
+have stabilized in a manner consistent with other successful ASF projects. 
+
+While incubation status is not necessarily a reflection of the completeness 
+or stability of the code, it does indicate that the project has yet to be 
+fully endorsed by the ASF.
+
+For more information about the incubation status of the Apache Helix project you
+can go to the following page:
+
+http://incubator.apache.org/projects/helix.html

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/LICENSE
----------------------------------------------------------------------
diff --git a/helix-monitor-client/LICENSE b/helix-monitor-client/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-monitor-client/LICENSE
@@ -0,0 +1,273 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/NOTICE
----------------------------------------------------------------------
diff --git a/helix-monitor-client/NOTICE b/helix-monitor-client/NOTICE
new file mode 100644
index 0000000..e070e15
--- /dev/null
+++ b/helix-monitor-client/NOTICE
@@ -0,0 +1,30 @@
+Apache Helix
+Copyright 2012 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/)
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/ )
+Licensed under the BSD License.
+
+This product includes software developed at
+josql (http://sourceforge.net/projects/josql/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+restlet (http://www.restlet.org/about/legal).
+Licensed under the Apache License 2.0.
+
+
+II. License Summary
+- Apache License 2.0
+- BSD License

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/pom.xml b/helix-monitor-client/pom.xml
new file mode 100644
index 0000000..5857a59
--- /dev/null
+++ b/helix-monitor-client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.helix</groupId>
+    <artifactId>helix</artifactId>
+    <version>0.7.1-incubating-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>helix-monitor-client</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache Helix :: Helix Monitoring Client</name>
+
+  <properties>
+    <osgi.import>
+      org.apache.helix*,
+      org.apache.log4j,
+      *
+    </osgi.import>
+    <osgi.export>org.apache.helix.monitoring*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.aphyr</groupId>
+      <artifactId>riemann-java-client</artifactId>
+      <version>0.2.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}</directory>
+        <includes>
+          <include>DISCLAIMER</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <platforms>
+            <platform>windows</platform>
+            <platform>unix</platform>
+          </platforms>
+          <programs />
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/assemble/assembly.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/assemble/assembly.xml b/helix-monitor-client/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-monitor-client/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+  <id>pkg</id>
+  <formats>
+    <format>tar</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+      <outputDirectory>repo</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <excludes>
+        <exclude>**/*.xml</exclude>
+      </excludes>
+    </fileSet>
+     <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>LICENSE</include>
+        <include>NOTICE</include>
+        <include>DISCLAIMER</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/config/log4j.properties b/helix-monitor-client/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-monitor-client/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
new file mode 100644
index 0000000..1948308
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -0,0 +1,376 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.model.Leader;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.EventDSL;
+import com.aphyr.riemann.client.RiemannBatchClient;
+import com.aphyr.riemann.client.RiemannClient;
+import com.aphyr.riemann.client.UnsupportedJVMException;
+import com.google.common.collect.Lists;
+
+/**
+ * A Riemann-based monitoring client
+ * Thread safety note: connect and disconnect are serialized to ensure that there
+ * is no attempt to connect or disconnect with an inconsistent state. The send routines are not
+ * protected for performance reasons, and so a single send/flush may fail.
+ */
+public class RiemannMonitoringClient implements MonitoringClient {
+  private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
+  private String _host;
+  private int _port;
+  private int _batchSize;
+  private RiemannClient _client;
+  private RiemannBatchClient _batchClient;
+  private List<ScheduledItem> _scheduledItems;
+  private HelixDataAccessor _accessor;
+  private IZkDataListener _leaderListener;
+
+  /**
+   * Create a non-batched monitoring client
+   * @param clusterId the cluster to monitor
+   * @param accessor an accessor for the cluster
+   */
+  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor) {
+    this(clusterId, accessor, 1);
+  }
+
+  /**
+   * Create a monitoring client that supports batching
+   * @param clusterId the cluster to monitor
+   * @param accessor an accessor for the cluster
+   * @param batchSize the number of events in a batch
+   */
+  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor, int batchSize) {
+    _host = null;
+    _port = -1;
+    _batchSize = batchSize > 0 ? batchSize : 1;
+    _client = null;
+    _batchClient = null;
+    _accessor = accessor;
+    _scheduledItems = Lists.newLinkedList();
+    _leaderListener = getLeaderListener();
+  }
+
+  @Override
+  public void connect() {
+    if (isConnected()) {
+      LOG.error("Already connected to Riemann!");
+      return;
+    }
+
+    // watch for changes
+    changeLeaderSubscription(true);
+
+    // do the connect asynchronously as a tcp establishment could take time
+    Leader leader = _accessor.getProperty(_accessor.keyBuilder().controllerLeader());
+    doConnectAsync(leader);
+  }
+
+  @Override
+  public void disconnect() {
+    changeLeaderSubscription(false);
+    disconnectInternal();
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _client != null && _client.isConnected();
+  }
+
+  /**
+   * Flush the event queue of the active Riemann client, preferring the batch client when
+   * batching is enabled.
+   * @return true if the flush completed, false if there is no live connection or the flush failed
+   */
+  @Override
+  public boolean flush() {
+    if (!isConnected()) {
+      LOG.error("Tried to flush a Riemann client that is not connected!");
+      return false;
+    }
+    // flush is not serialized with disconnects, so the client can be torn down between the
+    // connectivity check above and this lookup; report failure instead of throwing an NPE
+    AbstractRiemannClient c = getClient(true);
+    if (c == null) {
+      LOG.error("Tried to flush a Riemann client that is not connected!");
+      return false;
+    }
+    try {
+      c.flush();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Problem flushing the Riemann event queue!", e);
+    }
+    return false;
+  }
+
+  /**
+   * Convert a Helix monitoring event to a Riemann event and send it.
+   * @param event the event to send
+   * @param batch true to prefer the batching client (used only if batching is enabled)
+   * @return true if the event was handed to a Riemann client, false if there is no live connection
+   */
+  @Override
+  public boolean send(MonitoringEvent event, boolean batch) {
+    if (!isConnected()) {
+      LOG.error("Riemann connection must be active in order to send an event!");
+      return false;
+    }
+    // send is not serialized with disconnects, so the client can be torn down between the
+    // connectivity check above and this lookup; report failure instead of throwing an NPE
+    AbstractRiemannClient c = getClient(batch);
+    if (c == null) {
+      LOG.error("Riemann connection must be active in order to send an event!");
+      return false;
+    }
+    convertEvent(c, event).send();
+    return true;
+  }
+
+  @Override
+  public boolean sendAndFlush(MonitoringEvent event) {
+    boolean sendResult = send(event, true);
+    if (sendResult) {
+      return flush();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isBatchingEnabled() {
+    return _batchClient != null && _batchClient.isConnected();
+  }
+
+  @Override
+  public int getBatchSize() {
+    return _batchSize;
+  }
+
+  /**
+   * Register a task to run on a fixed schedule. The task is remembered so that it can be
+   * rescheduled whenever a new connection is established, and it is scheduled immediately if a
+   * connection is currently active.
+   * @param interval the time between runs
+   * @param delay the time before the first run
+   * @param unit the unit of interval and delay
+   * @param r the task to run
+   */
+  @Override
+  public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+    ScheduledItem scheduledItem = new ScheduledItem();
+    scheduledItem.interval = interval;
+    scheduledItem.delay = delay;
+    scheduledItem.unit = unit;
+    scheduledItem.r = r;
+    // connectInternal iterates _scheduledItems while holding this object's monitor (from a
+    // background connect thread), so the list must only be modified under the same monitor.
+    // Holding it here also keeps the client from being swapped between the check and the call.
+    synchronized (this) {
+      _scheduledItems.add(scheduledItem);
+      RiemannClient client = getClient();
+      if (client != null && client.isConnected()) {
+        client.every(interval, delay, unit, r);
+      }
+    }
+  }
+
+  /**
+   * Get a raw, non-batched Riemann client.
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @return RiemannClient
+   */
+  private RiemannClient getClient() {
+    return _client;
+  }
+
+  /**
+   * Get a batched Riemann client (if batching is supported)
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @return RiemannBatchClient
+   */
+  private RiemannBatchClient getBatchClient() {
+    return _batchClient;
+  }
+
+  /**
+   * Get a Riemann client
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @param batch true if the client is preferred to support batching, false otherwise
+   * @return AbstractRiemannClient
+   */
+  private AbstractRiemannClient getClient(boolean batch) {
+    if (batch && isBatchingEnabled()) {
+      return getBatchClient();
+    } else {
+      return getClient();
+    }
+  }
+
+  /**
+   * Based on the contents of the leader node, connect to a Riemann server
+   * @param leader node containing host/port
+   */
+  private void doConnectAsync(final Leader leader) {
+    new Thread() {
+      @Override
+      public void run() {
+        synchronized (RiemannMonitoringClient.this) {
+          // only connect if the leader is available; otherwise it will be picked up by the callback
+          if (leader != null) {
+            _host = leader.getMonitoringHost();
+            _port = leader.getMonitoringPort();
+          }
+          // connect if there's a valid host and port
+          if (_host != null && _port != -1) {
+            connectInternal(_host, _port);
+          }
+        }
+      }
+    }.start();
+  }
+
+  /**
+   * Establishment of a connection to a Riemann server
+   * @param host monitoring server hostname
+   * @param port monitoring server port
+   */
+  private synchronized void connectInternal(String host, int port) {
+    disconnectInternal();
+    try {
+      _client = RiemannClient.tcp(host, port);
+      _client.connect();
+      // we might have to reschedule tasks
+      for (ScheduledItem item : _scheduledItems) {
+        _client.every(item.interval, item.delay, item.unit, item.r);
+      }
+    } catch (IOException e) {
+      LOG.error("Error establishing a connection!", e);
+    }
+    if (_client != null && getBatchSize() > 1) {
+      try {
+        _batchClient = new RiemannBatchClient(_batchSize, _client);
+      } catch (UnknownHostException e) {
+        _batchSize = 1;
+        LOG.error("Could not resolve host", e);
+      } catch (UnsupportedJVMException e) {
+        _batchSize = 1;
+        LOG.warn("Batching not enabled because of incompatible JVM", e);
+      }
+    }
+  }
+
+  /**
+   * Teardown of a connection to a Riemann server
+   */
+  private synchronized void disconnectInternal() {
+    try {
+      if (_batchClient != null && _batchClient.isConnected()) {
+        _batchClient.disconnect();
+      } else if (_client != null && _client.isConnected()) {
+        _client.disconnect();
+      }
+    } catch (IOException e) {
+      LOG.error("Disconnection error", e);
+    }
+    _batchClient = null;
+    _client = null;
+  }
+
+  /**
+   * Change the subscription status to the Leader node
+   * @param subscribe true to subscribe, false to unsubscribe
+   */
+  private void changeLeaderSubscription(boolean subscribe) {
+    String leaderPath = _accessor.keyBuilder().controllerLeader().getPath();
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    if (subscribe) {
+      baseAccessor.subscribeDataChanges(leaderPath, _leaderListener);
+    } else {
+      baseAccessor.unsubscribeDataChanges(leaderPath, _leaderListener);
+    }
+  }
+
+  /**
+   * Get callbacks for when the leader changes
+   * @return implemented IZkDataListener
+   */
+  private IZkDataListener getLeaderListener() {
+    return new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        Leader leader = new Leader((ZNRecord) data);
+        doConnectAsync(leader);
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+        disconnectInternal();
+      }
+    };
+  }
+
+  /**
+   * Change a helix event into a Riemann event
+   * @param c Riemann client
+   * @param helixEvent helix event
+   * @return Riemann EventDSL
+   */
+  private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
+    EventDSL event = c.event();
+    if (helixEvent.host() != null) {
+      event = event.host(helixEvent.host());
+    }
+    if (helixEvent.service() != null) {
+      event = event.service(helixEvent.service());
+    }
+    if (helixEvent.eventState() != null) {
+      event = event.state(helixEvent.eventState());
+    }
+    if (helixEvent.description() != null) {
+      event = event.description(helixEvent.description());
+    }
+    if (helixEvent.time() != null) {
+      event = event.time(helixEvent.time());
+    }
+    if (helixEvent.ttl() != null) {
+      event = event.ttl(helixEvent.ttl());
+    }
+    if (helixEvent.longMetric() != null) {
+      event = event.metric(helixEvent.longMetric());
+    } else if (helixEvent.floatMetric() != null) {
+      event = event.metric(helixEvent.floatMetric());
+    } else if (helixEvent.doubleMetric() != null) {
+      event = event.metric(helixEvent.doubleMetric());
+    }
+    if (!helixEvent.tags().isEmpty()) {
+      event = event.tags(helixEvent.tags());
+    }
+    if (!helixEvent.attributes().isEmpty()) {
+      event = event.attributes(helixEvent.attributes());
+    }
+    return event;
+  }
+
+  /**
+   * Wrapper for a task that should be run to a schedule
+   */
+  private static class ScheduledItem {
+    long interval;
+    long delay;
+    TimeUnit unit;
+    Runnable r;
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof ScheduledItem) {
+        ScheduledItem that = (ScheduledItem) other;
+        return interval == that.interval && delay == that.delay && unit == that.unit && r == that.r;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("interval: %d|delay: %d|timeunit: %s|runnable: %s", interval, delay,
+          unit.toString(), r.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/test/conf/testng.xml b/helix-monitor-client/src/test/conf/testng.xml
new file mode 100644
index 0000000..90910aa
--- /dev/null
+++ b/helix-monitor-client/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+  <test name="Test" preserve-order="false">
+    <packages>
+      <package name="org.apache.helix.monitoring"/>
+    </packages>
+  </test>
+</suite>

http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c220e2b..e7309a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@ under the License.
     <module>helix-agent</module>
     <module>helix-examples</module>
     <module>helix-monitor-server</module>
+    <module>helix-monitor-client</module>
     <module>recipes</module>
     <module>site-releases</module>
   </modules>


[2/2] git commit: A basic alerting example

Posted by ka...@apache.org.
A basic alerting example


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b1df294b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b1df294b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b1df294b

Branch: refs/heads/helix-monitoring
Commit: b1df294baa798f88898029d94f2fe8d165c91ebc
Parents: 902e6fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 17 18:02:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 17 18:02:27 2014 -0800

----------------------------------------------------------------------
 .../helix/monitoring/MonitoringEvent.java       |  16 +-
 .../monitoring/RiemannMonitoringClient.java     |  22 +-
 helix-monitor-server/pom.xml                    |   5 +
 .../src/main/resources/riemann.config           |   9 +-
 .../monitoring/TestClientServerMonitoring.java  | 222 +++++++++++++++++++
 5 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 3735589..80006fb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -40,6 +40,7 @@ public class MonitoringEvent {
   private ClusterId _clusterId;
   private ResourceId _resourceId;
   private PartitionId _partitionId;
+  private String _name;
   private String _host;
   private String _eventState;
   private String _description;
@@ -56,9 +57,10 @@ public class MonitoringEvent {
    */
   public MonitoringEvent() {
     _clusterId = null;
-    _host = null;
     _resourceId = null;
     _partitionId = null;
+    _name = null;
+    _host = null;
     _eventState = null;
     _description = null;
     _time = null;
@@ -71,6 +73,16 @@ public class MonitoringEvent {
   }
 
   /**
+   * Give this event a name
+   * @param name the name
+   * @return MonitoringEvent
+   */
+  public MonitoringEvent name(String name) {
+    _name = name;
+    return this;
+  }
+
+  /**
    * Set the cluster this event corresponds to
    * @param clusterId the cluster id
    * @return MonitoringEvent
@@ -257,7 +269,7 @@ public class MonitoringEvent {
     if (_partitionId == null) {
       _partitionId = PartitionId.from("%");
     }
-    return String.format("%s|%s|%s", _clusterId, _resourceId, _partitionId);
+    return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
   }
 
   String eventState() {

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 1948308..20b0825 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -311,35 +311,35 @@ public class RiemannMonitoringClient implements MonitoringClient {
   private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
     EventDSL event = c.event();
     if (helixEvent.host() != null) {
-      event = event.host(helixEvent.host());
+      event.host(helixEvent.host());
     }
     if (helixEvent.service() != null) {
-      event = event.service(helixEvent.service());
+      event.service(helixEvent.service());
     }
     if (helixEvent.eventState() != null) {
-      event = event.state(helixEvent.eventState());
+      event.state(helixEvent.eventState());
     }
     if (helixEvent.description() != null) {
-      event = event.description(helixEvent.description());
+      event.description(helixEvent.description());
     }
     if (helixEvent.time() != null) {
-      event = event.time(helixEvent.time());
+      event.time(helixEvent.time());
     }
     if (helixEvent.ttl() != null) {
-      event = event.ttl(helixEvent.ttl());
+      event.ttl(helixEvent.ttl());
     }
     if (helixEvent.longMetric() != null) {
-      event = event.metric(helixEvent.longMetric());
+      event.metric(helixEvent.longMetric());
     } else if (helixEvent.floatMetric() != null) {
-      event = event.metric(helixEvent.floatMetric());
+      event.metric(helixEvent.floatMetric());
     } else if (helixEvent.doubleMetric() != null) {
-      event = event.metric(helixEvent.doubleMetric());
+      event.metric(helixEvent.doubleMetric());
     }
     if (!helixEvent.tags().isEmpty()) {
-      event = event.tags(helixEvent.tags());
+      event.tags(helixEvent.tags());
     }
     if (!helixEvent.attributes().isEmpty()) {
-      event = event.attributes(helixEvent.attributes());
+      event.attributes.putAll(helixEvent.attributes());
     }
     return event;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index a703e0e..d6fdf52 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -44,6 +44,11 @@ under the License.
       <artifactId>helix-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-monitor-client</artifactId>
+      <version>0.7.1-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>riemann</groupId>
       <artifactId>riemann</artifactId>
       <version>0.2.4</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/main/resources/riemann.config
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/resources/riemann.config b/helix-monitor-server/src/main/resources/riemann.config
index 08c3bce..0f06dd0 100644
--- a/helix-monitor-server/src/main/resources/riemann.config
+++ b/helix-monitor-server/src/main/resources/riemann.config
@@ -20,13 +20,13 @@
 
 (logging/init :file "/dev/null")
 
-(tcp-server)
+(tcp-server :host "0.0.0.0")
 
 (instrumentation {:interval 1})
 
-(udp-server)
-(ws-server)
-(repl-server)
+(udp-server :host "0.0.0.0")
+(ws-server :host "0.0.0.0")
+(repl-server :host "0.0.0.0")
 
 (periodically-expire 1)
 
@@ -34,3 +34,4 @@
   (streams
     (expired prn)
     index))
+

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
new file mode 100644
index 0000000..8b7f839
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -0,0 +1,222 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * End-to-end test that wires a Riemann monitoring server into a controller and reports metrics
+ * to it from a spectator through a {@link MonitoringClient}.
+ * NOTE(review): Assert is imported from org.junit while @Test comes from TestNG; the
+ * (actual, expected) argument order used below matches TestNG -- consider org.testng.Assert.
+ */
+public class TestClientServerMonitoring extends ZkUnitTestBase {
+  /**
+   * Starts participants and a controller that hosts a Riemann monitoring server, verifies that
+   * the leader advertises the server's host and port, then runs a spectator that reports metrics.
+   * @throws Exception if the cluster, the monitoring server, or the spectator fails
+   */
+  @Test
+  public void testMonitoring() throws Exception {
+    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 2;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Track only what was actually started so that cleanup never stops an unstarted manager
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    int numStarted = 0;
+    ClusterControllerManager controller = null;
+    try {
+      // start participants
+      for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+        participants[i] =
+            new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
+        participants[i].syncStart();
+        numStarted++;
+      }
+      HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+      // set a custom monitoring config
+      MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
+      monitoringConfig.setConfig(getMonitoringConfigString());
+      accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
+
+      // start controller; it is only remembered for cleanup once it has really been started
+      ClusterControllerManager pending =
+          new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+      pending.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
+          .getHostName()));
+      pending.syncStart();
+      controller = pending;
+
+      // make sure the leader has registered and is showing the server port
+      Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
+      Assert.assertNotNull(leader);
+      Assert.assertNotEquals(leader.getMonitoringPort(), -1);
+      Assert.assertNotNull(leader.getMonitoringHost());
+
+      // run the spectator
+      spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+    } finally {
+      // stop participants, then the controller, even if an assertion or the spectator failed
+      for (int i = 0; i < numStarted; i++) {
+        participants[i].syncStop();
+      }
+      if (controller != null) {
+        controller.syncStop();
+      }
+    }
+  }
+
+  /**
+   * Builds the monitoring config installed by the test: Clojure source that defines
+   * check-failure-rate (prints when failedCount / writeCount exceeds 0.1) and check-95th-latency
+   * (prints when latency95 exceeds 1.0), and applies both to services matching LatencyReport.
+   * @return the config text, with CRLF line endings
+   */
+  private String getMonitoringConfigString() {
+    StringBuilder sb =
+        new StringBuilder()
+            .append("(defn parse-int\r\n")
+            .append(
+                "  \"Convert a string to an integer\"\r\n  [instr]\r\n  (Integer/parseInt instr))\r\n\r\n")
+            .append("(defn parse-double\r\n  \"Convert a string into a double\"\r\n  [instr]\r\n")
+            .append("  (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n")
+            .append(
+                "  \"Check if the event should trigger an alarm based on failure rate\"\r\n  [e]\r\n")
+            .append(
+                "  (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n")
+            .append(
+                "    (if (> writeCount 0)\r\n      (let [ratio (double (/ failedCount writeCount))]\r\n")
+            .append("        (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n")
+            .append(
+                "          (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n")
+            .append(
+                "(defn check-95th-latency\r\n  \"Check if the 95th percentile latency is within expectations\"\r\n")
+            .append("  [e]\r\n  (let [latency (parse-double (:latency95 e))]\r\n")
+            .append(
+                "    (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n")
+            .append(
+                "      (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n")
+            .append("(streams\r\n  (where\r\n    (service #\".*LatencyReport.*\")")
+            .append(
+                " ; Only process services containing LatencyReport\r\n    check-failure-rate\r\n")
+            .append("    check-95th-latency))");
+    return sb.toString();
+  }
+
+  /**
+   * Connects a spectator to the cluster and, for one minute, periodically sends synthetic
+   * per-participant write, failure, and latency metrics through a monitoring client.
+   * @param clusterName name of the cluster to spectate
+   * @param resourceName name of the resource whose MASTER replicas are reported on
+   * @param numPartitions number of partitions of the resource
+   * @throws Exception if a connection cannot be established or the wait is interrupted
+   */
+  private void spectate(final String clusterName, final String resourceName, final int numPartitions)
+      throws Exception {
+    final ClusterId clusterId = ClusterId.from(clusterName);
+    final ResourceId resourceId = ResourceId.from(resourceName);
+
+    // Connect to Helix
+    final HelixManager manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
+    manager.connect();
+    try {
+      // Attach a monitoring client to this connection
+      final MonitoringClient client =
+          new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
+      client.connect();
+      try {
+        // Start spectating
+        final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+        manager.addExternalViewChangeListener(routingTableProvider);
+
+        // Send some metrics
+        client.every(5, 0, TimeUnit.SECONDS, newLatencyReporter(client, routingTableProvider,
+            clusterId, resourceId, resourceName, numPartitions));
+
+        // Let the reporter run for a while before tearing everything down
+        Thread.sleep(60000);
+      } finally {
+        client.disconnect();
+      }
+    } finally {
+      // Always release the Helix connection, even if the client failed or the wait was interrupted
+      manager.disconnect();
+    }
+  }
+
+  /**
+   * Creates the task that generates one round of synthetic metrics and sends a "LatencyReport"
+   * event per participant that currently masters at least one partition.
+   * @param client monitoring client used to send the events
+   * @param routingTableProvider source of the current MASTER placement
+   * @param clusterId cluster tagged on each event
+   * @param resourceId resource tagged on each event and used to derive partition ids
+   * @param resourceName resource name used for routing table lookups
+   * @param numPartitions number of partitions to scan
+   * @return a runnable suitable for periodic scheduling
+   */
+  private Runnable newLatencyReporter(final MonitoringClient client,
+      final RoutingTableProvider routingTableProvider, final ClusterId clusterId,
+      final ResourceId resourceId, final String resourceName, final int numPartitions) {
+    final Random random = new Random();
+    return new Runnable() {
+      @Override
+      public void run() {
+        Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
+        Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
+        Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
+        for (int i = 0; i < numPartitions; i++) {
+          // Figure out who hosts what
+          PartitionId partitionId = PartitionId.from(resourceId, String.valueOf(i));
+          List<InstanceConfig> instances =
+              routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
+          if (instances.isEmpty()) {
+            continue;
+          }
+
+          // Normally you would get these attributes by using a CallTracker
+          ParticipantId participantId = instances.get(0).getParticipantId();
+          int writeCount = random.nextInt(1000) + 10;
+          addCount(writeCounts, participantId, writeCount);
+          int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master
+          addCount(failedCounts, participantId, failedCount);
+          double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
+
+          // Keep the worst latency: a participant can master several partitions, and a later
+          // healthy partition must not overwrite the bad value recorded for the p1 master
+          Double worstSoFar = latency95Map.get(participantId);
+          if (worstSoFar == null || latency > worstSoFar) {
+            latency95Map.put(participantId, latency);
+          }
+        }
+
+        // Send everything grouped by participant
+        for (ParticipantId participantId : writeCounts.keySet()) {
+          Map<String, String> attributes = Maps.newHashMap();
+          attributes.put("writeCount", String.valueOf(writeCounts.get(participantId)));
+          attributes.put("failedCount", String.valueOf(failedCounts.get(participantId)));
+          attributes.put("latency95", String.valueOf(latency95Map.get(participantId)));
+
+          // Send an event with a ttl long enough to span the send interval
+          MonitoringEvent e =
+              new MonitoringEvent().cluster(clusterId).resource(resourceId)
+                  .participant(participantId).name("LatencyReport").attributes(attributes)
+                  .eventState("update").ttl(10.0f);
+          client.send(e, false);
+        }
+      }
+    };
+  }
+
+  /**
+   * Adds a value to the running total kept for a participant, starting from zero if absent.
+   * @param totals running totals keyed by participant
+   * @param participantId participant whose total is updated
+   * @param delta amount to add
+   */
+  private static void addCount(Map<ParticipantId, Integer> totals, ParticipantId participantId,
+      int delta) {
+    Integer current = totals.get(participantId);
+    totals.put(participantId, current == null ? delta : current + delta);
+  }
+
+}