You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/02/27 19:33:46 UTC
git commit: [HELIX-319] refactor MonitoringClient to accommodate
distributed monitoring server, fix a minor bug, add a new test case
Repository: helix
Updated Branches:
refs/heads/helix-monitoring 3505beadb -> 8dc50de1a
[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, fix a minor bug, add a new test case
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8dc50de1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8dc50de1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8dc50de1
Branch: refs/heads/helix-monitoring
Commit: 8dc50de1ad0c7be5a1536e9d31af14564f880772
Parents: 3505bea
Author: zzhang <zz...@apache.org>
Authored: Thu Feb 27 10:33:29 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Thu Feb 27 10:33:29 2014 -0800
----------------------------------------------------------------------
.../main/java/org/apache/helix/PropertyKey.java | 9 ++
.../zk/DefaultAlertMsgHandlerFactory.java | 30 ++++--
.../monitoring/RiemannMonitoringClient.java | 13 +++
.../apache/helix/monitoring/RiemannAgent.java | 2 +-
.../helix/monitoring/TestRiemannAgent.java | 2 -
.../helix/monitoring/TestRiemannAlertProxy.java | 105 +++++++++++++++++++
6 files changed, 148 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index e1c8f5f..ab41cf1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -345,6 +345,15 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with all {@link AlertConfig}
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey alertConfigs() {
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.ALERT, AlertConfig.class,
+ _clusterName, ConfigScopeProperty.ALERT.name());
+ }
+
+ /**
* Get a property key associated with a single {@link AlertConfig}
* @param alertConfigName name of the configuration
* @return {@link PropertyKey}
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
index 814f01c..4766a35 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.util.List;
+
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -37,7 +39,6 @@ import org.apache.log4j.Logger;
public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
private static final Logger LOG = Logger.getLogger(DefaultAlertMsgHandlerFactory.class);
- public static final String DEFAULT_ALERT_CONFIG = "default";
public static class DefaultAlertMsgHandler extends MessageHandler {
public DefaultAlertMsgHandler(Message message, NotificationContext context) {
@@ -46,18 +47,27 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
@Override
public HelixTaskResult handleMessage() throws InterruptedException {
+ LOG.info("Handling alert message: " + _message);
HelixManager manager = _notificationContext.getManager();
HelixTaskResult result = new HelixTaskResult();
- // get alert-name from message
+ // Get alert-name from message
String alertNameStr = _message.getAttribute(Attributes.ALERT_NAME);
AlertName alertName = AlertName.from(alertNameStr);
- // get action from alert config
+ // Find action from alert config
HelixDataAccessor accessor = manager.getHelixDataAccessor();
- AlertConfig defaultAlertConfig =
- accessor.getProperty(accessor.keyBuilder().alertConfig(DEFAULT_ALERT_CONFIG));
- AlertAction action = defaultAlertConfig.findAlertAction(alertName);
+ List<AlertConfig> alertConfigs =
+ accessor.getChildValues(accessor.keyBuilder().alertConfigs());
+
+ AlertAction action = null;
+ for (AlertConfig alertConfig : alertConfigs) {
+ action = alertConfig.findAlertAction(alertName);
+ if (action != null) {
+ LOG.info("Find alertAction: " + action + " for alertName " + alertName);
+ break;
+ }
+ }
if (action != null) {
// perform action
@@ -93,7 +103,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
}
public DefaultAlertMsgHandlerFactory() {
- LOG.info("construct default alert message handler factory");
+ LOG.info("Construct default alert message handler factory");
}
@Override
@@ -102,7 +112,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
if (!type.equals(getMessageType())) {
throw new HelixException("Unexpected msg type for message " + message.getMessageId()
- + " type:" + message.getMsgType());
+ + " type:" + message.getMsgType());
}
return new DefaultAlertMsgHandler(message, context);
@@ -116,7 +126,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
@Override
public void reset() {
- LOG.info("reset default alert message handler factory");
+ LOG.info("Reset default alert message handler factory");
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index e6c8b2d..591bcb9 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -90,6 +90,7 @@ public class RiemannMonitoringClient implements MonitoringClient {
private int _batchSize;
private final ResourceId _monitoringServiceName;
+ private final ClusterId _monitoringCluster;
private int _monitoringServicePartitionNum;
private final HelixManager _spectator;
@@ -119,6 +120,7 @@ public class RiemannMonitoringClient implements MonitoringClient {
ResourceId monitoringServiceName, int batchSize) {
_batchSize = batchSize > 0 ? batchSize : 1;
_monitoringServiceName = monitoringServiceName;
+ _monitoringCluster = monitoringClusterId;
_monitoringServicePartitionNum = 0;
_clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
@@ -143,7 +145,18 @@ public class RiemannMonitoringClient implements MonitoringClient {
HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
IdealState idealState =
accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
+ if (idealState == null) {
+ throw new IllegalArgumentException("Resource for MonitoringService: "
+ + _monitoringServiceName + " doesn't exist in cluster: " + _monitoringCluster);
+ }
+
_monitoringServicePartitionNum = idealState.getNumPartitions();
+
+ if (_monitoringServicePartitionNum <= 0) {
+ throw new IllegalArgumentException("Invalid partition number of MonitoringService: "
+ + _monitoringServiceName + " in cluster: " + _monitoringCluster + ", was "
+ + _monitoringServicePartitionNum);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
index 474d838..06957f4 100644
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
@@ -44,7 +44,7 @@ public class RiemannAgent {
final HelixManager _participant;
final RiemannClient _client;
- RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+ public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
_random = new Random();
_zkAddr = zkAddr;
_clusterName = clusterName;
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
index 100c28c..39ece64 100644
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
@@ -43,8 +43,6 @@ import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-
public class TestRiemannAgent extends ZkUnitTestBase {
@Test
public void testStartAndStop() throws Exception {
http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
new file mode 100644
index 0000000..700682a
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
@@ -0,0 +1,105 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageType;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestRiemannAlertProxy extends ZkUnitTestBase {
+ void sendAlert(int proxyPort, String alertNameStr) throws Exception {
+ HttpClient client = new HttpClient();
+ client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+ client.start();
+
+ ContentExchange exchange = new ContentExchange(true);
+ exchange.setMethod("POST");
+ exchange.setURL("http://localhost:" + proxyPort);
+ exchange.setRequestContent(new ByteArrayBuffer(alertNameStr));
+
+ client.send(exchange);
+
+ // Waits until the exchange is terminated
+ int exchangeState = exchange.waitForDone();
+ Assert.assertTrue(exchangeState == HttpExchange.STATUS_COMPLETED);
+
+ client.stop();
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName);
+
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ int proxyPort = MonitoringTestHelper.availableTcpPort();
+
+ RiemannAlertProxy proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
+
+ proxy.start();
+
+ // Send a valid alert
+ String alertNameStr = String.format("(%s.%%.node1)(latency95)>(1000)", clusterName);
+ sendAlert(proxyPort, alertNameStr);
+
+ // Send an invalid alert
+ String inValidAlertNameStr = "IGNORABLE: invalid alert";
+ sendAlert(proxyPort, inValidAlertNameStr);
+
+ // Check only 1 alert controller message is sent
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ List<Message> messages = accessor.getChildValues(keyBuilder.controllerMessages());
+
+ Assert.assertEquals(messages.size(), 1);
+ Message message = messages.get(0);
+ Assert.assertEquals(message.getMsgType(), MessageType.ALERT.toString());
+ Assert.assertEquals(message.getAttribute(Attributes.ALERT_NAME), alertNameStr);
+
+ proxy.shutdown();
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}