You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/18 03:02:43 UTC
[2/2] git commit: A basic alerting example
A basic alerting example
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b1df294b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b1df294b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b1df294b
Branch: refs/heads/helix-monitoring
Commit: b1df294baa798f88898029d94f2fe8d165c91ebc
Parents: 902e6fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 17 18:02:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 17 18:02:27 2014 -0800
----------------------------------------------------------------------
.../helix/monitoring/MonitoringEvent.java | 16 +-
.../monitoring/RiemannMonitoringClient.java | 22 +-
helix-monitor-server/pom.xml | 5 +
.../src/main/resources/riemann.config | 9 +-
.../monitoring/TestClientServerMonitoring.java | 222 +++++++++++++++++++
5 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 3735589..80006fb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -40,6 +40,7 @@ public class MonitoringEvent {
private ClusterId _clusterId;
private ResourceId _resourceId;
private PartitionId _partitionId;
+ private String _name;
private String _host;
private String _eventState;
private String _description;
@@ -56,9 +57,10 @@ public class MonitoringEvent {
*/
public MonitoringEvent() {
_clusterId = null;
- _host = null;
_resourceId = null;
_partitionId = null;
+ _name = null;
+ _host = null;
_eventState = null;
_description = null;
_time = null;
@@ -71,6 +73,16 @@ public class MonitoringEvent {
}
/**
+ * Give this event a name. The name is stored on the event as-is (no validation is done here).
+ * NOTE(review): this commit also appends the name as a fourth "%s" component of a
+ * String.format key; "%s" renders an unset (null) name as the literal text "null", so
+ * confirm that events sent without a name are expected to carry that suffix.
+ * @param name the name
+ * @return this MonitoringEvent, so that calls can be chained
+ */
+ public MonitoringEvent name(String name) {
+ _name = name;
+ return this;
+ }
+
+ /**
* Set the cluster this event corresponds to
* @param clusterId the cluster id
* @return MonitoringEvent
@@ -257,7 +269,7 @@ public class MonitoringEvent {
if (_partitionId == null) {
_partitionId = PartitionId.from("%");
}
- return String.format("%s|%s|%s", _clusterId, _resourceId, _partitionId);
+ return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
}
String eventState() {
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 1948308..20b0825 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -311,35 +311,35 @@ public class RiemannMonitoringClient implements MonitoringClient {
private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
EventDSL event = c.event();
if (helixEvent.host() != null) {
- event = event.host(helixEvent.host());
+ event.host(helixEvent.host());
}
if (helixEvent.service() != null) {
- event = event.service(helixEvent.service());
+ event.service(helixEvent.service());
}
if (helixEvent.eventState() != null) {
- event = event.state(helixEvent.eventState());
+ event.state(helixEvent.eventState());
}
if (helixEvent.description() != null) {
- event = event.description(helixEvent.description());
+ event.description(helixEvent.description());
}
if (helixEvent.time() != null) {
- event = event.time(helixEvent.time());
+ event.time(helixEvent.time());
}
if (helixEvent.ttl() != null) {
- event = event.ttl(helixEvent.ttl());
+ event.ttl(helixEvent.ttl());
}
if (helixEvent.longMetric() != null) {
- event = event.metric(helixEvent.longMetric());
+ event.metric(helixEvent.longMetric());
} else if (helixEvent.floatMetric() != null) {
- event = event.metric(helixEvent.floatMetric());
+ event.metric(helixEvent.floatMetric());
} else if (helixEvent.doubleMetric() != null) {
- event = event.metric(helixEvent.doubleMetric());
+ event.metric(helixEvent.doubleMetric());
}
if (!helixEvent.tags().isEmpty()) {
- event = event.tags(helixEvent.tags());
+ event.tags(helixEvent.tags());
}
if (!helixEvent.attributes().isEmpty()) {
- event = event.attributes(helixEvent.attributes());
+ event.attributes.putAll(helixEvent.attributes());
}
return event;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index a703e0e..d6fdf52 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -44,6 +44,11 @@ under the License.
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-monitor-client</artifactId>
+ <version>0.7.1-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>riemann</groupId>
<artifactId>riemann</artifactId>
<version>0.2.4</version>
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/main/resources/riemann.config
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/resources/riemann.config b/helix-monitor-server/src/main/resources/riemann.config
index 08c3bce..0f06dd0 100644
--- a/helix-monitor-server/src/main/resources/riemann.config
+++ b/helix-monitor-server/src/main/resources/riemann.config
@@ -20,13 +20,13 @@
(logging/init :file "/dev/null")
-(tcp-server)
+(tcp-server :host "0.0.0.0")
(instrumentation {:interval 1})
-(udp-server)
-(ws-server)
-(repl-server)
+(udp-server :host "0.0.0.0")
+(ws-server :host "0.0.0.0")
+(repl-server :host "0.0.0.0") ; NOTE(review): binds the REPL to every interface, not just loopback; confirm this exposure is intended outside of tests
(periodically-expire 1)
@@ -34,3 +34,4 @@
(streams
(expired prn)
index))
+
http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
new file mode 100644
index 0000000..8b7f839
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -0,0 +1,222 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestClientServerMonitoring extends ZkUnitTestBase {
+  /**
+   * Bring up a cluster with four participants and a controller that has a Riemann monitoring
+   * server registered, check that the leader record advertises a monitoring host and port, and
+   * then run a spectator that reports metrics. Every participant and controller whose start
+   * completed is stopped again, even when an assertion or the spectator run fails.
+   * @throws Exception if cluster setup, an assertion, or the spectator run fails
+   */
+  @Test
+  public void testMonitoring() throws Exception {
+    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 2;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Only managers whose start completed are recorded here, so the cleanup below never calls
+    // syncStop() on a manager that was constructed but not started
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    ClusterControllerManager controller = null;
+    try {
+      // start participants
+      for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
+        participant.syncStart();
+        participants[i] = participant;
+      }
+      HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+      // set a custom monitoring config
+      MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
+      monitoringConfig.setConfig(getMonitoringConfigString());
+      accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
+
+      // start controller
+      ClusterControllerManager startingController =
+          new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+      startingController.registerMonitoringServer(new RiemannMonitoringServer(InetAddress
+          .getLocalHost().getHostName()));
+      startingController.syncStart();
+      controller = startingController;
+
+      // make sure the leader has registered and is showing the server port
+      Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
+      Assert.assertNotNull(leader);
+      Assert.assertNotEquals(leader.getMonitoringPort(), -1);
+      Assert.assertNotNull(leader.getMonitoringHost());
+
+      // run the spectator
+      spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+    } finally {
+      try {
+        // stop participants
+        for (MockParticipantManager participant : participants) {
+          if (participant != null) {
+            participant.syncStop();
+          }
+        }
+      } finally {
+        // stop controller, even if stopping a participant failed
+        if (controller != null) {
+          controller.syncStop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Produce the Riemann (Clojure) configuration text used by this test. The text defines
+   * parse-int and parse-double helpers, a check-failure-rate function that prints the host when
+   * failedCount / writeCount exceeds 0.1, a check-95th-latency function that prints the host
+   * when latency95 exceeds 1.0, and a stream that applies both checks to events whose service
+   * matches ".*LatencyReport.*".
+   * @return the configuration text, with lines separated by "\r\n"
+   */
+  private String getMonitoringConfigString() {
+    // A single constant expression; the pieces are the same literals in the same order
+    return "(defn parse-int\r\n" +
+        " \"Convert a string to an integer\"\r\n [instr]\r\n (Integer/parseInt instr))\r\n\r\n" +
+        "(defn parse-double\r\n \"Convert a string into a double\"\r\n [instr]\r\n" +
+        " (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n" +
+        " \"Check if the event should trigger an alarm based on failure rate\"\r\n [e]\r\n" +
+        " (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n" +
+        " (if (> writeCount 0)\r\n (let [ratio (double (/ failedCount writeCount))]\r\n" +
+        " (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n" +
+        " (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n" +
+        "(defn check-95th-latency\r\n \"Check if the 95th percentile latency is within expectations\"\r\n" +
+        " [e]\r\n (let [latency (parse-double (:latency95 e))]\r\n" +
+        " (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n" +
+        " (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n" +
+        "(streams\r\n (where\r\n (service #\".*LatencyReport.*\")" +
+        " ; Only process services containing LatencyReport\r\n check-failure-rate\r\n" +
+        " check-95th-latency))";
+  }
+
+  /**
+   * Connect a spectator to the cluster and, every 5 seconds for one minute, send one
+   * "LatencyReport" event per participant that currently masters at least one partition. The
+   * metrics are synthetic: the master of partition 0 reports half of its writes as failed and
+   * the master of partition 1 reports a 5.0 latency, so both alert checks have data to fire on.
+   * The monitoring client and the Helix connection are released even if the run fails.
+   * @param clusterName the cluster to spectate
+   * @param resourceName the resource whose partitions are inspected
+   * @param numPartitions the number of partitions of the resource
+   * @throws Exception if connecting fails or the wait is interrupted
+   */
+  private void spectate(final String clusterName, final String resourceName, final int numPartitions)
+      throws Exception {
+    final Random random = new Random();
+    final ClusterId clusterId = ClusterId.from(clusterName);
+    final ResourceId resourceId = ResourceId.from(resourceName);
+
+    // Connect to Helix
+    final HelixManager manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
+    manager.connect();
+    try {
+      // Attach a monitoring client to this connection
+      final MonitoringClient client =
+          new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
+      client.connect();
+      try {
+        // Start spectating
+        final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+        manager.addExternalViewChangeListener(routingTableProvider);
+
+        // Send some metrics
+        client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
+          @Override
+          public void run() {
+            Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
+            Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
+            Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
+            for (int i = 0; i < numPartitions; i++) {
+              // Figure out who hosts what
+              PartitionId partitionId = PartitionId.from(resourceId, i + "");
+              List<InstanceConfig> instances =
+                  routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
+              if (instances.size() < 1) {
+                continue;
+              }
+
+              // Normally you would get these attributes by using a CallTracker
+              ParticipantId participantId = instances.get(0).getParticipantId();
+              int writeCount = random.nextInt(1000) + 10;
+              if (!writeCounts.containsKey(participantId)) {
+                writeCounts.put(participantId, writeCount);
+              } else {
+                writeCounts.put(participantId, writeCounts.get(participantId) + writeCount);
+              }
+              int failedCount = i != 0 ? 0 : writeCount / 2; // half the writes fail on p0 master
+              if (!failedCounts.containsKey(participantId)) {
+                failedCounts.put(participantId, failedCount);
+              } else {
+                failedCounts.put(participantId, failedCounts.get(participantId) + failedCount);
+              }
+              double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
+              // Keep the worst latency seen for this participant: an unconditional put would let
+              // a later healthy partition on the same participant hide the bad p1 reading
+              Double worstSoFar = latency95Map.get(participantId);
+              if (worstSoFar == null || latency > worstSoFar) {
+                latency95Map.put(participantId, latency);
+              }
+            }
+
+            // Send everything grouped by participant
+            for (ParticipantId participantId : writeCounts.keySet()) {
+              Map<String, String> attributes = Maps.newHashMap();
+              attributes.put("writeCount", writeCounts.get(participantId) + "");
+              attributes.put("failedCount", failedCounts.get(participantId) + "");
+              attributes.put("latency95", latency95Map.get(participantId) + "");
+
+              // Send an event with a ttl long enough to span the send interval
+              MonitoringEvent e =
+                  new MonitoringEvent().cluster(clusterId).resource(resourceId)
+                      .participant(participantId).name("LatencyReport").attributes(attributes)
+                      .eventState("update").ttl(10.0f);
+              client.send(e, false);
+            }
+          }
+        });
+        Thread.sleep(60000);
+      } finally {
+        client.disconnect();
+      }
+    } finally {
+      manager.disconnect();
+    }
+  }
+
+}