You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/02/27 19:33:46 UTC

git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, fix a minor bug, add a new test case

Repository: helix
Updated Branches:
  refs/heads/helix-monitoring 3505beadb -> 8dc50de1a


[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server, fix a minor bug, add a new test case


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8dc50de1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8dc50de1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8dc50de1

Branch: refs/heads/helix-monitoring
Commit: 8dc50de1ad0c7be5a1536e9d31af14564f880772
Parents: 3505bea
Author: zzhang <zz...@apache.org>
Authored: Thu Feb 27 10:33:29 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Thu Feb 27 10:33:29 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/PropertyKey.java |   9 ++
 .../zk/DefaultAlertMsgHandlerFactory.java       |  30 ++++--
 .../monitoring/RiemannMonitoringClient.java     |  13 +++
 .../apache/helix/monitoring/RiemannAgent.java   |   2 +-
 .../helix/monitoring/TestRiemannAgent.java      |   2 -
 .../helix/monitoring/TestRiemannAlertProxy.java | 105 +++++++++++++++++++
 6 files changed, 148 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index e1c8f5f..ab41cf1 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -345,6 +345,15 @@ public class PropertyKey {
     }
 
     /**
+    * Get a property key associated with all {@link AlertConfig}
+    * @return {@link PropertyKey}
+    */
+    public PropertyKey alertConfigs() {
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.ALERT, AlertConfig.class,
+           _clusterName, ConfigScopeProperty.ALERT.name());
+    }
+
+    /**
     * Get a property key associated with a single {@link AlertConfig}
     * @param alertConfigName name of the configuration
     * @return {@link PropertyKey}

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
index 814f01c..4766a35 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultAlertMsgHandlerFactory.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.util.List;
+
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -37,7 +39,6 @@ import org.apache.log4j.Logger;
 
 public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
   private static final Logger LOG = Logger.getLogger(DefaultAlertMsgHandlerFactory.class);
-  public static final String DEFAULT_ALERT_CONFIG = "default";
 
   public static class DefaultAlertMsgHandler extends MessageHandler {
     public DefaultAlertMsgHandler(Message message, NotificationContext context) {
@@ -46,18 +47,27 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
 
     @Override
     public HelixTaskResult handleMessage() throws InterruptedException {
+      LOG.info("Handling alert message: " + _message);
       HelixManager manager = _notificationContext.getManager();
       HelixTaskResult result = new HelixTaskResult();
 
-      // get alert-name from message
+      // Get alert-name from message
       String alertNameStr = _message.getAttribute(Attributes.ALERT_NAME);
       AlertName alertName = AlertName.from(alertNameStr);
 
-      // get action from alert config
+      // Find action from alert config
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      AlertConfig defaultAlertConfig =
-          accessor.getProperty(accessor.keyBuilder().alertConfig(DEFAULT_ALERT_CONFIG));
-      AlertAction action = defaultAlertConfig.findAlertAction(alertName);
+      List<AlertConfig> alertConfigs =
+          accessor.getChildValues(accessor.keyBuilder().alertConfigs());
+
+      AlertAction action = null;
+      for (AlertConfig alertConfig : alertConfigs) {
+        action = alertConfig.findAlertAction(alertName);
+        if (action != null) {
+          LOG.info("Find alertAction: " + action + " for alertName " + alertName);
+          break;
+        }
+      }
 
       if (action != null) {
         // perform action
@@ -93,7 +103,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
   }
 
   public DefaultAlertMsgHandlerFactory() {
-    LOG.info("construct default alert message handler factory");
+    LOG.info("Construct default alert message handler factory");
   }
 
   @Override
@@ -102,7 +112,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
 
     if (!type.equals(getMessageType())) {
       throw new HelixException("Unexpected msg type for message " + message.getMessageId()
-           + " type:" + message.getMsgType());
+          + " type:" + message.getMsgType());
     }
 
     return new DefaultAlertMsgHandler(message, context);
@@ -116,7 +126,7 @@ public class DefaultAlertMsgHandlerFactory implements MessageHandlerFactory {
 
   @Override
   public void reset() {
-    LOG.info("reset default alert message handler factory");
+    LOG.info("Reset default alert message handler factory");
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
index e6c8b2d..591bcb9 100644
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
@@ -90,6 +90,7 @@ public class RiemannMonitoringClient implements MonitoringClient {
 
   private int _batchSize;
   private final ResourceId _monitoringServiceName;
+  private final ClusterId _monitoringCluster;
   private int _monitoringServicePartitionNum;
 
   private final HelixManager _spectator;
@@ -119,6 +120,7 @@ public class RiemannMonitoringClient implements MonitoringClient {
       ResourceId monitoringServiceName, int batchSize) {
     _batchSize = batchSize > 0 ? batchSize : 1;
     _monitoringServiceName = monitoringServiceName;
+    _monitoringCluster = monitoringClusterId;
     _monitoringServicePartitionNum = 0;
     _clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
 
@@ -143,7 +145,18 @@ public class RiemannMonitoringClient implements MonitoringClient {
     HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
     IdealState idealState =
         accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
+    if (idealState == null) {
+      throw new IllegalArgumentException("Resource for MonitoringService: "
+          + _monitoringServiceName + " doesn't exist in cluster: " + _monitoringCluster);
+    }
+
     _monitoringServicePartitionNum = idealState.getNumPartitions();
+
+    if (_monitoringServicePartitionNum <= 0) {
+      throw new IllegalArgumentException("Invalid partition number of MonitoringService: "
+          + _monitoringServiceName + " in cluster: " + _monitoringCluster + ", was "
+          + _monitoringServicePartitionNum);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
index 474d838..06957f4 100644
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
@@ -44,7 +44,7 @@ public class RiemannAgent {
   final HelixManager _participant;
   final RiemannClient _client;
 
-  RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+  public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
     _random = new Random();
     _zkAddr = zkAddr;
     _clusterName = clusterName;

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
index 100c28c..39ece64 100644
--- a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAgent.java
@@ -43,8 +43,6 @@ import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-
 public class TestRiemannAgent extends ZkUnitTestBase {
   @Test
   public void testStartAndStop() throws Exception {

http://git-wip-us.apache.org/repos/asf/helix/blob/8dc50de1/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
new file mode 100644
index 0000000..700682a
--- /dev/null
+++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestRiemannAlertProxy.java
@@ -0,0 +1,105 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.MonitoringTestHelper;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Message.MessageType;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.io.ByteArrayBuffer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestRiemannAlertProxy extends ZkUnitTestBase {
+  void sendAlert(int proxyPort, String alertNameStr) throws Exception {
+    HttpClient client = new HttpClient();
+    client.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+    client.start();
+
+    ContentExchange exchange = new ContentExchange(true);
+    exchange.setMethod("POST");
+    exchange.setURL("http://localhost:" + proxyPort);
+    exchange.setRequestContent(new ByteArrayBuffer(alertNameStr));
+
+    client.send(exchange);
+
+    // Waits until the exchange is terminated
+    int exchangeState = exchange.waitForDone();
+    Assert.assertTrue(exchangeState == HttpExchange.STATUS_COMPLETED);
+
+    client.stop();
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName);
+
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    int proxyPort = MonitoringTestHelper.availableTcpPort();
+
+    RiemannAlertProxy proxy = new RiemannAlertProxy(proxyPort, baseAccessor);
+
+    proxy.start();
+
+    // Send a valid alert
+    String alertNameStr = String.format("(%s.%%.node1)(latency95)>(1000)", clusterName);
+    sendAlert(proxyPort, alertNameStr);
+
+    // Send an invalid alert
+    String inValidAlertNameStr = "IGNORABLE: invalid alert";
+    sendAlert(proxyPort, inValidAlertNameStr);
+
+    // Check only 1 alert controller message is sent
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<Message> messages = accessor.getChildValues(keyBuilder.controllerMessages());
+
+    Assert.assertEquals(messages.size(), 1);
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getMsgType(), MessageType.ALERT.toString());
+    Assert.assertEquals(message.getAttribute(Attributes.ALERT_NAME), alertNameStr);
+
+    proxy.shutdown();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}