You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/18 03:02:43 UTC

[2/2] git commit: A basic alerting example

A basic alerting example


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b1df294b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b1df294b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b1df294b

Branch: refs/heads/helix-monitoring
Commit: b1df294baa798f88898029d94f2fe8d165c91ebc
Parents: 902e6fa
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jan 17 18:02:27 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jan 17 18:02:27 2014 -0800

----------------------------------------------------------------------
 .../helix/monitoring/MonitoringEvent.java       |  16 +-
 .../monitoring/RiemannMonitoringClient.java     |  22 +-
 helix-monitor-server/pom.xml                    |   5 +
 .../src/main/resources/riemann.config           |   9 +-
 .../monitoring/TestClientServerMonitoring.java  | 222 +++++++++++++++++++
 5 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 3735589..80006fb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -40,6 +40,7 @@ public class MonitoringEvent {
   private ClusterId _clusterId;
   private ResourceId _resourceId;
   private PartitionId _partitionId;
+  private String _name;
   private String _host;
   private String _eventState;
   private String _description;
@@ -56,9 +57,10 @@ public class MonitoringEvent {
    */
   public MonitoringEvent() {
     _clusterId = null;
-    _host = null;
     _resourceId = null;
     _partitionId = null;
+    _name = null;
+    _host = null;
     _eventState = null;
     _description = null;
     _time = null;
@@ -71,6 +73,16 @@ public class MonitoringEvent {
   }
 
   /**
+   * Give this event a name. The name is formatted as the last '|'-separated field, after the
+   * cluster, resource, and partition ids, of the string this class builds from those ids.
+   * NOTE(review): the constructor leaves the name null and no default is visible here, so an
+   * unnamed event presumably formats that field as the text "null" -- confirm.
+   * @param name the name
+   * @return this MonitoringEvent, so that calls can be chained
+   */
+  public MonitoringEvent name(String name) {
+    _name = name;
+    return this;
+  }
+
+  /**
    * Set the cluster this event corresponds to
    * @param clusterId the cluster id
    * @return MonitoringEvent
@@ -257,7 +269,7 @@ public class MonitoringEvent {
     if (_partitionId == null) {
       _partitionId = PartitionId.from("%");
     }
-    return String.format("%s|%s|%s", _clusterId, _resourceId, _partitionId);
+    return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
   }
 
   String eventState() {

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 1948308..20b0825 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -311,35 +311,35 @@ public class RiemannMonitoringClient implements MonitoringClient {
   private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
     EventDSL event = c.event();
     if (helixEvent.host() != null) {
-      event = event.host(helixEvent.host());
+      event.host(helixEvent.host()); // return value ignored from here on; assumes EventDSL setters mutate the receiver -- TODO confirm
     }
     if (helixEvent.service() != null) {
-      event = event.service(helixEvent.service());
+      event.service(helixEvent.service());
     }
     if (helixEvent.eventState() != null) {
-      event = event.state(helixEvent.eventState());
+      event.state(helixEvent.eventState());
     }
     if (helixEvent.description() != null) {
-      event = event.description(helixEvent.description());
+      event.description(helixEvent.description());
     }
     if (helixEvent.time() != null) {
-      event = event.time(helixEvent.time());
+      event.time(helixEvent.time());
     }
     if (helixEvent.ttl() != null) {
-      event = event.ttl(helixEvent.ttl());
+      event.ttl(helixEvent.ttl());
     }
     if (helixEvent.longMetric() != null) {
-      event = event.metric(helixEvent.longMetric());
+      event.metric(helixEvent.longMetric());
     } else if (helixEvent.floatMetric() != null) {
-      event = event.metric(helixEvent.floatMetric());
+      event.metric(helixEvent.floatMetric());
     } else if (helixEvent.doubleMetric() != null) {
-      event = event.metric(helixEvent.doubleMetric());
+      event.metric(helixEvent.doubleMetric());
     }
     if (!helixEvent.tags().isEmpty()) {
-      event = event.tags(helixEvent.tags());
+      event.tags(helixEvent.tags());
     }
     if (!helixEvent.attributes().isEmpty()) {
-      event = event.attributes(helixEvent.attributes());
+      event.attributes.putAll(helixEvent.attributes()); // copies the entries straight into the DSL's attributes map
     }
     return event;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index a703e0e..d6fdf52 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -44,6 +44,11 @@ under the License.
       <artifactId>helix-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-monitor-client</artifactId>
+      <version>0.7.1-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>riemann</groupId>
       <artifactId>riemann</artifactId>
       <version>0.2.4</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/main/resources/riemann.config
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/resources/riemann.config b/helix-monitor-server/src/main/resources/riemann.config
index 08c3bce..0f06dd0 100644
--- a/helix-monitor-server/src/main/resources/riemann.config
+++ b/helix-monitor-server/src/main/resources/riemann.config
@@ -20,13 +20,13 @@
 
 (logging/init :file "/dev/null")
 
-(tcp-server)
+(tcp-server :host "0.0.0.0")
 
 (instrumentation {:interval 1})
 
-(udp-server)
-(ws-server)
-(repl-server)
+(udp-server :host "0.0.0.0")
+(ws-server :host "0.0.0.0")
+(repl-server :host "0.0.0.0") ; NOTE(review): presumably binds the REPL on every interface, allowing remote code evaluation -- confirm this is intended
 
 (periodically-expire 1)
 
@@ -34,3 +34,4 @@
   (streams
     (expired prn)
     index))
+

http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
new file mode 100644
index 0000000..8b7f839
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -0,0 +1,222 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestClientServerMonitoring extends ZkUnitTestBase {
+  /**
+   * End-to-end check that a controller with a registered Riemann monitoring server publishes a
+   * monitoring host and port on the leader node, after which a spectator sends events to it.
+   * The cluster has 4 participants hosting one MasterSlave resource with 8 partitions and 2
+   * replicas, rebalanced in FULL_AUTO mode. Every participant and the controller that were
+   * started are stopped again even if an assertion or the spectator run fails.
+   * @throws Exception if cluster setup, any connection, or the spectator run fails
+   */
+  @Test
+  public void testMonitoring() throws Exception {
+    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 2;
+    final int BASE_PORT = 12918;
+    final String MONITORING_CONFIG_NAME = "sampleMonitoringConfig";
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, BASE_PORT, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    int numStartedParticipants = 0; // only participants[0..numStartedParticipants) get stopped
+    ClusterControllerManager startedController = null; // set once syncStart() has returned
+    try {
+      // start participants
+      for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+        participants[i] =
+            new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (BASE_PORT + i));
+        participants[i].syncStart();
+        numStartedParticipants++;
+      }
+      HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+      // set a custom monitoring config
+      MonitoringConfig monitoringConfig = new MonitoringConfig(MONITORING_CONFIG_NAME);
+      monitoringConfig.setConfig(getMonitoringConfigString());
+      accessor.setProperty(keyBuilder.monitoringConfig(MONITORING_CONFIG_NAME), monitoringConfig);
+
+      // start controller
+      ClusterControllerManager controller =
+          new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+      controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
+          .getHostName()));
+      controller.syncStart();
+      startedController = controller;
+
+      // make sure the leader has registered and is showing the server port
+      Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
+      Assert.assertNotNull(leader);
+      Assert.assertNotEquals(-1, leader.getMonitoringPort()); // JUnit order: unexpected, actual
+      Assert.assertNotNull(leader.getMonitoringHost());
+
+      // run the spectator
+      spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+    } finally {
+      try {
+        // stop participants
+        for (int i = 0; i < numStartedParticipants; i++) {
+          participants[i].syncStop();
+        }
+      } finally {
+        // stop controller, even if stopping a participant failed
+        if (startedController != null) {
+          startedController.syncStop();
+        }
+      }
+    }
+  }
+
+  private String getMonitoringConfigString() {
+    StringBuilder sb =
+        new StringBuilder()
+            .append("(defn parse-int\r\n")
+            .append(
+                "  \"Convert a string to an integer\"\r\n  [instr]\r\n  (Integer/parseInt instr))\r\n\r\n")
+            .append("(defn parse-double\r\n  \"Convert a string into a double\"\r\n  [instr]\r\n")
+            .append("  (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n")
+            .append(
+                "  \"Check if the event should trigger an alarm based on failure rate\"\r\n  [e]\r\n")
+            .append(
+                "  (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n")
+            .append(
+                "    (if (> writeCount 0)\r\n      (let [ratio (double (/ failedCount writeCount))]\r\n")
+            .append("        (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n")
+            .append(
+                "          (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n")
+            .append(
+                "(defn check-95th-latency\r\n  \"Check if the 95th percentile latency is within expectations\"\r\n")
+            .append("  [e]\r\n  (let [latency (parse-double (:latency95 e))]\r\n")
+            .append(
+                "    (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n")
+            .append(
+                "      (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n")
+            .append("(streams\r\n  (where\r\n    (service #\".*LatencyReport.*\")")
+            .append(
+                " ; Only process services containing LatencyReport\r\n    check-failure-rate\r\n")
+            .append("    check-95th-latency))");
+    return sb.toString();
+  }
+
+  /**
+   * Connect to the cluster as a spectator, attach a monitoring client, and for one minute let
+   * the client periodically report synthetic write statistics grouped by the participant that
+   * masters each partition. Half of partition 0's writes are reported as failed and partition 1
+   * is given a 5.0 latency, so the checks in the monitoring config have something to flag. The
+   * client and the Helix connection are always disconnected on the way out.
+   * @param clusterName the cluster to connect to
+   * @param resourceName the resource whose MASTER replicas are reported on
+   * @param numPartitions the number of partitions of the resource
+   * @throws Exception if connecting fails or the one-minute wait is interrupted
+   */
+  private void spectate(final String clusterName, final String resourceName, final int numPartitions)
+      throws Exception {
+    final Random random = new Random();
+    final ClusterId clusterId = ClusterId.from(clusterName);
+    final ResourceId resourceId = ResourceId.from(resourceName);
+
+    // Connect to Helix
+    final HelixManager manager =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
+    manager.connect();
+    try {
+      // Attach a monitoring client to this connection
+      final MonitoringClient client =
+          new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
+      client.connect();
+      try {
+        // Start spectating
+        final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
+        manager.addExternalViewChangeListener(routingTableProvider);
+
+        // Send some metrics
+        client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
+          @Override
+          public void run() {
+            Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
+            Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
+            Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
+            for (int i = 0; i < numPartitions; i++) {
+              // Figure out who hosts what
+              PartitionId partitionId = PartitionId.from(resourceId, i + "");
+              List<InstanceConfig> instances =
+                  routingTableProvider.getInstances(resourceName, partitionId.stringify(),
+                      "MASTER");
+              if (instances.isEmpty()) {
+                continue;
+              }
+
+              // Normally you would get these attributes by using a CallTracker
+              ParticipantId participantId = instances.get(0).getParticipantId();
+              int writeCount = random.nextInt(1000) + 10;
+              addToCount(writeCounts, participantId, writeCount);
+              int failedCount = (i == 0) ? writeCount / 2 : 0; // bad write count from p0 master
+              addToCount(failedCounts, participantId, failedCount);
+              double latency = (i == 1) ? 5.000 : 0.001; // bad 95th latency from p1 master
+
+              // Keep the worst latency seen for this participant; a plain put() would let a
+              // later healthy partition on the same participant hide the bad value
+              Double worstSoFar = latency95Map.get(participantId);
+              if (worstSoFar == null || latency > worstSoFar) {
+                latency95Map.put(participantId, latency);
+              }
+            }
+
+            // Send everything grouped by participant
+            for (ParticipantId participantId : writeCounts.keySet()) {
+              Map<String, String> attributes = Maps.newHashMap();
+              attributes.put("writeCount", String.valueOf(writeCounts.get(participantId)));
+              attributes.put("failedCount", String.valueOf(failedCounts.get(participantId)));
+              attributes.put("latency95", String.valueOf(latency95Map.get(participantId)));
+
+              // Send an event with a ttl long enough to span the send interval
+              MonitoringEvent e =
+                  new MonitoringEvent().cluster(clusterId).resource(resourceId)
+                      .participant(participantId).name("LatencyReport").attributes(attributes)
+                      .eventState("update").ttl(10.0f);
+              client.send(e, false);
+            }
+          }
+        });
+        Thread.sleep(60000);
+      } finally {
+        client.disconnect();
+      }
+    } finally {
+      manager.disconnect();
+    }
+  }
+
+  /**
+   * Add an amount to the running total kept for a participant, starting from zero if the
+   * participant has no total yet
+   * @param totals the per-participant totals to update
+   * @param participantId the participant whose total changes
+   * @param amount the amount to add
+   */
+  private static void addToCount(Map<ParticipantId, Integer> totals, ParticipantId participantId,
+      int amount) {
+    Integer current = totals.get(participantId);
+    totals.put(participantId, current == null ? amount : current + amount);
+  }
+
+}