You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/02/27 01:52:51 UTC
[1/2] [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455
Repository: helix
Updated Branches:
refs/heads/helix-monitoring b182690f8 -> 3505beadb
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
index 8b7f839..588bc35 100644
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java
@@ -19,204 +19,170 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import java.net.InetAddress;
import java.util.Date;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import org.apache.helix.MonitoringTestHelper;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Leader;
import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
import org.junit.Assert;
import org.testng.annotations.Test;
-import com.google.common.collect.Maps;
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
public class TestClientServerMonitoring extends ZkUnitTestBase {
@Test
public void testMonitoring() throws Exception {
- final int NUM_PARTICIPANTS = 4;
+ final int NUM_PARTICIPANTS = 0;
final int NUM_PARTITIONS = 8;
- final int NUM_REPLICAS = 2;
+ final int NUM_REPLICAS = 1;
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
- // Set up cluster
+ // Set up monitoring cluster
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
- "TestDB", // resource name prefix
+ "MonitoringService", // resource name prefix
1, // resources
NUM_PARTITIONS, // partitions per resource
NUM_PARTICIPANTS, // number of nodes
NUM_REPLICAS, // replicas
- "MasterSlave", // pick a built-in state model
+ "OnlineOffline", // pick a built-in state model
RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
true); // do rebalance
- // start participants
- MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
- for (int i = 0; i < NUM_PARTICIPANTS; i++) {
- participants[i] =
- new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i));
- participants[i].syncStart();
- }
- HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
- // set a custom monitoring config
- MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig");
- monitoringConfig.setConfig(getMonitoringConfigString());
- accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig);
-
- // start controller
+ // Enable auto-join
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+ // Start controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
- controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
- .getHostName()));
controller.syncStart();
- // make sure the leader has registered and is showing the server port
- Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
- Assert.assertNotNull(leader);
- Assert.assertNotEquals(leader.getMonitoringPort(), -1);
- Assert.assertNotNull(leader.getMonitoringHost());
+ // Start monitoring server
+ int port = MonitoringTestHelper.availableTcpPort();
+ MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+ monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+ RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+ RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+ server.start();
+
+ // Start Riemann agent
+ RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+ agent.start();
+
+ // Check live-instance
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+ Assert.assertNotNull(liveInstances);
+ Assert.assertEquals(liveInstances.size(), 1);
+
+ // Check external-view
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Connect monitoring client
+ final RiemannMonitoringClient client =
+ new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
+ ResourceId.from("MonitoringService0"), 1);
+ client.connect();
- // run the spectator
- spectate(clusterName, "TestDB0", NUM_PARTITIONS);
+ final RiemannClient rclient = RiemannClient.tcp("localhost", port);
+ rclient.connect();
- // stop participants
- for (MockParticipantManager participant : participants) {
- participant.syncStop();
- }
+ // Test MonitoringEvent#send()
+ MonitoringEvent event = new MonitoringEvent().tag("test").ttl(5);
+ client.send(ResourceId.from("TestDB"), event, false);
- // stop controller
- controller.syncStop();
- }
+ // Check monitoring server has received the event with tag="test"
+ result = TestHelper.verify(new TestHelper.Verifier() {
- private String getMonitoringConfigString() {
- StringBuilder sb =
- new StringBuilder()
- .append("(defn parse-int\r\n")
- .append(
- " \"Convert a string to an integer\"\r\n [instr]\r\n (Integer/parseInt instr))\r\n\r\n")
- .append("(defn parse-double\r\n \"Convert a string into a double\"\r\n [instr]\r\n")
- .append(" (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n")
- .append(
- " \"Check if the event should trigger an alarm based on failure rate\"\r\n [e]\r\n")
- .append(
- " (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n")
- .append(
- " (if (> writeCount 0)\r\n (let [ratio (double (/ failedCount writeCount))]\r\n")
- .append(" (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n")
- .append(
- " (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n")
- .append(
- "(defn check-95th-latency\r\n \"Check if the 95th percentile latency is within expectations\"\r\n")
- .append(" [e]\r\n (let [latency (parse-double (:latency95 e))]\r\n")
- .append(
- " (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n")
- .append(
- " (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n")
- .append("(streams\r\n (where\r\n (service #\".*LatencyReport.*\")")
- .append(
- " ; Only process services containing LatencyReport\r\n check-failure-rate\r\n")
- .append(" check-95th-latency))");
- return sb.toString();
- }
+ @Override
+ public boolean verify() throws Exception {
+ List<Event> events = rclient.query("tagged \"test\"");
+ return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+ && (events.get(0).getTags(0).equals("test"));
+ }
+ }, 5 * 1000);
+ Assert.assertTrue(result);
- private void spectate(final String clusterName, final String resourceName, final int numPartitions)
- throws Exception {
- final Random random = new Random();
- final ClusterId clusterId = ClusterId.from(clusterName);
- final ResourceId resourceId = ResourceId.from(resourceName);
+ // Test MonitoringEvent#sendAndFlush()
+ MonitoringEvent event2 = new MonitoringEvent().tag("test2").ttl(5);
+ client.sendAndFlush(ResourceId.from("TestDB2"), event2);
- // Connect to Helix
- final HelixManager manager =
- HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR);
- manager.connect();
+ // Check monitoring server has received the event with tag="test2"
+ result = TestHelper.verify(new TestHelper.Verifier() {
- // Attach a monitoring client to this connection
- final MonitoringClient client =
- new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor());
- client.connect();
+ @Override
+ public boolean verify() throws Exception {
+ List<Event> events = rclient.query("tagged \"test2\"");
+ return (events.size() == 1) && (events.get(0).getTagsCount() == 1)
+ && (events.get(0).getTags(0).equals("test2"));
+ }
+ }, 5 * 1000);
+ Assert.assertTrue(result);
- // Start spectating
- final RoutingTableProvider routingTableProvider = new RoutingTableProvider();
- manager.addExternalViewChangeListener(routingTableProvider);
+ // Test MonitoringEvent#every()
+ client.every(ResourceId.from("TestDB3"), 1, 0, TimeUnit.SECONDS, new Runnable() {
- // Send some metrics
- client.every(5, 0, TimeUnit.SECONDS, new Runnable() {
@Override
public void run() {
- Map<ParticipantId, Integer> writeCounts = Maps.newHashMap();
- Map<ParticipantId, Integer> failedCounts = Maps.newHashMap();
- Map<ParticipantId, Double> latency95Map = Maps.newHashMap();
- for (int i = 0; i < numPartitions; i++) {
- // Figure out who hosts what
- PartitionId partitionId = PartitionId.from(resourceId, i + "");
- List<InstanceConfig> instances =
- routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER");
- if (instances.size() < 1) {
- continue;
- }
-
- // Normally you would get these attributes by using a CallTracker
- ParticipantId participantId = instances.get(0).getParticipantId();
- int writeCount = random.nextInt(1000) + 10;
- if (!writeCounts.containsKey(participantId)) {
- writeCounts.put(participantId, writeCount);
- } else {
- writeCounts.put(participantId, writeCounts.get(participantId) + writeCount);
- }
- int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master
- if (!failedCounts.containsKey(participantId)) {
- failedCounts.put(participantId, failedCount);
- } else {
- failedCounts.put(participantId, failedCounts.get(participantId) + failedCount);
- }
- double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master
- latency95Map.put(participantId, latency);
- }
-
- // Send everything grouped by participant
- for (ParticipantId participantId : writeCounts.keySet()) {
- Map<String, String> attributes = Maps.newHashMap();
- attributes.put("writeCount", writeCounts.get(participantId) + "");
- attributes.put("failedCount", failedCounts.get(participantId) + "");
- attributes.put("latency95", latency95Map.get(participantId) + "");
-
- // Send an event with a ttl long enough to span the send interval
- MonitoringEvent e =
- new MonitoringEvent().cluster(clusterId).resource(resourceId)
- .participant(participantId).name("LatencyReport").attributes(attributes)
- .eventState("update").ttl(10.0f);
- client.send(e, false);
- }
+ MonitoringEvent event3 =
+ new MonitoringEvent().tag("test3").resource(ResourceId.from("db" + System.currentTimeMillis())).ttl(5);
+ client.send(ResourceId.from("TestDB3"), event3, false);
}
});
- Thread.sleep(60000);
+
+ // Check monitoring server has received at least 2 event2 with tag="test3"
+ result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ List<Event> events = rclient.query("tagged \"test3\"");
+ return (events.size() > 2) && (events.get(0).getTagsCount() == 1)
+ && (events.get(0).getTags(0).equals("test3"));
+ }
+ }, 10 * 1000);
+ Assert.assertTrue(result);
+
+ // Stop client
client.disconnect();
- manager.disconnect();
+ rclient.disconnect();
+
+ // Stop controller
+ controller.syncStop();
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
new file mode 100644
index 0000000..100c28c
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
@@ -0,0 +1,129 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.monitoring.RiemannConfigs.Builder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRiemannAgent extends ZkUnitTestBase {
+ @Test
+ public void testStartAndStop() throws Exception {
+ final int NUM_PARTICIPANTS = 0;
+ final int NUM_PARTITIONS = 4;
+ final int NUM_REPLICAS = 1;
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ // Set up monitoring cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "MonitoringService", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "OnlineOffline", // pick a built-in state model
+ RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+ true); // do rebalance
+
+ // Enable auto-join
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+ // start controller
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+ controller.syncStart();
+
+ // Start monitoring server
+ int port = MonitoringTestHelper.availableTcpPort();
+ MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+ monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+ RiemannConfigs.Builder builder = new Builder().addConfig(monitoringConfig);
+ RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+ server.start();
+
+ // Start Riemann agent
+ RiemannAgent agent = new RiemannAgent(ZK_ADDR, clusterName, port);
+ agent.start();
+
+ // Check live-instance
+ final HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+ Assert.assertNotNull(liveInstances);
+ Assert.assertEquals(liveInstances.size(), 1);
+
+ // Check external-view
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Stop monitoring server
+ server.stop();
+
+ result = TestHelper.verify(new TestHelper.Verifier() {
+
+ @Override
+ public boolean verify() throws Exception {
+ List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+ return liveInstances != null && liveInstances.size() == 0;
+ }
+ }, 15 * 1000);
+ Assert.assertTrue(result, "RiemannAgent should be disconnected if RiemannServer is stopped");
+
+ // Stop controller
+ controller.syncStop();
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
new file mode 100644
index 0000000..f7f45f9
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannMonitoringServer.java
@@ -0,0 +1,79 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.MonitoringConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.aphyr.riemann.client.RiemannClient;
+import com.google.common.collect.Lists;
+
+public class TestRiemannMonitoringServer {
+
+ @Test
+ public void testBasic() throws IOException {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ int port = MonitoringTestHelper.availableTcpPort();
+ MonitoringConfig monitoringConfig = new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+ monitoringConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(port));
+
+ RiemannConfigs.Builder builder = new RiemannConfigs.Builder().addConfig(monitoringConfig);
+ RiemannMonitoringServer server = new RiemannMonitoringServer(builder.build());
+
+ // Check server starts
+ server.start();
+ Assert.assertTrue(server.isStarted());
+
+ RiemannClient rclient = null;
+ try {
+ rclient = RiemannClient.tcp("localhost", port);
+ rclient.connect();
+ } catch (IOException e) {
+ Assert.fail("Riemann server should start on port: " + port);
+ }
+
+ // Check server stops
+ Assert.assertNotNull(rclient);
+ rclient.disconnect();
+ server.stop();
+ Assert.assertFalse(server.isStarted());
+
+ try {
+ rclient = RiemannClient.tcp("localhost", port);
+ rclient.connect();
+ Assert.fail("Riemann server should be stopped on port: " + port);
+ } catch (IOException e) {
+ // ok
+ }
+
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
[2/2] git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455
Posted by zz...@apache.org.
[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, rb=18455
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3505bead
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3505bead
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3505bead
Branch: refs/heads/helix-monitoring
Commit: 3505beadbc0f0b0f5cf3e804e23e0ee1ca6c2fb2
Parents: b182690
Author: zzhang <zz...@apache.org>
Authored: Wed Feb 26 16:52:26 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Wed Feb 26 16:52:26 2014 -0800
----------------------------------------------------------------------
.../org/apache/helix/HelixAutoController.java | 4 +-
.../java/org/apache/helix/HelixController.java | 4 +-
.../java/org/apache/helix/HelixManager.java | 4 +-
.../manager/zk/HelixConnectionAdaptor.java | 36 --
.../apache/helix/manager/zk/ZKHelixManager.java | 28 +-
.../helix/manager/zk/ZkHelixAutoController.java | 10 -
.../helix/manager/zk/ZkHelixController.java | 26 +-
.../helix/manager/zk/ZkHelixLeaderElection.java | 6 +-
.../helix/monitoring/MonitoringClient.java | 16 +-
.../helix/monitoring/MonitoringServer.java | 19 -
.../helix/monitoring/MonitoringServerOwner.java | 37 --
.../src/test/java/org/apache/helix/Mocks.java | 12 -
.../org/apache/helix/MonitoringTestHelper.java | 114 +++++
.../controller/stages/DummyClusterManager.java | 11 -
.../helix/participant/MockZKHelixManager.java | 12 -
.../monitoring/RiemannMonitoringClient.java | 428 +++++++++++++------
helix-monitor-server/pom.xml | 5 +
.../apache/helix/monitoring/RiemannAgent.java | 123 ++++++
.../monitoring/RiemannAgentStateModel.java | 54 +++
.../RiemannAgentStateModelFactory.java | 29 ++
.../helix/monitoring/RiemannAlertProxy.java | 111 +++++
.../apache/helix/monitoring/RiemannConfigs.java | 116 +++++
.../monitoring/RiemannMonitoringServer.java | 133 +-----
.../helix/monitoring/BasicMonitoringTest.java | 93 ----
.../helix/monitoring/IntegrationTest.java | 206 +++++++++
.../monitoring/TestClientServerMonitoring.java | 254 +++++------
.../helix/monitoring/TestRiemannAgent.java | 129 ++++++
.../monitoring/TestRiemannMonitoringServer.java | 79 ++++
28 files changed, 1402 insertions(+), 697 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
index fdab2a6..04cbf2d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAutoController.java
@@ -20,14 +20,12 @@ package org.apache.helix;
*/
import org.apache.helix.api.id.ControllerId;
-import org.apache.helix.monitoring.MonitoringServerOwner;
import org.apache.helix.participant.StateMachineEngine;
/**
* Autonomous controller
*/
-public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener,
- MonitoringServerOwner {
+public interface HelixAutoController extends HelixRole, HelixService, HelixConnectionStateListener {
/**
* get controller id
* @return controller id
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixController.java b/helix-core/src/main/java/org/apache/helix/HelixController.java
index 098fd96..9565cfb 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixController.java
@@ -20,10 +20,8 @@ package org.apache.helix;
*/
import org.apache.helix.api.id.ControllerId;
-import org.apache.helix.monitoring.MonitoringServerOwner;
-public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener,
- MonitoringServerOwner {
+public interface HelixController extends HelixRole, HelixService, HelixConnectionStateListener {
/**
* get controller id
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 17c94e5..75e9196 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -25,7 +25,6 @@ import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.monitoring.MonitoringServerOwner;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
@@ -42,7 +41,6 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
* // ROLE can be participant, spectator or a controller<br/>
* manager.connect();
* manager.addSOMEListener();
- * manager.start()
* After start is invoked the subsequent interactions will be via listener onChange callbacks
* There will be 3 scenarios for onChange callback, which can be determined using NotificationContext.type
* INIT -> will be invoked the first time the listener is added
@@ -56,7 +54,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
* @see RoutingTableProvider RoutingTableProvider for spectator
* @see GenericHelixController RoutingTableProvider for controller
*/
-public interface HelixManager extends MonitoringServerOwner {
+public interface HelixManager {
public static final String ALLOW_PARTICIPANT_AUTO_JOIN =
ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN;
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index 65a192a..a2f6d9e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -315,40 +315,4 @@ public class HelixConnectionAdaptor implements HelixManager {
_connection.addControllerMessageListener(_role, listener, _clusterId);
}
- @Override
- public void registerMonitoringServer(MonitoringServer monitoringServer) {
- switch (_role.getType()) {
- case CONTROLLER:
- HelixController controller = (HelixController) _role;
- controller.registerMonitoringServer(monitoringServer);
- break;
- case CONTROLLER_PARTICIPANT:
- HelixAutoController autoController = (HelixAutoController) _role;
- autoController.registerMonitoringServer(monitoringServer);
- break;
- default:
- LOG.error("A non-controller cannot own a monitoring server!");
- break;
- }
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- MonitoringServer server = null;
- switch (_role.getType()) {
- case CONTROLLER:
- HelixController controller = (HelixController) _role;
- server = controller.getMonitoringServer();
- break;
- case CONTROLLER_PARTICIPANT:
- HelixAutoController autoController = (HelixAutoController) _role;
- server = autoController.getMonitoringServer();
- break;
- default:
- LOG.error("Cannot get a monitoring server for a non-controller (" + _role.getType() + ")!");
- break;
- }
- return server;
- }
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 83eba53..b7b3381 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -66,6 +66,7 @@ import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Leader;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MonitoringConfig;
import org.apache.helix.monitoring.MonitoringServer;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.participant.HelixStateMachineEngine;
@@ -106,7 +107,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private ConfigAccessor _configAccessor;
private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
- private MonitoringServer _monitoringServer;
private volatile String _sessionId;
@@ -214,8 +214,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_sessionTimeout =
getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
- _monitoringServer = null;
-
/**
* instance type specific init
*/
@@ -548,11 +546,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
*/
_messagingService.getExecutor().shutdown();
- // stop monitoring
- if (_monitoringServer != null && _monitoringServer.isStarted()) {
- _monitoringServer.stop();
- }
-
// TODO reset user defined handlers only
resetHandlers();
@@ -867,16 +860,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
initHandlers(_handlers);
}
- @Override
- public void registerMonitoringServer(MonitoringServer monitoringServer) {
- _monitoringServer = monitoringServer;
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- return _monitoringServer;
- }
-
void handleNewSessionAsParticipant() throws Exception {
/**
* auto-join
@@ -907,15 +890,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
void handleNewSessionAsController() {
- // get the monitoring service up
- if (_monitoringServer != null) {
- if (_monitoringServer.isStarted()) {
- _monitoringServer.stop();
- }
- _monitoringServer.addConfigs(_dataAccessor);
- _monitoringServer.start();
- }
-
// get the leader election process going
if (_leaderElectionHandler != null) {
_leaderElectionHandler.init();
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
index 3967ed9..d17192f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixAutoController.java
@@ -131,14 +131,4 @@ public class ZkHelixAutoController implements HelixAutoController {
return _controller.isLeader();
}
- @Override
- public void registerMonitoringServer(MonitoringServer server) {
- _controller.registerMonitoringServer(server);
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- return _controller.getMonitoringServer();
- }
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 9da59b9..75d15b9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -42,6 +42,7 @@ import org.apache.helix.healthcheck.HealthStatsAggregator;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Leader;
+import org.apache.helix.model.MonitoringConfig;
import org.apache.helix.monitoring.MonitoringServer;
import org.apache.helix.monitoring.StatusDumpTask;
import org.apache.log4j.Logger;
@@ -59,7 +60,6 @@ public class ZkHelixController implements HelixController {
final HelixDataAccessor _accessor;
final HelixManager _manager;
final ZkHelixLeaderElection _leaderElection;
- MonitoringServer _monitoringServer;
public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId,
ControllerId controllerId) {
@@ -79,7 +79,6 @@ public class ZkHelixController implements HelixController {
_timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(_manager)));
_timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor()));
- _monitoringServer = null;
}
void startTimerTasks() {
@@ -118,11 +117,6 @@ public class ZkHelixController implements HelixController {
*/
_connection.resetHandlers(this);
- // stop the monitoring service if it's running
- if (_monitoringServer != null && _monitoringServer.isStarted()) {
- _monitoringServer.stop();
- }
-
}
void init() {
@@ -135,14 +129,6 @@ public class ZkHelixController implements HelixController {
}
/**
- * start the monitoring service if it exists; this must happen before leader election
- */
- if (_monitoringServer != null) {
- _monitoringServer.addConfigs(_accessor);
- _monitoringServer.start();
- }
-
- /**
* leader-election listener should be reset/init before all other controller listeners;
* it's ok to add a listener multiple times, since we check existence in
* ZkHelixConnection#addXXXListner()
@@ -213,16 +199,6 @@ public class ZkHelixController implements HelixController {
return false;
}
- @Override
- public void registerMonitoringServer(MonitoringServer server) {
- _monitoringServer = server;
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- return _monitoringServer;
- }
-
void addListenersToController(GenericHelixController pipeline) {
try {
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
index 3c5b3db..c69d1a3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixLeaderElection.java
@@ -159,11 +159,7 @@ public class ZkHelixLeaderElection implements ControllerChangeListener {
} else {
LOG.warn("ZKPropertyTransferServer instance is null");
}
- MonitoringServer server = manager.getMonitoringServer();
- if (server != null) {
- leader.setMonitoringHost(server.getHost());
- leader.setMonitoringPort(server.getPort());
- }
+
boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
if (success) {
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
index a794eab..743f8b4 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -21,14 +21,17 @@ package org.apache.helix.monitoring;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.api.id.ResourceId;
+
/**
* Interface for a client that can register with a monitoring server and send events for monitoring
*/
public interface MonitoringClient {
/**
* Connect. May be asynchronous.
+ * @throws Exception if the client cannot connect (implementation-specific failure)
*/
- void connect();
+ void connect() throws Exception;
/**
* Disconnect synchronously.
@@ -37,27 +40,30 @@ public interface MonitoringClient {
/**
* Send an event
+ * @param resource the resource this event is associated with; implementations may use it to pick a monitoring server
* @param e the event
* @param batch true if this should be part of a batch operation
* @return true if the event was sent (or queued for batching), false otherwise
*/
- boolean send(MonitoringEvent e, boolean batch);
+ boolean send(ResourceId resource, MonitoringEvent e, boolean batch);
/**
* Send an event and flush any outstanding messages
+ * @param resource the resource this event is associated with; implementations may use it to pick a monitoring server
* @param e the event
* @return true if events were successfully sent, false otherwise
*/
- boolean sendAndFlush(MonitoringEvent e);
+ boolean sendAndFlush(ResourceId resource, MonitoringEvent e);
/**
* Schedule an operation to run
+ * @param resource the resource this scheduled operation is associated with
* @param interval the frequency
* @param delay the amount of time to wait before the first execution
* @param unit the unit of time to use
* @param r the code to run
*/
- void every(long interval, long delay, TimeUnit unit, Runnable r);
+ void every(ResourceId resource, long interval, long delay, TimeUnit unit, Runnable r);
/**
* Check if there is a valid connection to a monitoring server
@@ -79,7 +85,7 @@ public interface MonitoringClient {
/**
* Flush all outstanding events
- * @return true if the events were flushed, false otherwise
+ * @return true if all events were flushed, false otherwise
*/
boolean flush();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
index 3168bb2..b8c5a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServer.java
@@ -19,8 +19,6 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import org.apache.helix.HelixDataAccessor;
-
/**
* Generic interface for a monitoring service that should be attached to a controller.
*/
@@ -36,26 +34,9 @@ public interface MonitoringServer {
public void stop();
/**
- * Add a collection of configuration files
- * @param accessor HelixDataAccessor that can reach Helix's backing store
- */
- public void addConfigs(HelixDataAccessor accessor);
-
- /**
* Check if the service has been started
* @return true if started, false otherwise
*/
public boolean isStarted();
- /**
- * Get the host of the service
- * @return String hostname
- */
- public String getHost();
-
- /**
- * Get the port of the service
- * @return integer port
- */
- public int getPort();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
deleted file mode 100644
index 8e00c54..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringServerOwner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Interface to implement for Helix components that can manage a monitoring server
- */
-public interface MonitoringServerOwner {
- /**
- * Attach a monitoring server to this controller
- * @param server MonitoringServer implementation
- */
- void registerMonitoringServer(MonitoringServer server);
-
- /**
- * Get the monitoring service, if any, registered with this connection
- * @return a MonitoringService, or null if none
- */
- MonitoringServer getMonitoringServer();
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 87d4e68..d170d06 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -479,18 +479,6 @@ public class Mocks {
}
- @Override
- public void registerMonitoringServer(MonitoringServer monitoringServer) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- // TODO Auto-generated method stub
- return null;
- }
-
}
public static class MockAccessor implements HelixDataAccessor // DataAccessor
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
new file mode 100644
index 0000000..c17dab0
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
@@ -0,0 +1,121 @@
+package org.apache.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.I0Itec.zkclient.NetworkUtil;
+
+/**
+ * Helpers for monitoring tests: canned Riemann configuration strings and free TCP port lookup.
+ */
+public class MonitoringTestHelper {
+ static final int MAX_PORT = 65535;
+
+ /**
+ * Generate a default riemann.config whose TCP server listens on the given port
+ * @param riemannPort TCP port the Riemann server should listen on
+ * @return the riemann.config contents
+ */
+ public static String getRiemannConfigString(int riemannPort) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(logging/init :file \"/dev/null\")\n\n")
+ .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n")
+ .append("(instrumentation {:interval 1})\n\n")
+ .append("; (udp-server :host \"0.0.0.0\")\n")
+ .append("; (ws-server :host \"0.0.0.0\")\n")
+ .append("; (repl-server :host \"0.0.0.0\")\n\n")
+ .append("(periodically-expire 1)\n\n")
+ .append(
+ "(let [index (default :ttl 3 (update-index (index)))]\n (streams\n (expired prn)\n index))\n");
+
+ return sb.toString();
+ }
+
+ /**
+ * Generate a Riemann config that, for non-expired LatencyReport events whose latency95 exceeds
+ * 1.0, POSTs an alert name to an HTTP proxy on localhost
+ * @param proxyPort port of the local HTTP proxy that receives the alerts
+ * @return the config contents
+ */
+ public static String getLatencyCheckConfigString(int proxyPort) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(require 'riemann.config)\n")
+ .append("(require 'clj-http.client)\n\n")
+ .append("(defn parse-double\n \"Convert a string into a double\"\n ")
+ .append("[instr]\n (Double/parseDouble instr))\n\n")
+ .append("(defn check-95th-latency\n \"Check if the 95th percentile latency is within expectations\"\n ")
+ .append("[e]\n (let [latency (parse-double (:latency95 e))]\n ")
+ .append("(if (> latency 1.0) \n ; Report if the 95th percentile latency exceeds 1.0s\n ")
+ .append("(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n ")
+ .append("(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )\n ")
+ .append("proxy-url (str \"http://localhost:\" " + proxyPort + " )]\n ")
+ .append("(clj-http.client/post proxy-url {:body alert-name-str }))))))\n\n")
+ .append("(streams\n (where\n ; Only process services containing LatencyReport\n ")
+ .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n ")
+ .append("check-95th-latency))\n");
+
+ return sb.toString();
+ }
+
+ /**
+ * Find a free TCP port by letting the OS assign an ephemeral one. The socket is closed before
+ * returning, so another process may still claim the port before the caller binds it.
+ * @return a free port, or -1 if no socket could be opened and bound
+ */
+ public static int availableTcpPort() {
+ ServerSocket ss = null;
+ try {
+ ss = new ServerSocket();
+ // SO_REUSEADDR must be set before binding; changing it on a bound socket is unspecified
+ ss.setReuseAddress(true);
+ ss.bind(new InetSocketAddress(0));
+ return ss.getLocalPort();
+ } catch (IOException e) {
+ // could not open or bind a socket; fall through and report -1
+ } finally {
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ // ignored: the port number has already been read
+ }
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Find the first port, starting from startPort inclusive, that NetworkUtil#isPortFree reports as
+ * free
+ * @param startPort first port to probe; values below 1 start the search at port 1
+ * @return the first free port in [startPort, MAX_PORT], or -1 if none is free
+ */
+ public static int availableTcpPort(int startPort) {
+ for (int port = Math.max(startPort, 1); port <= MAX_PORT; port++) {
+ if (NetworkUtil.isPortFree(port)) {
+ return port;
+ }
+ }
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index e07f0b5..f73b368 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -262,15 +262,4 @@ public class DummyClusterManager implements HelixManager {
}
- @Override
- public void registerMonitoringServer(MonitoringServer monitoringServer) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- // TODO Auto-generated method stub
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 0b8395e..6ac0fa3 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -268,16 +268,4 @@ public class MockZKHelixManager implements HelixManager {
}
- @Override
- public void registerMonitoringServer(MonitoringServer monitoringServer) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public MonitoringServer getMonitoringServer() {
- // TODO Auto-generated method stub
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index 20b0825..e6c8b2d 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -22,14 +22,19 @@ package org.apache.helix.monitoring;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.model.Leader;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.log4j.Logger;
import com.aphyr.riemann.client.AbstractRiemannClient;
@@ -40,113 +45,192 @@ import com.aphyr.riemann.client.UnsupportedJVMException;
import com.google.common.collect.Lists;
/**
- * A Riemann-based monitoring client
- * Thread safety note: connect and disconnect are serialized to ensure that there
- * is no attempt to connect or disconnect with an inconsistent state. The send routines are not
- * protected for performance reasons, and so a single send/flush may fail.
+ * A Riemann-based monitoring client. Thread safety note: connect and disconnect are serialized to
+ * ensure that there is no attempt to connect or disconnect with an inconsistent state. The send
+ * routines are not protected for performance reasons, and so a single send/flush may fail.
*/
public class RiemannMonitoringClient implements MonitoringClient {
private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
- private String _host;
- private int _port;
+ public static final String DEFAULT_MONITORING_SERVICE_NAME = "MonitoringService";
+
+ /**
+ * Per-resource state: the Riemann client used for one resource, its server address and tasks
+ */
+ static class MonitoringClientInfo {
+ /**
+ * host/port of riemann server to which this client connects
+ */
+ String _host;
+ int _port;
+
+ /**
+ * riemann client
+ */
+ RiemannClient _client;
+
+ /**
+ * batch riemann client, null if batch is not enabled
+ */
+ RiemannBatchClient _batchClient;
+
+ /**
+ * list of periodic tasks scheduled on this riemann client
+ */
+ final List<ScheduledItem> _scheduledItems;
+
+ public MonitoringClientInfo() {
+ _host = null;
+ _port = -1;
+ _client = null;
+ _batchClient = null;
+ _scheduledItems = Lists.newArrayList();
+ }
+
+ }
+
private int _batchSize;
- private RiemannClient _client;
- private RiemannBatchClient _batchClient;
- private List<ScheduledItem> _scheduledItems;
- private HelixDataAccessor _accessor;
- private IZkDataListener _leaderListener;
+ private final ResourceId _monitoringServiceName;
+ private int _monitoringServicePartitionNum; // 0 until connect() reads the service's ideal state
+
+ private final HelixManager _spectator; // spectator on the monitoring cluster
+ private final RoutingTableProvider _routingTableProvider;
+ private final Map<ResourceId, MonitoringClientInfo> _clientMap; // one entry per resource
/**
* Create a non-batched monitoring client
- * @param clusterId the cluster to monitor
- * @param accessor an accessor for the cluster
+ * @param zkAddr ZooKeeper address of the monitoring cluster
+ * @param monitoringClusterId cluster that hosts the default monitoring service resource
*/
- public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor) {
- this(clusterId, accessor, 1);
+ public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId) {
+ this(zkAddr, monitoringClusterId, ResourceId.from(DEFAULT_MONITORING_SERVICE_NAME), 1);
}
/**
* Create a monitoring client that supports batching
- * @param clusterId the cluster to monitor
- * @param accessor an accessor for the cluster
- * @param batchSize the number of events in a batch
+ * @param zkAddr ZooKeeper address of the monitoring cluster
+ * @param monitoringClusterId
+ * the cluster that hosts the monitoring service
+ * @param monitoringServiceName
+ * resource name of the monitoring service; its ONLINE partition instances are the riemann servers
+ * @param batchSize
+ * the number of events in a batch; values below 1 are treated as 1
*/
- public RiemannMonitoringClient(ClusterId clusterId, HelixDataAccessor accessor, int batchSize) {
- _host = null;
- _port = -1;
+ public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId,
+ ResourceId monitoringServiceName, int batchSize) {
_batchSize = batchSize > 0 ? batchSize : 1;
- _client = null;
- _batchClient = null;
- _accessor = accessor;
- _scheduledItems = Lists.newLinkedList();
- _leaderListener = getLeaderListener();
+ _monitoringServiceName = monitoringServiceName;
+ _monitoringServicePartitionNum = 0;
+ _clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
+ // spectator (no instance name) on the monitoring cluster; it is connected in connect(), not here
+ _spectator =
+ HelixManagerFactory.getZKHelixManager(monitoringClusterId.stringify(), null,
+ InstanceType.SPECTATOR, zkAddr);
+ _routingTableProvider = new RoutingTableProvider();
}
@Override
- public void connect() {
+ public void connect() throws Exception {
if (isConnected()) {
LOG.error("Already connected to Riemann!");
return;
}
- // watch for changes
- changeLeaderSubscription(true);
+ // Connect spectator to the monitoring cluster so the routing table tracks the monitoring service
+ _spectator.connect();
+ _spectator.addExternalViewChangeListener(_routingTableProvider);
- // do the connect asynchronously as a tcp establishment could take time
- Leader leader = _accessor.getProperty(_accessor.keyBuilder().controllerLeader());
- doConnectAsync(leader);
+ // Get partition number of monitoring service; routing needs a positive count, so fail fast
+ HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
+ IdealState idealState =
+ accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
+ if (idealState == null || idealState.getNumPartitions() <= 0) {
+ _spectator.disconnect();
+ throw new IllegalStateException("Monitoring service " + _monitoringServiceName
+ + " is not defined or has no partitions");
+ }
+ _monitoringServicePartitionNum = idealState.getNumPartitions();
}
@Override
public void disconnect() {
- changeLeaderSubscription(false);
- disconnectInternal();
+ // disconnect every per-resource riemann client before the spectator
+ for (ResourceId resource : _clientMap.keySet()) {
+ disconnectInternal(resource);
+ }
+
+ _spectator.disconnect();
+ _monitoringServicePartitionNum = 0;
}
@Override
public boolean isConnected() {
- return _client != null && _client.isConnected();
+ return _spectator.isConnected();
}
- @Override
- public boolean flush() {
+ /**
+ * Flush the riemann client (the batch client when batching is enabled) for a resource
+ * @param resource the resource whose riemann client should be flushed
+ * @return true if flushed; false if not connected, no client exists, or the flush failed
+ */
+ private boolean flush(ResourceId resource) {
if (!isConnected()) {
LOG.error("Tried to flush a Riemann client that is not connected!");
return false;
}
- AbstractRiemannClient c = getClient(true);
+
+ AbstractRiemannClient c = getClient(resource, true);
+ if (c == null) {
+ LOG.warn("Fail to get riemann client for resource: " + resource);
+ return false;
+ }
+
try {
c.flush();
return true;
} catch (IOException e) {
- LOG.error("Problem flushing the Riemann event queue!", e);
+ LOG.error("Problem flushing the Riemann event queue for resource: " + resource, e);
}
return false;
}
@Override
- public boolean send(MonitoringEvent event, boolean batch) {
+ public boolean flush() {
+ boolean allFlushed = true;
+ for (ResourceId resource : _clientMap.keySet()) {
+ allFlushed = flush(resource) && allFlushed; // flush every client, even after a failure
+ }
+
+ return allFlushed;
+ }
+
+ @Override
+ public boolean send(ResourceId resource, MonitoringEvent event, boolean batch) {
if (!isConnected()) {
LOG.error("Riemann connection must be active in order to send an event!");
return false;
}
- AbstractRiemannClient c = getClient(batch);
- convertEvent(c, event).send();
+ // no connected riemann client for this resource yet: connect asynchronously, handing the event over
+ if (!isConnected(resource)) {
+ connect(resource, null, event);
+ } else {
+ AbstractRiemannClient c = getClient(resource, batch);
+ convertEvent(c, event).send();
+ }
+
return true;
}
@Override
- public boolean sendAndFlush(MonitoringEvent event) {
- boolean sendResult = send(event, true);
+ public boolean sendAndFlush(ResourceId resource, MonitoringEvent event) {
+ // if send() only started an async connect, flush(resource) may find no client and return false
+ boolean sendResult = send(resource, event, true);
if (sendResult) {
- return flush();
+ return flush(resource);
}
return false;
}
+ /**
+ * Batching is a client-wide setting (batchSize > 1) that applies to every per-resource riemann client
+ */
@Override
public boolean isBatchingEnabled() {
- return _batchClient != null && _batchClient.isConnected();
+ return _batchSize > 1;
}
@Override
@@ -154,68 +238,121 @@ public class RiemannMonitoringClient implements MonitoringClient {
return _batchSize;
}
+ /**
+ * Check if a riemann client for given resource is connected
+ * @param resource the resource whose riemann client to check
+ * @return true if the spectator and the resource's riemann client are both connected
+ */
+ private boolean isConnected(ResourceId resource) {
+ if (!isConnected()) {
+ return false;
+ }
+
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ return clientInfo != null && clientInfo._client != null && clientInfo._client.isConnected();
+ }
+
@Override
- public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+ public synchronized void every(ResourceId resource, long interval, long delay, TimeUnit unit,
+ Runnable r) {
+ if (!isConnected()) {
+ LOG.error("Riemann client must be connected in order to schedule tasks!");
+ return;
+ }
+
ScheduledItem scheduledItem = new ScheduledItem();
scheduledItem.interval = interval;
scheduledItem.delay = delay;
scheduledItem.unit = unit;
scheduledItem.r = r;
- _scheduledItems.add(scheduledItem);
- if (isConnected()) {
- getClient().every(interval, delay, unit, r);
+
+ if (isConnected(resource)) {
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ clientInfo._scheduledItems.add(scheduledItem);
+ getClient(resource).every(interval, delay, unit, r);
+ } else {
+ connect(resource, scheduledItem, null); // async; the task is handed to the new connection
+ }
+ }
+
+ /**
+ * Connect a riemann client for a resource to the riemann server that owns its partition
+ * @param resource resource whose hash selects a partition of the monitoring service
+ * @param scheduledItem periodic task handed to the new connection, or null if none
+ * @param pendingEvent event handed to the new connection, or null if none
+ */
+ private void connect(ResourceId resource, ScheduledItem scheduledItem,
+ MonitoringEvent pendingEvent) {
+ // Hash by resourceId; hashCode() may be negative, so take abs of the (bounded) remainder
+ int partitionKey = Math.abs(resource.hashCode() % _monitoringServicePartitionNum);
+ List<InstanceConfig> instances =
+ _routingTableProvider.getInstances(_monitoringServiceName.stringify(),
+ _monitoringServiceName.stringify() + "_" + partitionKey, "ONLINE");
+
+ if (instances.isEmpty()) {
+ LOG.error("Riemann monitoring server for resource: " + resource + " at partitionKey: "
+ + partitionKey + " is not available");
+ return;
}
+ // any ONLINE instance of the partition will do; use the first one
+ InstanceConfig instanceConfig = instances.get(0);
+ String host = instanceConfig.getHostName();
+ int port = Integer.parseInt(instanceConfig.getPort());
+
+ // Do the connect asynchronously as a tcp establishment could take time
+ doConnectAsync(resource, host, port, scheduledItem, pendingEvent);
}
/**
- * Get a raw, non-batched Riemann client.
- * WARNING: do not cache this, as it may be disconnected without notice
+ * Get the raw, non-batched Riemann client for a resource, or null if none has been created.
+ * WARNING: do not cache this, as it may be disconnected without notice
* @return RiemannClient
*/
- private RiemannClient getClient() {
- return _client;
+ private RiemannClient getClient(ResourceId resource) {
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ return clientInfo == null ? null : clientInfo._client;
}
/**
- * Get a batched Riemann client (if batching is supported)
- * WARNING: do not cache this, as it may be disconnected without notice
+ * Get the batched Riemann client for a resource (if batching is supported), or null if none.
+ * WARNING: do not cache this, as it may be disconnected without notice
* @return RiemannBatchClient
*/
- private RiemannBatchClient getBatchClient() {
- return _batchClient;
+ private RiemannBatchClient getBatchClient(ResourceId resource) {
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ return clientInfo == null ? null : clientInfo._batchClient;
}
/**
- * Get a Riemann client
- * WARNING: do not cache this, as it may be disconnected without notice
- * @param batch true if the client is preferred to support batching, false otherwise
+ * Get the Riemann client for a resource (null if none). WARNING: do not cache it, it may be disconnected
+ * @param resource the resource whose client is returned
+ * @param batch true if the client is preferred to support batching, false otherwise
* @return AbstractRiemannClient
*/
- private AbstractRiemannClient getClient(boolean batch) {
+ private AbstractRiemannClient getClient(ResourceId resource, boolean batch) {
if (batch && isBatchingEnabled()) {
- return getBatchClient();
+ return getBatchClient(resource);
} else {
- return getClient();
+ return getClient(resource);
}
}
/**
* Based on the contents of the leader node, connect to a Riemann server
- * @param leader node containing host/port
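+ * @param resource resource to connect for; host and port are the riemann server address, and
+ * scheduledItem/pendingEvent are passed through to connectInternal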
*/
- private void doConnectAsync(final Leader leader) {
+ private void doConnectAsync(final ResourceId resource, final String host, final int port,
+ final ScheduledItem scheduledItem, final MonitoringEvent pendingEvent) {
new Thread() {
@Override
public void run() {
synchronized (RiemannMonitoringClient.this) {
- // only connect if the leader is available; otherwise it will be picked up by the callback
- if (leader != null) {
- _host = leader.getMonitoringHost();
- _port = leader.getMonitoringPort();
- }
- // connect if there's a valid host and port
- if (_host != null && _port != -1) {
- connectInternal(_host, _port);
+ if (resource != null && host != null && port != -1) {
+ connectInternal(resource, host, port, scheduledItem, pendingEvent);
+ } else {
+ LOG.error("Fail to doConnectAsync becaue of invalid arguments, resource: " + resource
+ + ", host: " + host + ", port: " + port);
}
}
}
@@ -224,24 +361,52 @@ public class RiemannMonitoringClient implements MonitoringClient {
/**
* Establishment of a connection to a Riemann server
- * @param host monitoring server hostname
- * @param port monitoring server port
+ * @param resource resource the connection is made for
+ * @param host monitoring server hostname
+ * @param port monitoring server port
+ * @param scheduledItem optional periodic task to register with the client, may be null
+ * @param pendingEvent optional event to send once connected, may be null
*/
- private synchronized void connectInternal(String host, int port) {
- disconnectInternal();
- try {
- _client = RiemannClient.tcp(host, port);
- _client.connect();
- // we might have to reschedule tasks
- for (ScheduledItem item : _scheduledItems) {
- _client.every(item.interval, item.delay, item.unit, item.r);
+ private synchronized void connectInternal(ResourceId resource, String host, int port,
+ ScheduledItem scheduledItem, MonitoringEvent pendingEvent) {
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ if (clientInfo != null && clientInfo._host.equals(host) && clientInfo._port == port
+ && clientInfo._client != null && clientInfo._client.isConnected()) {
+ LOG.info("Riemann client for resource: " + resource + " already connected on " + host + ":"
+ + port);
+
+ // We might have to reschedule tasks
+ if (scheduledItem != null) {
+ clientInfo._scheduledItems.add(scheduledItem);
+ clientInfo._client.every(scheduledItem.interval, scheduledItem.delay, scheduledItem.unit,
+ scheduledItem.r);
}
+
+ // Sending over pending event
+ if (pendingEvent != null) {
+ convertEvent(clientInfo._client, pendingEvent).send();
+ }
+
+ return;
+ }
+
+ // Disconnect from previous riemann server
+ disconnectInternal(resource);
+
+ // Connect to new riemann server
+ RiemannClient client = null;
+ RiemannBatchClient batchClient = null;
+ try {
+ client = RiemannClient.tcp(host, port);
+ client.connect();
} catch (IOException e) {
LOG.error("Error establishing a connection!", e);
}
- if (_client != null && getBatchSize() > 1) {
+
+ // tcp() may have thrown (client == null) or connect() may have failed (client not connected);
+ // in both cases the client must not be used to schedule tasks or send events
+ boolean connected = client != null && client.isConnected();
+
+ if (connected && getBatchSize() > 1) {
try {
- _batchClient = new RiemannBatchClient(_batchSize, _client);
+ batchClient = new RiemannBatchClient(_batchSize, client);
} catch (UnknownHostException e) {
_batchSize = 1;
LOG.error("Could not resolve host", e);
@@ -250,60 +415,61 @@ public class RiemannMonitoringClient implements MonitoringClient {
LOG.warn("Batching not enabled because of incompatible JVM", e);
}
}
- }
- /**
- * Teardown of a connection to a Riemann server
- */
- private synchronized void disconnectInternal() {
- try {
- if (_batchClient != null && _batchClient.isConnected()) {
- _batchClient.disconnect();
- } else if (_client != null && _client.isConnected()) {
- _client.disconnect();
- }
- } catch (IOException e) {
- LOG.error("Disconnection error", e);
+ if (clientInfo == null) {
+ clientInfo = new MonitoringClientInfo();
}
- _batchClient = null;
- _client = null;
- }
- /**
- * Change the subscription status to the Leader node
- * @param subscribe true to subscribe, false to unsubscribe
- */
- private void changeLeaderSubscription(boolean subscribe) {
- String leaderPath = _accessor.keyBuilder().controllerLeader().getPath();
- BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
- if (subscribe) {
- baseAccessor.subscribeDataChanges(leaderPath, _leaderListener);
- } else {
- baseAccessor.unsubscribeDataChanges(leaderPath, _leaderListener);
+ clientInfo._host = host;
+ clientInfo._port = port;
+ clientInfo._client = client;
+ clientInfo._batchClient = batchClient;
+ if (scheduledItem != null) {
+ clientInfo._scheduledItems.add(scheduledItem);
+ }
+ _clientMap.put(resource, clientInfo);
+
+ if (!connected) {
+ // Keep the client info with its scheduled items so the next successful connect for this
+ // resource reschedules them; the pending event cannot be delivered now
+ LOG.warn("Riemann client for resource: " + resource + " not connected to " + host + ":"
+ + port + ", scheduled tasks deferred and pending event dropped");
+ return;
+ }
+
+ // We might have to reschedule tasks
+ for (ScheduledItem item : clientInfo._scheduledItems) {
+ client.every(item.interval, item.delay, item.unit, item.r);
+ }
+
+ // Send over pending event
+ if (pendingEvent != null) {
+ convertEvent(client, pendingEvent).send();
}
}
/**
- * Get callbacks for when the leader changes
- * @return implemented IZkDataListener
+ * Teardown of the Riemann connection of a resource. The client references are cleared from the
+ * resource's client info before disconnecting, so they are dropped even if disconnecting fails;
+ * the client info itself, including its scheduled items, stays in the client map.
+ * @param resource resource to disconnect; nothing happens if it has no client info
*/
- private IZkDataListener getLeaderListener() {
- return new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- Leader leader = new Leader((ZNRecord) data);
- doConnectAsync(leader);
- }
+ private synchronized void disconnectInternal(ResourceId resource) {
+ MonitoringClientInfo clientInfo = _clientMap.get(resource);
+ if (clientInfo == null) {
+ return;
+ }
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- disconnectInternal();
+ RiemannBatchClient batchClient = clientInfo._batchClient;
+ RiemannClient client = clientInfo._client;
+
+ clientInfo._batchClient = null;
+ clientInfo._client = null;
+
+ // NOTE(review): when a batched client exists only its scheduler is shut down, although tasks
+ // are registered through the raw client's every(...); confirm both share one scheduler.
+ // A client that no longer reports connected is neither shut down nor disconnected here.
+ try {
+ if (batchClient != null && batchClient.isConnected()) {
+ batchClient.scheduler().shutdown();
+ batchClient.disconnect();
+ } else if (client != null && client.isConnected()) {
+ client.scheduler().shutdown();
+ client.disconnect();
}
- };
+ } catch (IOException e) {
+ LOG.error("Disconnection error", e);
+ }
}
/**
- * Change a helix event into a Riemann event
+ * Change a helix monitoring event into a Riemann event
* @param c Riemann client
* @param helixEvent helix event
* @return Riemann EventDSL
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index 041a390..2cbddfd 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -54,6 +54,11 @@ under the License.
<version>0.2.4</version>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all-server</artifactId>
+ <version>8.1.14.v20131031</version>
+ </dependency>
+ <dependency>
<groupId>factual</groupId>
<artifactId>clj-helix</artifactId>
<version>0.1.0</version>
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
new file mode 100644
index 0000000..474d838
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
@@ -0,0 +1,123 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.RiemannClient;
+
+public class RiemannAgent {
+ private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
+
+ static final String STATEMODEL_NAME = "OnlineOffline";
+
+ final Random _random;
+ final String _zkAddr;
+ final String _clusterName;
+ final String _instanceName;
+ final int _riemannPort;
+ final HelixManager _participant;
+ final RiemannClient _client;
+
+ RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+ _random = new Random();
+ _zkAddr = zkAddr;
+ _clusterName = clusterName;
+ _instanceName =
+ String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
+ _riemannPort = riemannPort;
+ _participant =
+ HelixManagerFactory.getZKHelixManager(clusterName, _instanceName, InstanceType.PARTICIPANT,
+ zkAddr);
+ _client = RiemannClient.tcp("localhost", riemannPort);
+ }
+
+ public void start() throws Exception {
+ LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+ + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+ // Wait until riemann port is connected
+ int timeout = 30 * 1000;
+ long startT = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - startT) < timeout) {
+ try {
+ _client.connect();
+ break;
+ } catch (IOException e) {
+ int sleep = _random.nextInt(3000) + 3000;
+ LOG.info("Wait " + sleep + "ms for riemann server to come up");
+ TimeUnit.MILLISECONDS.sleep(sleep);
+ }
+ }
+
+ if (!_client.isConnected()) {
+ String err =
+ "Fail to connect to reimann server on localhost:" + _riemannPort + " in " + timeout
+ + "ms";
+ LOG.error(err);
+ throw new RuntimeException(err);
+ }
+ LOG.info("RiemannAgent connected to local riemann server on port: " + _riemannPort);
+
+ // Start helix participant
+ _participant.getStateMachineEngine().registerStateModelFactory(STATEMODEL_NAME,
+ new RiemannAgentStateModelFactory());
+ _participant.connect();
+
+ // Monitor riemann server
+ _client.every(10, 0, TimeUnit.SECONDS, new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ // send heartbeat metrics
+ _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
+ } catch (Exception e) {
+ LOG.error("Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
+ + _instanceName, e);
+ shutdown();
+ }
+ }
+ });
+
+ }
+
+ public void shutdown() {
+ LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+ + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+ try {
+ _client.scheduler().shutdown();
+ _client.disconnect();
+ } catch (IOException e) {
+ LOG.error("Exception in disconnect riemann client", e);
+ }
+
+ _participant.disconnect();
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
new file mode 100644
index 0000000..2e32cef
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
@@ -0,0 +1,54 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * OnlineOffline state model of {@link RiemannAgent} partitions. Every transition only logs the
+ * state change carried by its message.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = {
+    "DROPPED", "OFFLINE", "ONLINE"
+})
+public class RiemannAgentStateModel extends StateModel {
+  private static final Logger LOG = Logger.getLogger(RiemannAgentStateModel.class);
+
+  /**
+   * Log the from/to states, resource and partition of a state transition message.
+   * @param message the state transition message
+   */
+  void logTransition(Message message) {
+    String toState = message.getToState();
+    String fromState = message.getFromState();
+    String resourceName = message.getResourceName();
+    String partitionName = message.getPartitionName();
+
+    LOG.info("Become " + toState + " from " + fromState + " for resource: " + resourceName
+        + ", partition: " + partitionName);
+  }
+
+  @Transition(to = "ONLINE", from = "OFFLINE")
+  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+    logTransition(message);
+  }
+
+  @Transition(to = "OFFLINE", from = "ONLINE")
+  public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
+    logTransition(message);
+  }
+
+  /**
+   * Handler for dropping a partition. DROPPED is one of the declared states above, so the
+   * OFFLINE->DROPPED transition of the OnlineOffline model needs a transition method as well.
+   */
+  @Transition(to = "DROPPED", from = "OFFLINE")
+  public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+    logTransition(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
new file mode 100644
index 0000000..a5865ad
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
@@ -0,0 +1,29 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory handing out {@link RiemannAgentStateModel} instances.
+ */
+public class RiemannAgentStateModelFactory extends StateModelFactory<RiemannAgentStateModel> {
+  /**
+   * Build a fresh state model. The partition name does not influence the result: every call
+   * yields a new, identical {@link RiemannAgentStateModel}.
+   * @param partitionName partition the model is requested for (unused)
+   * @return a newly constructed state model
+   */
+  @Override
+  public RiemannAgentStateModel createNewStateModel(String partitionName) {
+    final RiemannAgentStateModel stateModel = new RiemannAgentStateModel();
+    return stateModel;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
new file mode 100644
index 0000000..5fe8ebe
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
@@ -0,0 +1,111 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+/**
+ * Accept alerts from the local riemann server and forward them to the helix-controller of the
+ * cluster being monitored. Each HTTP request body is read as an alert name, parsed with
+ * {@link AlertName#from(String)}, and written as an ALERT message to the controller message path
+ * of the alert's cluster. Requests are always answered with HTTP 200; forwarding failures are
+ * only logged.
+ */
+public class RiemannAlertProxy {
+  private static final Logger LOG = Logger.getLogger(RiemannAlertProxy.class);
+
+  /** Charset of alert request bodies; fixed so decoding does not depend on the platform default */
+  private static final Charset BODY_CHARSET = Charset.forName("UTF-8");
+
+  /**
+   * Jetty handler that forwards each request body as an alert message.
+   */
+  class RiemannAlertProxyHandler extends AbstractHandler {
+    @Override
+    public void handle(String target, Request baseRequest, HttpServletRequest request,
+        HttpServletResponse response) throws IOException, ServletException {
+      // Read content-body
+      InputStream inputStream = request.getInputStream();
+      StringWriter writer = new StringWriter();
+      IOUtils.copy(inputStream, writer, BODY_CHARSET.name());
+      String alertNameStr = writer.toString();
+      LOG.info("Handling alert: " + alertNameStr);
+
+      // Send alert message to the controller of cluster being monitored
+      try {
+        AlertName alertName = AlertName.from(alertNameStr);
+        String clusterName = alertName.getScope().getClusterId().stringify();
+        HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+        Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
+        message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
+        message.setTgtSessionId("*");
+        message.setTgtName("controller");
+        accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
+      } catch (Exception e) {
+        LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
+      }
+
+      // Always return ok: forwarding failures above are logged, not reported to the sender
+      response.setStatus(HttpServletResponse.SC_OK);
+      baseRequest.setHandled(true);
+    }
+  }
+
+  final int _proxyPort;
+  final Server _server;
+  final BaseDataAccessor<ZNRecord> _baseAccessor;
+  final AbstractHandler _handler;
+
+  /**
+   * @param proxyPort HTTP port the proxy listens on
+   * @param baseAccessor base accessor used, via ZKHelixDataAccessor, to write alert messages into
+   *          the monitored clusters
+   */
+  public RiemannAlertProxy(int proxyPort, BaseDataAccessor<ZNRecord> baseAccessor) {
+    _proxyPort = proxyPort;
+    _server = new Server(proxyPort);
+    _baseAccessor = baseAccessor;
+    _handler = new RiemannAlertProxyHandler();
+  }
+
+  /**
+   * Install the alert handler and start the HTTP server.
+   * @throws Exception if the HTTP server fails to start
+   */
+  public void start() throws Exception {
+    LOG.info("Starting RiemannAlertProxy on port: " + _proxyPort);
+    _server.setHandler(_handler);
+    _server.start();
+
+  }
+
+  /**
+   * Stop the HTTP server; a failure to stop is logged, not thrown.
+   */
+  public void shutdown() {
+    try {
+      LOG.info("Stopping RiemannAlertProxy on port: " + _proxyPort);
+      _server.stop();
+    } catch (Exception e) {
+      LOG.error("Fail to stop RiemannAlertProxy", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
new file mode 100644
index 0000000..193b763
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
@@ -0,0 +1,116 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Riemann configs
+ */
+public class RiemannConfigs {
+ private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
+ private static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
+ public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
+
+ private final String _configDir;
+ private final List<MonitoringConfig> _configs;
+
+ RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
+ _configDir = configDir;
+ _configs = configs;
+ }
+
+ /**
+ * persist configs to riemann config dir
+ */
+ public void persistConfigs() {
+ // create the directory
+ File dir = new File(_configDir);
+ if (!dir.exists()) {
+ dir.mkdir();
+ }
+
+ for (MonitoringConfig config : _configs) {
+ String configData = config.getConfig();
+ String fileName = _configDir + "/" + config.getId();
+ try {
+ PrintWriter writer = new PrintWriter(fileName);
+ writer.println(configData);
+ writer.close();
+
+ // make sure this is cleaned up eventually
+ File file = new File(fileName);
+ file.deleteOnExit();
+ } catch (FileNotFoundException e) {
+ LOG.error("Could not write " + config.getId(), e);
+ }
+ }
+ }
+
+ public String getConfigDir() {
+ return _configDir;
+ }
+
+ public static class Builder {
+ private final List<MonitoringConfig> _configs;
+ private final String _configDir;
+
+ /**
+ * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
+ */
+ public Builder() {
+ this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
+ }
+
+ public Builder(String configDir) {
+ _configDir = configDir;
+ _configs = Lists.newArrayList();
+ }
+
+ public Builder addConfig(MonitoringConfig monitoringConfig) {
+ _configs.add(monitoringConfig);
+ return this;
+ }
+
+ public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
+ _configs.addAll(monitoringConfigs);
+ return this;
+ }
+
+ public RiemannConfigs build() {
+ // Check default riemann config exists
+ for (MonitoringConfig config : _configs) {
+ if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) {
+ return new RiemannConfigs(_configDir, _configs);
+ }
+ }
+ throw new IllegalArgumentException("Missing default riemann config: "
+ + DEFAULT_RIEMANN_CONFIG);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
index 36719aa..d4f11a5 100644
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
@@ -19,25 +19,6 @@ package org.apache.helix.monitoring;
* under the License.
*/
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.helix.util.ZKClientPool;
import org.apache.log4j.Logger;
import clojure.lang.RT;
@@ -48,62 +29,29 @@ import clojure.lang.Symbol;
*/
public class RiemannMonitoringServer implements MonitoringServer {
private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class);
- private static final String DEFAULT_CONFIG_DIR = "/tmp/riemannconfigs";
- private final String _host;
- private boolean _isStarted;
- private boolean _configsAdded;
- private String _configDir;
- /**
- * Create a monitoring server. Configs will be placed in "/tmp/riemannconfigs".
- */
- public RiemannMonitoringServer(String host) {
- this(DEFAULT_CONFIG_DIR, host);
- }
+ // true between a successful start() and the next stop(); volatile because isStarted() reads it
+ // without synchronization
+ private volatile boolean _isStarted;
+ private final RiemannConfigs _config;
/**
- * Create a monitoring server.
- * @param configDir Directory to use for storing configs
- * @param host Hostname where the server lives (i.e. the hostname of this machine)
+ * Create a monitoring server. The configs are persisted to their config directory immediately,
+ * as part of construction.
+ * @param config Riemann configs; start() passes their config directory to Riemann
*/
- public RiemannMonitoringServer(String configDir, String host) {
- _configDir = configDir;
+ public RiemannMonitoringServer(RiemannConfigs config) {
+ LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir());
+ _config = config;
+ config.persistConfigs();
_isStarted = false;
- _configsAdded = false;
- _host = host;
}
@Override
public synchronized void start() {
- // get the config file
- URL url = Thread.currentThread().getContextClassLoader().getResource("riemann.config");
- if (url == null) {
- LOG.error("Riemann config file does not exist!");
- return;
- }
- String path = url.getPath();
- LOG.info("Riemann config file is at: " + path);
-
- // register config files
- if (_configsAdded) {
- try {
- File srcFile = new File(path);
- File file = File.createTempFile("riemann", "config");
- file.deleteOnExit();
- FileUtils.copyFile(srcFile, file);
- PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file, true)));
- out.println("\n(include \"" + _configDir + "\")\n");
- out.close();
- path = file.getAbsolutePath();
- } catch (IOException e) {
- LOG.error("Could not add configs!");
- }
- }
+ LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir());
// start Riemann
RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin"));
- RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.config"));
- RT.var("riemann.bin", "-main").invoke(path);
+ RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG));
+ RT.var("riemann.bin", "-main").invoke(_config.getConfigDir());
_isStarted = true;
}
@@ -113,70 +61,13 @@ public class RiemannMonitoringServer implements MonitoringServer {
LOG.error("Tried to stop Riemann when not started!");
return;
}
+ LOG.info("Stopping Riemann server");
RT.var("riemann.config", "stop!").invoke();
_isStarted = false;
}
@Override
- public synchronized void addConfigs(HelixDataAccessor accessor) {
- // create the directory
- File dir = new File(_configDir);
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- // persist ZK-based configs
- if (accessor != null) {
- List<MonitoringConfig> configs =
- accessor.getChildValues(accessor.keyBuilder().monitoringConfigs());
- for (MonitoringConfig config : configs) {
- String configData = config.getConfig();
- String fileName = _configDir + "/" + config.getId();
- try {
- PrintWriter writer = new PrintWriter(fileName);
- writer.println(configData);
- writer.close();
-
- // make sure this is cleaned up eventually
- File file = new File(fileName);
- file.deleteOnExit();
- } catch (FileNotFoundException e) {
- LOG.error("Could not write " + config.getId(), e);
- }
- }
- }
-
- // restart if started
- if (_isStarted) {
- stop();
- start();
- }
- _configsAdded = true;
- }
-
- @Override
public boolean isStarted() {
+ // unsynchronized read; _isStarted is declared volatile
return _isStarted;
}
-
- @Override
- public String getHost() {
- return _host;
- }
-
- @Override
- public int getPort() {
- return 5555;
- }
-
- public static void main(String[] args) throws InterruptedException, UnknownHostException {
- RiemannMonitoringServer service =
- new RiemannMonitoringServer(InetAddress.getLocalHost().getHostName());
- BaseDataAccessor<ZNRecord> baseAccessor =
- new ZkBaseDataAccessor<ZNRecord>(ZKClientPool.getZkClient("eat1-app87.corp:2181"));
- HelixDataAccessor accessor = new ZKHelixDataAccessor("perf-test-cluster", baseAccessor);
- service.addConfigs(accessor);
- service.start();
- Thread.currentThread().join();
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
deleted file mode 100644
index 6680f00..0000000
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/BasicMonitoringTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.net.InetAddress;
-import java.util.Date;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.Leader;
-import org.junit.Assert;
-import org.testng.annotations.Test;
-
-public class BasicMonitoringTest extends ZkUnitTestBase {
- @Test
- public void testStartAndStop() throws Exception {
- final int NUM_PARTICIPANTS = 10;
- final int NUM_PARTITIONS = 4;
- final int NUM_REPLICAS = 2;
-
- String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
- String clusterName = className + "_" + methodName;
- System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
-
- // Set up cluster
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- NUM_PARTITIONS, // partitions per resource
- NUM_PARTICIPANTS, // number of nodes
- NUM_REPLICAS, // replicas
- "MasterSlave", // pick a built-in state model
- RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
- true); // do rebalance
-
- // start controller
- ClusterControllerManager controller =
- new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
- controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost()
- .getHostName()));
- controller.syncStart();
-
- // make sure the leader has registered and is showing the server port
- HelixDataAccessor accessor = controller.getHelixDataAccessor();
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Leader leader = accessor.getProperty(keyBuilder.controllerLeader());
- Assert.assertNotNull(leader);
- Assert.assertNotEquals(leader.getMonitoringPort(), -1);
- Assert.assertNotNull(leader.getMonitoringHost());
-
- // stop controller
- controller.syncStop();
-
- // start controller without monitoring
- ClusterControllerManager rawController =
- new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
- rawController.syncStart();
-
- // make sure the leader has registered, but has no monitoring port
- accessor = rawController.getHelixDataAccessor();
- keyBuilder = accessor.keyBuilder();
- leader = accessor.getProperty(keyBuilder.controllerLeader());
- Assert.assertNotNull(leader);
- Assert.assertEquals(leader.getMonitoringPort(), -1);
- Assert.assertNull(leader.getMonitoringHost());
-
- // stop controller
- rawController.syncStop();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/3505bead/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
new file mode 100644
index 0000000..9d28dc3
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/IntegrationTest.java
@@ -0,0 +1,247 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.alert.AlertAction;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.DefaultAlertMsgHandlerFactory;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.AlertConfig;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class IntegrationTest extends ZkUnitTestBase {
+  /**
+   * End-to-end test of the monitoring pipeline. A monitoring cluster is set up with a controller,
+   * a Riemann alert proxy, a Riemann monitoring server and a Riemann agent that joins through
+   * participant auto-join. A mock storage cluster gets an alert config and a controller with the
+   * alert message handler registered. The test then sends a latency event through
+   * {@link RiemannMonitoringClient} and waits for localhost_12918 in the storage cluster to be
+   * disabled.
+   */
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    // Set up monitoring cluster with no nodes; the Riemann agent joins later via auto-join
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "MonitoringService", // resource name prefix
+        1, // resources
+        8, // partitions per resource
+        0, // number of nodes
+        1, // replicas
+        "OnlineOffline", // pick a built-in state model
+        RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+        true); // do rebalance
+
+    // Enable auto-join
+    HelixConfigScope scope =
+        new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    configAccessor.set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "" + true);
+
+    // Everything the test starts is tracked here so the finally block can stop it even when an
+    // assertion fails part-way through; otherwise running controllers and servers would leak
+    // into later tests that share the same ZooKeeper.
+    ClusterControllerManager controller = null;
+    RiemannAlertProxy proxy = null;
+    RiemannMonitoringServer server = null;
+    RiemannAgent agent = null;
+    ClusterControllerManager storageController = null;
+    RiemannMonitoringClient rclient = null;
+    try {
+      // Start controller
+      controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+      controller.syncStart();
+
+      // Start helix proxy
+      int proxyPort = MonitoringTestHelper.availableTcpPort();
+      final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+      final HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+      final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
+      proxy.start();
+
+      // Start monitoring server
+      int riemannPort = MonitoringTestHelper.availableTcpPort();
+      MonitoringConfig riemannConfig =
+          new MonitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG);
+      riemannConfig.setConfig(MonitoringTestHelper.getRiemannConfigString(riemannPort));
+
+      MonitoringConfig latencyCheckConfig = new MonitoringConfig("check_latency_config.clj");
+      latencyCheckConfig.setConfig(MonitoringTestHelper.getLatencyCheckConfigString(proxyPort));
+
+      // Set monitoring config on zk
+      accessor.setProperty(keyBuilder.monitoringConfig(RiemannConfigs.DEFAULT_RIEMANN_CONFIG),
+          riemannConfig);
+      accessor.setProperty(keyBuilder.monitoringConfig("check_latency_config.clj"),
+          latencyCheckConfig);
+
+      RiemannConfigs.Builder riemannConfigBuilder =
+          new RiemannConfigs.Builder().addConfigs(Lists.newArrayList(riemannConfig,
+              latencyCheckConfig));
+      server = new RiemannMonitoringServer(riemannConfigBuilder.build());
+      server.start();
+
+      // Start Riemann agent
+      agent = new RiemannAgent(ZK_ADDR, clusterName, riemannPort);
+      agent.start();
+
+      // Wait for the agent to appear as the only live instance. Polling instead of reading once
+      // avoids a race in case the agent registers after start() returns.
+      boolean result = TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          List<String> liveInstances = accessor.getChildNames(keyBuilder.liveInstances());
+          return liveInstances != null && liveInstances.size() == 1;
+        }
+      }, 10 * 1000);
+      Assert.assertTrue(result, "expected exactly one live instance in " + clusterName);
+
+      // Check external-view
+      result =
+          ClusterStateVerifier.verifyByZkCallback(
+              new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+      Assert.assertTrue(result);
+
+      // Setup mock storage cluster to be monitored
+      String storageClusterName = clusterName + "_storage";
+      TestHelper.setupCluster(storageClusterName, ZK_ADDR, 12918, // participant port
+          "localhost", // participant name prefix
+          "TestDB", // resource name prefix
+          1, // resources
+          8, // partitions per resource
+          2, // number of nodes
+          1, // replicas
+          "MasterSlave", // pick a built-in state model
+          RebalanceMode.FULL_AUTO, // let Helix handle rebalancing
+          true); // do rebalance
+
+      // Add alert config: latency95 > 1000 in the storage cluster maps to the action
+      // enableInstance({cluster}, {node}, false)
+      AlertConfig alertConfig = new AlertConfig("default");
+      AlertName alertName =
+          new AlertName.Builder().cluster(ClusterId.from(storageClusterName)).metric("latency95")
+              .largerThan("1000").build();
+      AlertAction alertAction =
+          new AlertAction.Builder().cmd("enableInstance").args("{cluster}", "{node}", "false")
+              .build();
+      alertConfig.putConfig(alertName, alertAction);
+      final HelixDataAccessor storageAccessor =
+          new ZKHelixDataAccessor(storageClusterName, baseAccessor);
+      final PropertyKey.Builder storageKeyBuilder = storageAccessor.keyBuilder();
+      storageAccessor.setProperty(storageKeyBuilder.alertConfig("default"), alertConfig);
+
+      // Start another controller for mock storage cluster, registering the alert message
+      // handler factory before start-up
+      storageController =
+          new ClusterControllerManager(ZK_ADDR, storageClusterName, "controller");
+      MessageHandlerFactory fty = new DefaultAlertMsgHandlerFactory();
+      storageController.getMessagingService()
+          .registerMessageHandlerFactory(fty.getMessageType(), fty);
+      storageController.syncStart();
+
+      // Check localhost_12918 is enabled
+      InstanceConfig instanceConfig =
+          storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+      Assert.assertNotNull(instanceConfig);
+      Assert.assertTrue(instanceConfig.getInstanceEnabled());
+
+      // Connect monitoring client
+      rclient =
+          new RiemannMonitoringClient(ZK_ADDR, ClusterId.from(clusterName),
+              ResourceId.from("MonitoringService0"), 1);
+      rclient.connect();
+
+      // NOTE(review): this event reports latency95 = 2, below the 1000 threshold in the alert
+      // config above; presumably the alert is raised by the Riemann rule built from
+      // getLatencyCheckConfigString() instead -- confirm the values are intentional.
+      MonitoringEvent event =
+          new MonitoringEvent().participant(ParticipantId.from("localhost_12918"))
+              .name("LatencyReport").attribute("latency95", "" + 2)
+              .attribute("cluster", storageClusterName);
+      rclient.send(ResourceId.from("TestDB0"), event, false);
+
+      // Check localhost_12918 is disabled
+      result = TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          InstanceConfig current =
+              storageAccessor.getProperty(storageKeyBuilder.instanceConfig("localhost_12918"));
+          return current != null && !current.getInstanceEnabled();
+        }
+      }, 10 * 1000);
+      Assert.assertTrue(result, "localhost_12918 should be disabled");
+    } finally {
+      // Stop in the original cleanup order; the null checks skip anything that was never
+      // created because an earlier step failed.
+      if (rclient != null) {
+        rclient.disconnect();
+      }
+      if (storageController != null) {
+        storageController.syncStop();
+      }
+      if (controller != null) {
+        controller.syncStop();
+      }
+      if (agent != null) {
+        agent.shutdown();
+      }
+      if (server != null) {
+        server.stop();
+      }
+      if (proxy != null) {
+        proxy.shutdown();
+      }
+    }
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}