You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/18 03:02:42 UTC
[1/2] git commit: [HELIX-317] Create a monitoring client API
Updated Branches:
refs/heads/helix-monitoring 77f14b4a4 -> b1df294ba
[HELIX-317] Create a monitoring client API
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/902e6fa2
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/902e6fa2
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/902e6fa2
Branch: refs/heads/helix-monitoring
Commit: 902e6fa220ba9d1148fc70b4f39b52430758f719
Parents: 77f14b4
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Dec 10 13:35:32 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Dec 10 13:35:32 2013 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/HelixConnection.java | 1 -
.../helix/manager/zk/ZkHelixConnection.java | 4 +
.../helix/monitoring/MonitoringClient.java | 85 +++++
.../helix/monitoring/MonitoringClientOwner.java | 37 ++
.../helix/monitoring/MonitoringEvent.java | 298 +++++++++++++++
helix-monitor-client/DISCLAIMER | 15 +
helix-monitor-client/LICENSE | 273 ++++++++++++++
helix-monitor-client/NOTICE | 30 ++
helix-monitor-client/pom.xml | 113 ++++++
helix-monitor-client/src/assemble/assembly.xml | 60 +++
.../src/main/config/log4j.properties | 31 ++
.../monitoring/RiemannMonitoringClient.java | 376 +++++++++++++++++++
helix-monitor-client/src/test/conf/testng.xml | 27 ++
pom.xml | 1 +
14 files changed, 1350 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
index 0e674d2..ca98767 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -25,7 +25,6 @@ import org.apache.helix.api.accessor.ResourceAccessor;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.store.HelixPropertyStore;
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index 1bdc54c..d0aa2c7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -69,6 +69,7 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.monitoring.MonitoringClient;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
@@ -105,6 +106,8 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
* helix version#
*/
final String _version;
+
+ private MonitoringClient _monitoringClient;
public ZkHelixConnection(String zkAddr) {
_zkAddr = zkAddr;
@@ -127,6 +130,7 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
_properties = new HelixManagerProperties("cluster-manager-version.properties");
_version = _properties.getVersion();
+ _monitoringClient = null;
}
private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
new file mode 100644
index 0000000..a794eab
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -0,0 +1,85 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Interface for a client that can register with a monitoring server and send monitoring events to it
+ */
+public interface MonitoringClient {
+ /**
+ * Connect to the monitoring server. May be asynchronous; see {@link #isConnected()}.
+ */
+ void connect();
+
+ /**
+ * Disconnect from the monitoring server synchronously.
+ */
+ void disconnect();
+
+ /**
+ * Send an event, either immediately or as part of a batch
+ * @param e the event
+ * @param batch true if this should be part of a batch operation
+ * @return true if the event was sent (or queued for batching), false otherwise
+ */
+ boolean send(MonitoringEvent e, boolean batch);
+
+ /**
+ * Send an event and flush any outstanding messages
+ * @param e the event
+ * @return true if events were successfully sent, false otherwise
+ */
+ boolean sendAndFlush(MonitoringEvent e);
+
+ /**
+ * Schedule an operation to run repeatedly
+ * @param interval the period between executions, expressed in the given unit
+ * @param delay the amount of time to wait before the first execution
+ * @param unit the unit of time that interval and delay are expressed in
+ * @param r the code to run
+ */
+ void every(long interval, long delay, TimeUnit unit, Runnable r);
+
+ /**
+ * Check if there is a valid connection to a monitoring server
+ * @return true if connected, false otherwise
+ */
+ boolean isConnected();
+
+ /**
+ * Check if batching is being used
+ * @return true if enabled, false otherwise
+ */
+ boolean isBatchingEnabled();
+
+ /**
+ * Get the number of events sent as a batch
+ * @return the batch size, or 1 if batching is not used
+ */
+ int getBatchSize();
+
+ /**
+ * Flush all outstanding events
+ * @return true if the events were flushed, false otherwise
+ */
+ boolean flush();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
new file mode 100644
index 0000000..c623ab1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClientOwner.java
@@ -0,0 +1,37 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface MonitoringClientOwner {
+
+ /**
+ * Register a monitoring client that can be used to report statistics.
+ * Registration connects the given client. If an existing client is connected, it will be
+ * disconnected.
+ * @param monitoringClient a reference to an instantiated client
+ */
+ void registerMonitoringClient(MonitoringClient monitoringClient);
+
+ /**
+ * Get the registered monitoring client
+ * @return the registered MonitoringClient object, or null
+ */
+ MonitoringClient getMonitoringClient();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
new file mode 100644
index 0000000..3735589
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -0,0 +1,298 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SpectatorId;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * A generic monitoring event based on Helix constructs. This is based on Riemann's EventDSL.
+ */
+public class MonitoringEvent {
+ private ClusterId _clusterId;
+ private ResourceId _resourceId;
+ private PartitionId _partitionId;
+ private String _host;
+ private String _eventState;
+ private String _description;
+ private Long _time;
+ private Long _longMetric;
+ private Float _floatMetric;
+ private Double _doubleMetric;
+ private Float _ttl;
+ private List<String> _tags;
+ private Map<String, String> _attributes;
+
+ /**
+ * Create an empty MonitoringEvent: no ids, host, metrics, tags, or attributes are set
+ */
+ public MonitoringEvent() {
+ _clusterId = null;
+ _host = null;
+ _resourceId = null;
+ _partitionId = null;
+ _eventState = null;
+ _description = null;
+ _time = null;
+ _longMetric = null;
+ _floatMetric = null;
+ _doubleMetric = null;
+ _ttl = null;
+ _tags = Lists.newLinkedList();
+ _attributes = Maps.newHashMap();
+ }
+
+ /**
+ * Set the cluster this event corresponds to
+ * @param clusterId the cluster id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent cluster(ClusterId clusterId) {
+ _clusterId = clusterId;
+ return this;
+ }
+
+ /**
+ * Set the participant this event corresponds to; it is recorded as the event host
+ * @param participantId the participant id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent participant(ParticipantId participantId) {
+ _host = participantId.stringify();
+ return this;
+ }
+
+ /**
+ * Set the spectator this event corresponds to; it is recorded as the event host
+ * @param spectatorId the spectator id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent spectator(SpectatorId spectatorId) {
+ _host = spectatorId.stringify();
+ return this;
+ }
+
+ /**
+ * Set the controller this event corresponds to; it is recorded as the event host
+ * @param controllerId the controller id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent controller(ControllerId controllerId) {
+ _host = controllerId.stringify();
+ return this;
+ }
+
+ /**
+ * Set the resource this event corresponds to
+ * @param resourceId the resource id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent resource(ResourceId resourceId) {
+ _resourceId = resourceId;
+ return this;
+ }
+
+ /**
+ * Set the partition this event corresponds to
+ * @param partitionId the partition id
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent partition(PartitionId partitionId) {
+ _partitionId = partitionId;
+ return this;
+ }
+
+ /**
+ * Set the state of the metric
+ * @param eventState the event state (e.g. "OK", "Failing", etc)
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent eventState(String eventState) {
+ _eventState = eventState;
+ return this;
+ }
+
+ /**
+ * Give the event a description
+ * @param description descriptive text
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent description(String description) {
+ _description = description;
+ return this;
+ }
+
+ /**
+ * Set the time that the event occurred
+ * @param time long UNIX timestamp
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent time(long time) {
+ _time = time;
+ return this;
+ }
+
+ /**
+ * Give the event a long metric; it is stored separately from any float or double metric
+ * @param metric the metric (the measured quantity)
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent metric(long metric) {
+ _longMetric = metric;
+ return this;
+ }
+
+ /**
+ * Give the event a float metric; it is stored separately from any long or double metric
+ * @param metric the metric (the measured quantity)
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent metric(float metric) {
+ _floatMetric = metric;
+ return this;
+ }
+
+ /**
+ * Give the event a double metric; it is stored separately from any long or float metric
+ * @param metric the metric (the measured quantity)
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent metric(double metric) {
+ _doubleMetric = metric;
+ return this;
+ }
+
+ /**
+ * Give the time before the event will expire
+ * @param ttl time to live
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent ttl(float ttl) {
+ _ttl = ttl;
+ return this;
+ }
+
+ /**
+ * Add a tag to the event
+ * @param tag arbitrary string
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent tag(String tag) {
+ _tags.add(tag);
+ return this;
+ }
+
+ /**
+ * Add multiple tags to the event
+ * @param tags a collection of tags
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent tags(Collection<String> tags) {
+ _tags.addAll(tags);
+ return this;
+ }
+
+ /**
+ * Add an attribute (a key-value pair)
+ * @param name the attribute name
+ * @param value the attribute value
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent attribute(String name, String value) {
+ _attributes.put(name, value);
+ return this;
+ }
+
+ /**
+ * Add multiple attributes
+ * @param attributes map of attribute name to value
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent attributes(Map<String, String> attributes) {
+ _attributes.putAll(attributes);
+ return this;
+ }
+
+ // below is a set of package-private accessors for the contents of the event
+
+ String host() {
+ return _host;
+ }
+
+ /**
+ * Get the service name in the form cluster|resource|partition. Any id that has not been set
+ * is rendered via an id created from "%"; the event's own fields are left untouched.
+ * @return the formatted service string
+ */
+ String service() {
+ // use locals for the defaults so that reading the service name never mutates the event
+ ClusterId clusterId = (_clusterId != null) ? _clusterId : ClusterId.from("%");
+ ResourceId resourceId = (_resourceId != null) ? _resourceId : ResourceId.from("%");
+ PartitionId partitionId = (_partitionId != null) ? _partitionId : PartitionId.from("%");
+ return String.format("%s|%s|%s", clusterId, resourceId, partitionId);
+ }
+
+ String eventState() {
+ return _eventState;
+ }
+
+ String description() {
+ return _description;
+ }
+
+ Long time() {
+ return _time;
+ }
+
+ Long longMetric() {
+ return _longMetric;
+ }
+
+ Float floatMetric() {
+ return _floatMetric;
+ }
+
+ Double doubleMetric() {
+ return _doubleMetric;
+ }
+
+ Float ttl() {
+ return _ttl;
+ }
+
+ List<String> tags() {
+ return _tags;
+ }
+
+ Map<String, String> attributes() {
+ return _attributes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/DISCLAIMER
----------------------------------------------------------------------
diff --git a/helix-monitor-client/DISCLAIMER b/helix-monitor-client/DISCLAIMER
new file mode 100644
index 0000000..2001d31
--- /dev/null
+++ b/helix-monitor-client/DISCLAIMER
@@ -0,0 +1,15 @@
+Apache Helix is an effort undergoing incubation at the Apache Software
+Foundation (ASF), sponsored by the Apache Incubator PMC.
+
+Incubation is required of all newly accepted projects until a further review
+indicates that the infrastructure, communications, and decision making process
+have stabilized in a manner consistent with other successful ASF projects.
+
+While incubation status is not necessarily a reflection of the completeness
+or stability of the code, it does indicate that the project has yet to be
+fully endorsed by the ASF.
+
+For more information about the incubation status of the Apache Helix project you
+can go to the following page:
+
+http://incubator.apache.org/projects/helix.html
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/LICENSE
----------------------------------------------------------------------
diff --git a/helix-monitor-client/LICENSE b/helix-monitor-client/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-monitor-client/LICENSE
@@ -0,0 +1,273 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/NOTICE
----------------------------------------------------------------------
diff --git a/helix-monitor-client/NOTICE b/helix-monitor-client/NOTICE
new file mode 100644
index 0000000..e070e15
--- /dev/null
+++ b/helix-monitor-client/NOTICE
@@ -0,0 +1,30 @@
+Apache Helix
+Copyright 2012 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/)
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/ )
+Licensed under the BSD License.
+
+This product includes software developed at
+josql (http://sourceforge.net/projects/josql/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+restlet (http://www.restlet.org/about/legal).
+Licensed under the Apache License 2.0.
+
+
+II. License Summary
+- Apache License 2.0
+- BSD License
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/pom.xml b/helix-monitor-client/pom.xml
new file mode 100644
index 0000000..5857a59
--- /dev/null
+++ b/helix-monitor-client/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>helix-monitor-client</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Helix :: Helix Monitoring Client</name>
+
+ <properties>
+ <osgi.import>
+ org.apache.helix*,
+ org.apache.log4j,
+ *
+ </osgi.import>
+ <osgi.export>org.apache.helix.monitoring*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.aphyr</groupId>
+ <artifactId>riemann-java-client</artifactId>
+ <version>0.2.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ <resource>
+ <directory>${basedir}</directory>
+ <includes>
+ <include>DISCLAIMER</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <configuration>
+ <platforms>
+ <platform>windows</platform>
+ <platform>unix</platform>
+ </platforms>
+ <programs />
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assemble/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/assemble/assembly.xml b/helix-monitor-client/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-monitor-client/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+ <id>pkg</id>
+ <formats>
+ <format>tar</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+ <outputDirectory>repo</outputDirectory>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ <excludes>
+ <exclude>**/*.xml</exclude>
+ </excludes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <lineEnding>unix</lineEnding>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}</directory>
+ <outputDirectory>/</outputDirectory>
+ <includes>
+ <include>LICENSE</include>
+ <include>NOTICE</include>
+ <include>DISCLAIMER</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/config/log4j.properties b/helix-monitor-client/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-monitor-client/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
new file mode 100644
index 0000000..1948308
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -0,0 +1,376 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.model.Leader;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.EventDSL;
+import com.aphyr.riemann.client.RiemannBatchClient;
+import com.aphyr.riemann.client.RiemannClient;
+import com.aphyr.riemann.client.UnsupportedJVMException;
+import com.google.common.collect.Lists;
+
+/**
+ * A Riemann-based monitoring client.
+ * <p>
+ * Thread safety note: connect and disconnect are serialized on this object's monitor to ensure
+ * that there is no attempt to connect or disconnect with an inconsistent state. The send routines
+ * are not protected for performance reasons, and so a single send/flush may fail. They read the
+ * current client reference exactly once, so a concurrent teardown results in a reported failure
+ * rather than a NullPointerException.
+ */
+public class RiemannMonitoringClient implements MonitoringClient {
+  private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
+
+  // Last endpoint taken from a Leader node; only written while holding this object's monitor
+  private String _host;
+  private int _port;
+  // Written under this object's monitor but read by the unlocked send path, hence volatile
+  private volatile int _batchSize;
+  private volatile RiemannClient _client;
+  private volatile RiemannBatchClient _batchClient;
+  // Guarded by its own monitor: appended to by every(), copied and replayed on each (re)connect
+  private final List<ScheduledItem> _scheduledItems;
+  private final HelixDataAccessor _accessor;
+  private final IZkDataListener _leaderListener;
+
+  /**
+   * Create a non-batched monitoring client
+   * @param clusterId the cluster to monitor
+   * @param accessor an accessor for the cluster
+   */
+  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor) {
+    this(clusterId, accessor, 1);
+  }
+
+  /**
+   * Create a monitoring client that supports batching
+   * @param clusterId the cluster to monitor (not read by this constructor; the accessor alone
+   *          determines which cluster is watched)
+   * @param accessor an accessor for the cluster
+   * @param batchSize the number of events in a batch; values below 1 are treated as 1
+   */
+  public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor, int batchSize) {
+    _host = null;
+    _port = -1;
+    _batchSize = batchSize > 0 ? batchSize : 1;
+    _client = null;
+    _batchClient = null;
+    _accessor = accessor;
+    _scheduledItems = Lists.newArrayList();
+    _leaderListener = getLeaderListener();
+  }
+
+  /**
+   * Subscribe to changes of the controller leader node and start an asynchronous connection to
+   * the monitoring server advertised by the current leader. Does nothing except log an error if
+   * a connection is already active.
+   */
+  @Override
+  public void connect() {
+    if (isConnected()) {
+      LOG.error("Already connected to Riemann!");
+      return;
+    }
+
+    // watch for changes
+    changeLeaderSubscription(true);
+
+    // do the connect asynchronously as a tcp establishment could take time
+    Leader leader = _accessor.getProperty(_accessor.keyBuilder().controllerLeader());
+    doConnectAsync(leader);
+  }
+
+  /**
+   * Stop watching the controller leader node and tear down any active connection.
+   */
+  @Override
+  public void disconnect() {
+    changeLeaderSubscription(false);
+    disconnectInternal();
+  }
+
+  /**
+   * @return true if a Riemann client exists and reports itself as connected
+   */
+  @Override
+  public boolean isConnected() {
+    // read the field once so a concurrent disconnect cannot null it between check and use
+    RiemannClient client = _client;
+    return client != null && client.isConnected();
+  }
+
+  /**
+   * Flush the event queue of the batching client if batching is active, otherwise of the plain
+   * client.
+   * @return true if the flush completed, false if not connected or the flush failed
+   */
+  @Override
+  public boolean flush() {
+    AbstractRiemannClient c = isConnected() ? getClient(true) : null;
+    if (c == null) {
+      LOG.error("Tried to flush a Riemann client that is not connected!");
+      return false;
+    }
+    try {
+      c.flush();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Problem flushing the Riemann event queue!", e);
+    }
+    return false;
+  }
+
+  /**
+   * Convert the event to a Riemann event and hand it to the client.
+   * @param event the event to send
+   * @param batch true to prefer the batching client, false to use the plain client
+   * @return true if the event was handed to a client, false if there is no active connection
+   */
+  @Override
+  public boolean send(MonitoringEvent event, boolean batch) {
+    AbstractRiemannClient c = isConnected() ? getClient(batch) : null;
+    if (c == null) {
+      LOG.error("Riemann connection must be active in order to send an event!");
+      return false;
+    }
+    convertEvent(c, event).send();
+    return true;
+  }
+
+  /**
+   * Send an event through the batch-preferring path and flush immediately afterwards.
+   * @param event the event to send
+   * @return true only if both the send and the flush succeeded
+   */
+  @Override
+  public boolean sendAndFlush(MonitoringEvent event) {
+    boolean sendResult = send(event, true);
+    if (sendResult) {
+      return flush();
+    }
+    return false;
+  }
+
+  /**
+   * @return true if a batching client exists and reports itself as connected
+   */
+  @Override
+  public boolean isBatchingEnabled() {
+    RiemannBatchClient batchClient = _batchClient;
+    return batchClient != null && batchClient.isConnected();
+  }
+
+  /**
+   * @return the configured batch size; this drops to 1 if creating the batching client failed
+   */
+  @Override
+  public int getBatchSize() {
+    return _batchSize;
+  }
+
+  /**
+   * Register a task to run on a schedule. The task is remembered so that it can be scheduled
+   * again on every later (re)connect, and it is scheduled right away if a connection is active.
+   * @param interval period between runs
+   * @param delay delay before the first run
+   * @param unit unit of interval and delay
+   * @param r the task to run
+   */
+  @Override
+  public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+    ScheduledItem scheduledItem = new ScheduledItem();
+    scheduledItem.interval = interval;
+    scheduledItem.delay = delay;
+    scheduledItem.unit = unit;
+    scheduledItem.r = r;
+    // connectInternal() copies this list on another thread, so the append must be guarded
+    synchronized (_scheduledItems) {
+      _scheduledItems.add(scheduledItem);
+    }
+    RiemannClient client = getClient();
+    if (client != null && client.isConnected()) {
+      client.every(interval, delay, unit, r);
+    }
+  }
+
+  /**
+   * Get a raw, non-batched Riemann client.
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @return RiemannClient, or null if none has been established
+   */
+  private RiemannClient getClient() {
+    return _client;
+  }
+
+  /**
+   * Get a batched Riemann client (if batching is supported)
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @return RiemannBatchClient, or null if batching is not set up
+   */
+  private RiemannBatchClient getBatchClient() {
+    return _batchClient;
+  }
+
+  /**
+   * Get a Riemann client
+   * WARNING: do not cache this, as it may be disconnected without notice
+   * @param batch true if the client is preferred to support batching, false otherwise
+   * @return AbstractRiemannClient, or null if no client is currently available
+   */
+  private AbstractRiemannClient getClient(boolean batch) {
+    if (batch) {
+      // single read: the field may be nulled by a concurrent disconnect
+      RiemannBatchClient batchClient = getBatchClient();
+      if (batchClient != null && batchClient.isConnected()) {
+        return batchClient;
+      }
+    }
+    return getClient();
+  }
+
+  /**
+   * Based on the contents of the leader node, connect to a Riemann server
+   * @param leader node containing host/port, or null to reuse the last known host/port
+   */
+  private void doConnectAsync(final Leader leader) {
+    new Thread() {
+      @Override
+      public void run() {
+        synchronized (RiemannMonitoringClient.this) {
+          // only connect if the leader is available; otherwise it will be picked up by the callback
+          if (leader != null) {
+            _host = leader.getMonitoringHost();
+            _port = leader.getMonitoringPort();
+          }
+          // connect if there's a valid host and port
+          if (_host != null && _port != -1) {
+            connectInternal(_host, _port);
+          }
+        }
+      }
+    }.start();
+  }
+
+  /**
+   * Establishment of a connection to a Riemann server. Any existing connection is torn down
+   * first. The new client is only published once it has connected, so a failed attempt leaves
+   * this object in the disconnected state.
+   * @param host monitoring server hostname
+   * @param port monitoring server port
+   */
+  private synchronized void connectInternal(String host, int port) {
+    disconnectInternal();
+    RiemannClient client;
+    try {
+      client = RiemannClient.tcp(host, port);
+      client.connect();
+    } catch (IOException e) {
+      LOG.error("Error establishing a connection!", e);
+      return;
+    }
+    _client = client;
+
+    // we might have to reschedule tasks; iterate over a copy so every() can keep appending
+    List<ScheduledItem> toSchedule;
+    synchronized (_scheduledItems) {
+      toSchedule = Lists.newArrayList(_scheduledItems);
+    }
+    for (ScheduledItem item : toSchedule) {
+      client.every(item.interval, item.delay, item.unit, item.r);
+    }
+
+    if (getBatchSize() > 1) {
+      try {
+        _batchClient = new RiemannBatchClient(_batchSize, client);
+      } catch (UnknownHostException e) {
+        _batchSize = 1;
+        LOG.error("Could not resolve host", e);
+      } catch (UnsupportedJVMException e) {
+        _batchSize = 1;
+        LOG.warn("Batching not enabled because of incompatible JVM", e);
+      }
+    }
+  }
+
+  /**
+   * Teardown of a connection to a Riemann server. The client references are cleared even if the
+   * disconnect call itself fails.
+   */
+  private synchronized void disconnectInternal() {
+    RiemannBatchClient batchClient = _batchClient;
+    RiemannClient client = _client;
+    try {
+      if (batchClient != null && batchClient.isConnected()) {
+        batchClient.disconnect();
+      } else if (client != null && client.isConnected()) {
+        client.disconnect();
+      }
+    } catch (IOException e) {
+      LOG.error("Disconnection error", e);
+    }
+    _batchClient = null;
+    _client = null;
+  }
+
+  /**
+   * Change the subscription status to the Leader node
+   * @param subscribe true to subscribe, false to unsubscribe
+   */
+  private void changeLeaderSubscription(boolean subscribe) {
+    String leaderPath = _accessor.keyBuilder().controllerLeader().getPath();
+    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
+    if (subscribe) {
+      baseAccessor.subscribeDataChanges(leaderPath, _leaderListener);
+    } else {
+      baseAccessor.unsubscribeDataChanges(leaderPath, _leaderListener);
+    }
+  }
+
+  /**
+   * Get callbacks for when the leader changes
+   * @return implemented IZkDataListener
+   */
+  private IZkDataListener getLeaderListener() {
+    return new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        // guard the cast: a null or unexpected payload carries no endpoint to connect to
+        if (!(data instanceof ZNRecord)) {
+          LOG.warn("Ignoring leader change without a ZNRecord payload at " + dataPath);
+          return;
+        }
+        Leader leader = new Leader((ZNRecord) data);
+        doConnectAsync(leader);
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+        disconnectInternal();
+      }
+    };
+  }
+
+  /**
+   * Change a helix event into a Riemann event. Only fields that are set on the helix event are
+   * copied; of the metric fields, the first non-null one in the order long, float, double wins.
+   * @param c Riemann client
+   * @param helixEvent helix event
+   * @return Riemann EventDSL
+   */
+  private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
+    EventDSL event = c.event();
+    if (helixEvent.host() != null) {
+      event = event.host(helixEvent.host());
+    }
+    if (helixEvent.service() != null) {
+      event = event.service(helixEvent.service());
+    }
+    if (helixEvent.eventState() != null) {
+      event = event.state(helixEvent.eventState());
+    }
+    if (helixEvent.description() != null) {
+      event = event.description(helixEvent.description());
+    }
+    if (helixEvent.time() != null) {
+      event = event.time(helixEvent.time());
+    }
+    if (helixEvent.ttl() != null) {
+      event = event.ttl(helixEvent.ttl());
+    }
+    if (helixEvent.longMetric() != null) {
+      event = event.metric(helixEvent.longMetric());
+    } else if (helixEvent.floatMetric() != null) {
+      event = event.metric(helixEvent.floatMetric());
+    } else if (helixEvent.doubleMetric() != null) {
+      event = event.metric(helixEvent.doubleMetric());
+    }
+    if (!helixEvent.tags().isEmpty()) {
+      event = event.tags(helixEvent.tags());
+    }
+    if (!helixEvent.attributes().isEmpty()) {
+      event = event.attributes(helixEvent.attributes());
+    }
+    return event;
+  }
+
+  /**
+   * Wrapper for a task that should be run to a schedule
+   */
+  private static class ScheduledItem {
+    long interval;
+    long delay;
+    TimeUnit unit;
+    Runnable r;
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof ScheduledItem) {
+        ScheduledItem that = (ScheduledItem) other;
+        return interval == that.interval && delay == that.delay && unit == that.unit && r == that.r;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      // mirrors equals(): numeric fields by value, unit and runnable by identity, null-tolerant
+      int result = (int) (interval ^ (interval >>> 32));
+      result = 31 * result + (int) (delay ^ (delay >>> 32));
+      result = 31 * result + (unit == null ? 0 : unit.hashCode());
+      result = 31 * result + System.identityHashCode(r);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      // %s renders a null unit or runnable as "null" instead of throwing
+      return String.format("interval: %d|delay: %d|timeunit: %s|runnable: %s", interval, delay,
+          unit, r);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/helix-monitor-client/src/test/conf/testng.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/test/conf/testng.xml b/helix-monitor-client/src/test/conf/testng.xml
new file mode 100644
index 0000000..90910aa
--- /dev/null
+++ b/helix-monitor-client/src/test/conf/testng.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="Suite" parallel="none">
+ <test name="Test" preserve-order="false">
+ <packages>
+ <package name="org.apache.helix.monitoring"/>
+ </packages>
+ </test>
+</suite>
http://git-wip-us.apache.org/repos/asf/helix/blob/902e6fa2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c220e2b..e7309a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@ under the License.
<module>helix-agent</module>
<module>helix-examples</module>
<module>helix-monitor-server</module>
+ <module>helix-monitor-client</module>
<module>recipes</module>
<module>site-releases</module>
</modules>
[2/2] git commit: A basic alerting example
Posted by ka...@apache.org.
A basic alerting example
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b1df294b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b1df294b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b1df294b
Branch: refs/heads/helix-monitoring
Commit: b1df294baa798f88898029d94f2fe8d165c91ebc
Parents: 902e6fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 17 18:02:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 17 18:02:27 2014 -0800
----------------------------------------------------------------------
.../helix/monitoring/MonitoringEvent.java | 16 +-
.../monitoring/RiemannMonitoringClient.java | 22 +-
helix-monitor-server/pom.xml | 5 +
.../src/main/resources/riemann.config | 9 +-
.../monitoring/TestClientServerMonitoring.java | 222 +++++++++++++++++++
5 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 3735589..80006fb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -40,6 +40,7 @@ public class MonitoringEvent {
private ClusterId _clusterId;
private ResourceId _resourceId;
private PartitionId _partitionId;
+ private String _name;
private String _host;
private String _eventState;
private String _description;
@@ -56,9 +57,10 @@ public class MonitoringEvent {
*/
public MonitoringEvent() {
_clusterId = null;
- _host = null;
_resourceId = null;
_partitionId = null;
+ _name = null;
+ _host = null;
_eventState = null;
_description = null;
_time = null;
@@ -71,6 +73,16 @@ public class MonitoringEvent {
}
/**
+ * Give this event a name
+ * @param name the name
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent name(String name) {
+ _name = name;
+ return this;
+ }
+
+ /**
* Set the cluster this event corresponds to
* @param clusterId the cluster id
* @return MonitoringEvent
@@ -257,7 +269,7 @@ public class MonitoringEvent {
if (_partitionId == null) {
_partitionId = PartitionId.from("%");
}
- return String.format("%s|%s|%s", _clusterId, _resourceId, _partitionId);
+ return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
}
String eventState() {
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 1948308..20b0825 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -311,35 +311,35 @@ public class RiemannMonitoringClient implements MonitoringClient {
private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
EventDSL event = c.event();
if (helixEvent.host() != null) {
- event = event.host(helixEvent.host());
+ event.host(helixEvent.host());
}
if (helixEvent.service() != null) {
- event = event.service(helixEvent.service());
+ event.service(helixEvent.service());
}
if (helixEvent.eventState() != null) {
- event = event.state(helixEvent.eventState());
+ event.state(helixEvent.eventState());
}
if (helixEvent.description() != null) {
- event = event.description(helixEvent.description());
+ event.description(helixEvent.description());
}
if (helixEvent.time() != null) {
- event = event.time(helixEvent.time());
+ event.time(helixEvent.time());
}
if (helixEvent.ttl() != null) {
- event = event.ttl(helixEvent.ttl());
+ event.ttl(helixEvent.ttl());
}
if (helixEvent.longMetric() != null) {
- event = event.metric(helixEvent.longMetric());
+ event.metric(helixEvent.longMetric());
} else if (helixEvent.floatMetric() != null) {
- event = event.metric(helixEvent.floatMetric());
+ event.metric(helixEvent.floatMetric());
} else if (helixEvent.doubleMetric() != null) {
- event = event.metric(helixEvent.doubleMetric());
+ event.metric(helixEvent.doubleMetric());
}
if (!helixEvent.tags().isEmpty()) {
- event = event.tags(helixEvent.tags());
+ event.tags(helixEvent.tags());
}
if (!helixEvent.attributes().isEmpty()) {
- event = event.attributes(helixEvent.attributes());
+ event.attributes.putAll(helixEvent.attributes());
}
return event;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index a703e0e..d6fdf52 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -44,6 +44,11 @@ under the License.
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-monitor-client</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>riemann</groupId>
<artifactId>riemann</artifactId>
<version>0.2.4</version>
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/main/resources/riemann.config
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/resources/riemann.config b/helix-monitor-server/src/main/resources/riemann.config
index 08c3bce..0f06dd0 100644
--- a/helix-monitor-server/src/main/resources/riemann.config
+++ b/helix-monitor-server/src/main/resources/riemann.config
@@ -20,13 +20,13 @@
(logging/init :file "/dev/null")
-(tcp-server)
+(tcp-server :host "0.0.0.0")
(instrumentation {:interval 1})
-(udp-server)
-(ws-server)
-(repl-server)
+(udp-server :host "0.0.0.0")
+(ws-server :host "0.0.0.0")
+(repl-server :host "0.0.0.0")
(periodically-expire 1)
@@ -34,3 +34,4 @@
(streams
(expired prn)
index))
+
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
new file mode 100644
index 0000000..8b7f839
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -0,0 +1,222 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestClientServerMonitoring extends ZkUnitTestBase {
+ /**
+  * End-to-end monitoring test: sets up a cluster, starts participants and a controller that has a
+  * Riemann monitoring server registered, verifies that the leader node exposes a monitoring host
+  * and port, and then runs a spectator that sends metrics.
+  * <p>
+  * Everything that was successfully started is stopped in a finally block, so a failed assertion
+  * or an exception from the spectator does not leave participants or the controller running.
+  * @throws Exception if cluster setup, startup, or the spectator run fails
+  */
+ @Test
+ public void testMonitoring() throws Exception {
+   final int NUM_PARTICIPANTS = 4;
+   final int NUM_PARTITIONS = 8;
+   final int NUM_REPLICAS = 2;
+
+   String className = TestHelper.getTestClassName();
+   String methodName = TestHelper.getTestMethodName();
+   String clusterName = className + "_" + methodName;
+   System.out.println("START " + clusterName + " at " + new Date());
+
+   // Set up cluster
+   TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+       "localhost", // participant name prefix
+       "TestDB", // resource name prefix
+       1, // resources
+       NUM_PARTITIONS, // partitions per resource
+       NUM_PARTICIPANTS, // number of nodes
+       NUM_REPLICAS, // replicas
+       "MasterSlave", // pick a built-in state model
+       RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+       true); // do rebalance
+
+   MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+   // Only components whose syncStart() returned are tracked for teardown, so the finally block
+   // never calls syncStop() on something that was never started.
+   int startedParticipants = 0;
+   ClusterControllerManager startedController = null;
+   try {
+     // start participants
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       participants[i] =
+           new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
+       participants[i].syncStart();
+       startedParticipants++;
+     }
+     HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+     // set a custom monitoring config
+     MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
+     monitoringConfig.setConfig(getMonitoringConfigString());
+     accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
+
+     // start controller
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+     controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
+         .getHostName()));
+     controller.syncStart();
+     startedController = controller;
+
+     // make sure the leader has registered and is showing the server port
+     Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
+     Assert.assertNotNull(leader);
+     Assert.assertNotEquals(leader.getMonitoringPort(), -1);
+     Assert.assertNotNull(leader.getMonitoringHost());
+
+     // run the spectator
+     spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+   } finally {
+     try {
+       // stop participants
+       for (int i = 0; i < startedParticipants; i++) {
+         participants[i].syncStop();
+       }
+     } finally {
+       // stop controller, even if stopping a participant failed
+       if (startedController != null) {
+         startedController.syncStop();
+       }
+     }
+   }
+ }
+
+ /**
+  * Assemble the Clojure text used as the sample monitoring config in this test. It defines
+  * parse-int, parse-double, check-failure-rate and check-95th-latency, followed by a streams form
+  * that applies both checks to services whose name matches LatencyReport.
+  * @return the config text, using \r\n line breaks
+  */
+ private String getMonitoringConfigString() {
+   // One fragment per line of config text; they are concatenated in order without separators.
+   String[] fragments = {
+       "(defn parse-int\r\n",
+       " \"Convert a string to an integer\"\r\n",
+       " [instr]\r\n",
+       " (Integer/parseInt instr))\r\n",
+       "\r\n",
+       "(defn parse-double\r\n",
+       " \"Convert a string into a double\"\r\n",
+       " [instr]\r\n",
+       " (Double/parseDouble instr))\r\n",
+       "\r\n",
+       "(defn check-failure-rate\r\n",
+       " \"Check if the event should trigger an alarm based on failure rate\"\r\n",
+       " [e]\r\n",
+       " (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n",
+       " (if (> writeCount 0)\r\n",
+       " (let [ratio (double (/ failedCount writeCount))]\r\n",
+       " (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n",
+       " (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n",
+       "\r\n",
+       "(defn check-95th-latency\r\n",
+       " \"Check if the 95th percentile latency is within expectations\"\r\n",
+       " [e]\r\n",
+       " (let [latency (parse-double (:latency95 e))]\r\n",
+       " (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n",
+       " (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n",
+       "\r\n",
+       "(streams\r\n",
+       " (where\r\n",
+       " (service #\".*LatencyReport.*\")",
+       " ; Only process services containing LatencyReport\r\n",
+       " check-failure-rate\r\n",
+       " check-95th-latency))"
+   };
+   StringBuilder config = new StringBuilder();
+   for (String fragment : fragments) {
+     config.append(fragment);
+   }
+   return config.toString();
+ }
+
+ private void spectate(final String clusterName, final String resourceName, final int numPartitions)
+ throws Exception {
+ final Random random = new Random();
+ final ClusterId clusterId = ClusterId.from(clusterName);
+ final ResourceId resourceId = ResourceId.from(resourceName);
+
+ // Connect to Helix
+ final HelixManager manager =
+ HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
+ manager.connect();
+
+ // Attach a monitoring client to this connection
+ final MonitoringClient client =
+ new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
+ client.connect();
+
+ // Start spectating
+ final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+ manager.addExternalViewChangeListener(routingTableProvider);
+
+ // Send some metrics
+ client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
+ @Override
+ public void run() {
+ Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
+ Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
+ Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
+ for (int i = 0; i < numPartitions; i++) {
+ // Figure out who hosts what
+ PartitionId partitionId = PartitionId.from(resourceId, i + "");
+ List<InstanceConfig> instances =
+ routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
+ if (instances.size() < 1) {
+ continue;
+ }
+
+ // Normally you would get these attributes by using a CallTracker
+ ParticipantId participantId = instances.get(0).getParticipantId();
+ int writeCount = random.nextInt(1000) + 10;
+ if (!writeCounts.containsKey(participantId)) {
+ writeCounts.put(participantId, writeCount);
+ } else {
+ writeCounts.put(participantId, writeCounts.get(participantId) + writeCount);
+ }
+ int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master
+ if (!failedCounts.containsKey(participantId)) {
+ failedCounts.put(participantId, failedCount);
+ } else {
+ failedCounts.put(participantId, failedCounts.get(participantId) + failedCount);
+ }
+ double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
+ latency95Map.put(participantId, latency);
+ }
+
+ // Send everything grouped by participant
+ for (ParticipantId participantId : writeCounts.keySet()) {
+ Map<String, String> attributes = Maps.newHashMap();
+ attributes.put("writeCount", writeCounts.get(participantId) + "");
+ attributes.put("failedCount", failedCounts.get(participantId) + "");
+ attributes.put("latency95", latency95Map.get(participantId) + "");
+
+ // Send an event with a ttl long enough to span the send interval
+ MonitoringEvent e =
+ new MonitoringEvent().cluster(clusterId).resource(resourceId)
+ .participant(participantId).name("LatencyReport").attributes(attributes)
+ .eventState("update").ttl(10.0f);
+ client.send(e, false);
+ }
+ }
+ });
+ Thread.sleep(60000);
+ client.disconnect();
+ manager.disconnect();
+ }
+
+}