You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/03/19 03:06:40 UTC
[2/2] git commit: [HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server
[HELIX-319] refactor MonitoringClient to accommodate distributed monitoring server
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0b1780d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0b1780d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0b1780d
Branch: refs/heads/helix-monitoring
Commit: c0b1780dc472ac3234dbbb6488015211c3e66ed2
Parents: db4c10a
Author: zzhang <zz...@uci.edu>
Authored: Tue Mar 18 19:06:19 2014 -0700
Committer: zzhang <zz...@uci.edu>
Committed: Tue Mar 18 19:06:19 2014 -0700
----------------------------------------------------------------------
.../helix/monitoring/MonitoringClient.java | 12 +-
.../helix/monitoring/MonitoringEvent.java | 96 +++-
.../org/apache/helix/MonitoringTestHelper.java | 114 ----
.../monitoring/RiemannMonitoringClient.java | 555 -------------------
.../helix/monitoring/riemann/ClientUtil.java | 69 +++
.../monitoring/riemann/RawRiemannClient.java | 229 ++++++++
.../riemann/RiemannClientWrapper.java | 234 ++++++++
helix-monitor-server/pom.xml | 5 -
.../apache/helix/monitoring/RiemannAgent.java | 137 -----
.../monitoring/RiemannAgentStateModel.java | 54 --
.../RiemannAgentStateModelFactory.java | 29 -
.../helix/monitoring/RiemannAlertProxy.java | 111 ----
.../apache/helix/monitoring/RiemannConfigs.java | 116 ----
.../monitoring/RiemannMonitoringServer.java | 73 ---
.../monitoring/riemann/HelixAlertMessenger.java | 112 ++++
.../helix/monitoring/riemann/RiemannAgent.java | 169 ++++++
.../monitoring/riemann/RiemannConfigs.java | 116 ++++
.../riemann/RiemannMonitoringServer.java | 74 +++
.../helix/monitoring/IntegrationTest.java | 206 -------
.../helix/monitoring/MonitoringTestHelper.java | 136 +++++
.../monitoring/TestClientServerMonitoring.java | 188 -------
.../helix/monitoring/TestRiemannAgent.java | 127 -----
.../helix/monitoring/TestRiemannAlertProxy.java | 105 ----
.../monitoring/TestRiemannMonitoringServer.java | 78 ---
.../monitoring/riemann/IntegrationTest.java | 199 +++++++
.../riemann/TestClientServerMonitoring.java | 181 ++++++
.../riemann/TestHelixAlertMessenger.java | 110 ++++
.../monitoring/riemann/TestRiemannAgent.java | 117 ++++
.../riemann/TestRiemannClientWrapper.java | 134 +++++
.../riemann/TestRiemannMonitoringServer.java | 82 +++
30 files changed, 2047 insertions(+), 1921 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
index 743f8b4..a055354 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringClient.java
@@ -21,8 +21,6 @@ package org.apache.helix.monitoring;
import java.util.concurrent.TimeUnit;
-import org.apache.helix.api.id.ResourceId;
-
/**
* Interface for a client that can register with a monitoring server and send events for monitoring
*/
@@ -40,30 +38,26 @@ public interface MonitoringClient {
/**
* Send an event
- * @param resource
* @param e the event
- * @param batch true if this should be part of a batch operation
* @return true if the event was sent (or queued for batching), false otherwise
*/
- boolean send(ResourceId resource, MonitoringEvent e, boolean batch);
+ boolean send(MonitoringEvent e);
/**
* Send an event and flush any outstanding messages
- * @param resource
* @param e the event
* @return true if events were successfully sent, false otherwise
*/
- boolean sendAndFlush(ResourceId resource, MonitoringEvent e);
+ boolean sendAndFlush(MonitoringEvent e);
/**
* Schedule an operation to run
- * @param resource
* @param interval the frequency
* @param delay the amount of time to wait before the first execution
* @param unit the unit of time to use
* @param r the code to run
*/
- void every(ResourceId resource, long interval, long delay, TimeUnit unit, Runnable r);
+ void every(long interval, long delay, TimeUnit unit, Runnable r);
/**
* Check if there is a valid connection to a monitoring server
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
index 80006fb..2044a3a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java
@@ -19,10 +19,14 @@ package org.apache.helix.monitoring;
* under the License.
*/
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.Scope.ScopeType;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ControllerId;
import org.apache.helix.api.id.ParticipantId;
@@ -32,6 +36,7 @@ import org.apache.helix.api.id.SpectatorId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
* A generic monitoring event based on Helix constructs. This is based on Riemann's EventDSL.
@@ -49,8 +54,10 @@ public class MonitoringEvent {
private Float _floatMetric;
private Double _doubleMetric;
private Float _ttl;
- private List<String> _tags;
- private Map<String, String> _attributes;
+ private final List<String> _tags;
+ private final Map<String, String> _attributes;
+ private String _shardingStr;
+ private final Set<ScopeType> _shardingScopes;
/**
* Create an empty MonitoringEvent
@@ -70,6 +77,8 @@ public class MonitoringEvent {
_ttl = null;
_tags = Lists.newLinkedList();
_attributes = Maps.newHashMap();
+ _shardingStr = null;
+ _shardingScopes = Sets.newHashSet();
}
/**
@@ -253,13 +262,69 @@ public class MonitoringEvent {
return this;
}
- // below are a set of package-private getters for each of the fields
+ /**
+ * Set sharding key using string
+ * @param shardingStr
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent shardingString(String shardingStr) {
+ _shardingStr = shardingStr;
+ return this;
+ }
+
+ /**
+ * Set sharding key using scopes
+ * @param scopes
+ * @return MonitoringEvent
+ */
+ public MonitoringEvent shardingScopes(ScopeType... scopes) {
+ _shardingScopes.clear();
+ _shardingScopes.addAll(Arrays.asList(scopes));
+ return this;
+ }
+
+ /**
+ * Return sharding key which is used by MonitoringClient to choose MonitoringServer
+ * @return sharding key
+ */
+ public String shardingKey() {
+ // if shardingStr exists, use shardingStr
+ if (_shardingStr != null) {
+ return _shardingStr;
+ }
+
+ // if shardingStr doesn't exist, use shardingScopes
+ if (_shardingScopes.isEmpty()) {
+ _shardingScopes.addAll(Arrays.asList(ScopeType.CLUSTER, ScopeType.RESOURCE));
+ }
- String host() {
+ StringBuilder sb = new StringBuilder();
+ if (_shardingScopes.contains(ScopeType.CLUSTER)) {
+ sb.append(_clusterId == null ? "%" : _clusterId.stringify());
+ }
+ if (_shardingScopes.contains(ScopeType.RESOURCE)) {
+ sb.append("|");
+ sb.append(_resourceId == null ? "%" : _resourceId.stringify());
+ }
+ if (_shardingScopes.contains(ScopeType.PARTITION)) {
+ sb.append("|");
+ sb.append(_partitionId == null ? "%" : _partitionId.stringify());
+ }
+ if (_shardingScopes.contains(ScopeType.PARTICIPANT)) {
+ sb.append("|");
+ sb.append(_host == null ? "%" : _host);
+ }
+
+ return sb.toString();
+ }
+
+ // below are used for converting MonitoringEvent to Riemann EventDSL
+
+ public String host() {
return _host;
}
- String service() {
+ public String service() {
if (_clusterId == null) {
_clusterId = ClusterId.from("%");
}
@@ -269,42 +334,45 @@ public class MonitoringEvent {
if (_partitionId == null) {
_partitionId = PartitionId.from("%");
}
+ if (_name == null) {
+ _name = "%";
+ }
return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name);
}
- String eventState() {
+ public String eventState() {
return _eventState;
}
- String description() {
+ public String description() {
return _description;
}
- Long time() {
+ public Long time() {
return _time;
}
- Long longMetric() {
+ public Long longMetric() {
return _longMetric;
}
- Float floatMetric() {
+ public Float floatMetric() {
return _floatMetric;
}
- Double doubleMetric() {
+ public Double doubleMetric() {
return _doubleMetric;
}
- Float ttl() {
+ public Float ttl() {
return _ttl;
}
- List<String> tags() {
+ public List<String> tags() {
return _tags;
}
- Map<String, String> attributes() {
+ public Map<String, String> attributes() {
return _attributes;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java b/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
deleted file mode 100644
index c17dab0..0000000
--- a/helix-core/src/test/java/org/apache/helix/MonitoringTestHelper.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.I0Itec.zkclient.NetworkUtil;
-
-public class MonitoringTestHelper {
- static final int MAX_PORT = 65535;
-
- /**
- * generate a default riemann.config
- * @param riemannPort
- * @return
- */
- public static String getRiemannConfigString(int riemannPort) {
- StringBuilder sb = new StringBuilder();
- sb.append("(logging/init :file \"/dev/null\")\n\n")
- .append("(tcp-server :host \"0.0.0.0\" :port " + riemannPort + ")\n\n")
- .append("(instrumentation {:interval 1})\n\n")
- .append("; (udp-server :host \"0.0.0.0\")\n")
- .append("; (ws-server :host \"0.0.0.0\")\n")
- .append("; (repl-server :host \"0.0.0.0\")\n\n")
- .append("(periodically-expire 1)\n\n")
- .append(
- "(let [index (default :ttl 3 (update-index (index)))]\n (streams\n (expired prn)\n index))\n");
-
- return sb.toString();
- }
-
- /**
- * generate a test config for checking latency
- * @param proxyPort
- * @return
- */
- public static String getLatencyCheckConfigString(int proxyPort)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("(require 'riemann.config)\n")
- .append("(require 'clj-http.client)\n\n")
- .append("(defn parse-double\n \"Convert a string into a double\"\n ")
- .append("[instr]\n (Double/parseDouble instr))\n\n")
- .append("(defn check-95th-latency\n \"Check if the 95th percentile latency is within expectations\"\n ")
- .append("[e]\n (let [latency (parse-double (:latency95 e))]\n ")
- .append("(if (> latency 1.0) \n ; Report if the 95th percentile latency exceeds 1.0s\n ")
- .append("(do (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency)\n ")
- .append("(let [alert-name-str (str \"(\" (:cluster e) \".%.\" (:host e) \")(latency95)>(1000)\" )\n ")
- .append("proxy-url (str \"http://localhost:\" " + proxyPort + " )]\n ")
- .append("(clj-http.client/post proxy-url {:body alert-name-str }))))))\n\n")
- .append("(streams\n (where\n ; Only process services containing LatencyReport\n ")
- .append("(and (service #\".*LatencyReport.*\") (not (state \"expired\")))\n ")
- .append("check-95th-latency))\n");
-
- return sb.toString();
- }
-
- /**
- * find an available tcp port
- * @return
- */
- public static int availableTcpPort() {
- ServerSocket ss = null;
- try {
- ss = new ServerSocket(0);
- ss.setReuseAddress(true);
- return ss.getLocalPort();
- } catch (IOException e) {
- // ok
- } finally {
- if (ss != null) {
- try {
- ss.close();
- } catch (IOException e) {
- // should not be thrown
- }
- }
- }
- return -1;
- }
-
- /**
- * find the first available port starting from startPort inclusive
- * @param startPort
- * @return
- */
- public static int availableTcpPort(int startPort) {
- int port = startPort;
- for (; port <= MAX_PORT; port++) {
- if (NetworkUtil.isPortFree(port))
- break;
- }
-
- return port > MAX_PORT ? -1 : port;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
deleted file mode 100644
index 591bcb9..0000000
--- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java
+++ /dev/null
@@ -1,555 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.log4j.Logger;
-
-import com.aphyr.riemann.client.AbstractRiemannClient;
-import com.aphyr.riemann.client.EventDSL;
-import com.aphyr.riemann.client.RiemannBatchClient;
-import com.aphyr.riemann.client.RiemannClient;
-import com.aphyr.riemann.client.UnsupportedJVMException;
-import com.google.common.collect.Lists;
-
-/**
- * A Riemann-based monitoring client Thread safety note: connect and disconnect are serialized to
- * ensure that there is no attempt to connect or disconnect with an inconsistent state. The send
- * routines are not protected for performance reasons, and so a single send/flush may fail.
- */
-public class RiemannMonitoringClient implements MonitoringClient {
- private static final Logger LOG = Logger.getLogger(RiemannMonitoringClient.class);
- public static final String DEFAULT_MONITORING_SERVICE_NAME = "MonitoringService";
-
- /**
- * Contains information about a RiemannClient inside a MonitoringClient
- */
- class MonitoringClientInfo {
- /**
- * host/port of riemann server to which this client connects
- */
- String _host;
- int _port;
-
- /**
- * riemann client
- */
- RiemannClient _client;
-
- /**
- * batch rieman client, null if batch is not enabled
- */
- RiemannBatchClient _batchClient;
-
- /**
- * list of periodic tasks scheduled on this riemann client
- */
- final List<ScheduledItem> _scheduledItems;
-
- public MonitoringClientInfo() {
- _host = null;
- _port = -1;
- _client = null;
- _batchClient = null;
- _scheduledItems = Lists.newArrayList();
- }
-
- }
-
- private int _batchSize;
- private final ResourceId _monitoringServiceName;
- private final ClusterId _monitoringCluster;
- private int _monitoringServicePartitionNum;
-
- private final HelixManager _spectator;
- private final RoutingTableProvider _routingTableProvider;
- private final Map<ResourceId, MonitoringClientInfo> _clientMap;
-
- /**
- * Create a non-batched monitoring client
- * @param zkAddr
- * @param monitoringClusterId
- */
- public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId) {
- this(zkAddr, monitoringClusterId, ResourceId.from(DEFAULT_MONITORING_SERVICE_NAME), 1);
- }
-
- /**
- * Create a monitoring client that supports batching
- * @param clusterId
- * the cluster to monitor
- * @param accessor
- * an accessor for the cluster
- * @param batchSize
- * the number of events in a batch
- * @throws Exception
- */
- public RiemannMonitoringClient(String zkAddr, ClusterId monitoringClusterId,
- ResourceId monitoringServiceName, int batchSize) {
- _batchSize = batchSize > 0 ? batchSize : 1;
- _monitoringServiceName = monitoringServiceName;
- _monitoringCluster = monitoringClusterId;
- _monitoringServicePartitionNum = 0;
- _clientMap = new ConcurrentHashMap<ResourceId, RiemannMonitoringClient.MonitoringClientInfo>();
-
- _spectator =
- HelixManagerFactory.getZKHelixManager(monitoringClusterId.stringify(), null,
- InstanceType.SPECTATOR, zkAddr);
- _routingTableProvider = new RoutingTableProvider();
- }
-
- @Override
- public void connect() throws Exception {
- if (isConnected()) {
- LOG.error("Already connected to Riemann!");
- return;
- }
-
- // Connect spectator to the cluster being monitored
- _spectator.connect();
- _spectator.addExternalViewChangeListener(_routingTableProvider);
-
- // Get partition number of monitoring service
- HelixDataAccessor accessor = _spectator.getHelixDataAccessor();
- IdealState idealState =
- accessor.getProperty(accessor.keyBuilder().idealStates(_monitoringServiceName.stringify()));
- if (idealState == null) {
- throw new IllegalArgumentException("Resource for MonitoringService: "
- + _monitoringServiceName + " doesn't exist in cluster: " + _monitoringCluster);
- }
-
- _monitoringServicePartitionNum = idealState.getNumPartitions();
-
- if (_monitoringServicePartitionNum <= 0) {
- throw new IllegalArgumentException("Invalid partition number of MonitoringService: "
- + _monitoringServiceName + " in cluster: " + _monitoringCluster + ", was "
- + _monitoringServicePartitionNum);
- }
- }
-
- @Override
- public void disconnect() {
- // disconnect internal riemann clients
- for (ResourceId resource : _clientMap.keySet()) {
- disconnectInternal(resource);
- }
-
- _spectator.disconnect();
- _monitoringServicePartitionNum = 0;
- }
-
- @Override
- public boolean isConnected() {
- return _spectator.isConnected();
- }
-
- /**
- * Flush a riemann client for a resource
- * @param resource
- * @return
- */
- private boolean flush(ResourceId resource) {
- if (!isConnected()) {
- LOG.error("Tried to flush a Riemann client that is not connected!");
- return false;
- }
-
- AbstractRiemannClient c = getClient(resource, true);
- if (c == null) {
- LOG.warn("Fail to get riemann client for resource: " + resource);
- return false;
- }
-
- try {
- c.flush();
- return true;
- } catch (IOException e) {
- LOG.error("Problem flushing the Riemann event queue for resource: " + resource, e);
- }
- return false;
- }
-
- @Override
- public boolean flush() {
- boolean succeed = true;
- for (ResourceId resource : _clientMap.keySet()) {
- succeed = succeed && flush(resource);
- }
-
- return succeed;
- }
-
- @Override
- public boolean send(ResourceId resource, MonitoringEvent event, boolean batch) {
- if (!isConnected()) {
- LOG.error("Riemann connection must be active in order to send an event!");
- return false;
- }
-
- if (!isConnected(resource)) {
- connect(resource, null, event);
- } else {
- AbstractRiemannClient c = getClient(resource, batch);
- convertEvent(c, event).send();
- }
-
- return true;
- }
-
- @Override
- public boolean sendAndFlush(ResourceId resource, MonitoringEvent event) {
-
- boolean sendResult = send(resource, event, true);
- if (sendResult) {
- return flush(resource);
- }
- return false;
- }
-
- /**
- * Batch should be enabled for either all or none of riemann clients
- */
- @Override
- public boolean isBatchingEnabled() {
- return _batchSize > 1;
- }
-
- @Override
- public int getBatchSize() {
- return _batchSize;
- }
-
- /**
- * Check if a riemann client for given resource is connected
- * @param resource
- * @return true if riemann client is connected, false otherwise
- */
- private boolean isConnected(ResourceId resource) {
- if (!isConnected()) {
- return false;
- }
-
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- return clientInfo != null && clientInfo._client != null && clientInfo._client.isConnected();
- }
-
- @Override
- public synchronized void every(ResourceId resource, long interval, long delay, TimeUnit unit,
- Runnable r) {
- if (!isConnected()) {
- LOG.error("Riemann client must be connected in order to send events!");
- return;
- }
-
- ScheduledItem scheduledItem = new ScheduledItem();
- scheduledItem.interval = interval;
- scheduledItem.delay = delay;
- scheduledItem.unit = unit;
- scheduledItem.r = r;
-
- if (isConnected(resource)) {
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- clientInfo._scheduledItems.add(scheduledItem);
- getClient(resource).every(interval, delay, unit, r);
- } else {
- connect(resource, scheduledItem, null);
- }
- }
-
- /**
- * Connect a riemann client to riemann server given a resource
- * @param resource
- * @param scheduledItem
- * @param pendingEvent
- */
- private void connect(ResourceId resource, ScheduledItem scheduledItem,
- MonitoringEvent pendingEvent) {
- // Hash by resourceId
- int partitionKey = resource.hashCode() % _monitoringServicePartitionNum;
- List<InstanceConfig> instances =
- _routingTableProvider.getInstances(_monitoringServiceName.stringify(),
- _monitoringServiceName + "_" + partitionKey, "ONLINE");
-
- if (instances.size() == 0) {
- LOG.error("Riemann monitoring server for resource: " + resource + " at partitionKey: "
- + partitionKey + " is not available");
- return;
- }
-
- InstanceConfig instanceConfig = instances.get(0);
- String host = instanceConfig.getHostName();
- int port = Integer.parseInt(instanceConfig.getPort());
-
- // Do the connect asynchronously as a tcp establishment could take time
- doConnectAsync(resource, host, port, scheduledItem, pendingEvent);
- }
-
- /**
- * Get a raw, non-batched Riemann client. WARNING: do not cache this, as it may be disconnected
- * without notice
- * @return RiemannClient
- */
- private RiemannClient getClient(ResourceId resource) {
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- return clientInfo == null ? null : clientInfo._client;
- }
-
- /**
- * Get a batched Riemann client (if batching is supported) WARNING: do not cache this, as it may
- * be disconnected without notice
- * @return RiemannBatchClient
- */
- private RiemannBatchClient getBatchClient(ResourceId resource) {
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- return clientInfo == null ? null : clientInfo._batchClient;
- }
-
- /**
- * Get a Riemann client WARNING: do not cache this, as it may be disconnected without notice
- * @param batch
- * true if the client is preferred to support batching, false otherwise
- * @return AbstractRiemannClient
- */
- private AbstractRiemannClient getClient(ResourceId resource, boolean batch) {
- if (batch && isBatchingEnabled()) {
- return getBatchClient(resource);
- } else {
- return getClient(resource);
- }
- }
-
- /**
- * Based on the contents of the leader node, connect to a Riemann server
- * @param leader
- * node containing host/port
- */
- private void doConnectAsync(final ResourceId resource, final String host, final int port,
- final ScheduledItem scheduledItem, final MonitoringEvent pendingEvent) {
- new Thread() {
- @Override
- public void run() {
- synchronized (RiemannMonitoringClient.this) {
- if (resource != null && host != null && port != -1) {
- connectInternal(resource, host, port, scheduledItem, pendingEvent);
- } else {
- LOG.error("Fail to doConnectAsync becaue of invalid arguments, resource: " + resource
- + ", host: " + host + ", port: " + port);
- }
- }
- }
- }.start();
- }
-
- /**
- * Establishment of a connection to a Riemann server
- * @param resource
- * @param host
- * @param port
- * @param scheduledItem
- * @param pendingEvent
- */
- private synchronized void connectInternal(ResourceId resource, String host, int port,
- ScheduledItem scheduledItem, MonitoringEvent pendingEvent) {
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- if (clientInfo != null && clientInfo._host.equals(host) && clientInfo._port == port
- && clientInfo._client != null && clientInfo._client.isConnected()) {
- LOG.info("Riemann client for resource: " + resource + " already connected on " + host + ":"
- + port);
-
- // We might have to reschedule tasks
- if (scheduledItem != null) {
- clientInfo._scheduledItems.add(scheduledItem);
- clientInfo._client.every(scheduledItem.interval, scheduledItem.delay, scheduledItem.unit,
- scheduledItem.r);
- }
-
- // Sending over pending event
- if (pendingEvent != null) {
- convertEvent(clientInfo._client, pendingEvent).send();
- }
-
- return;
- }
-
- // Disconnect from previous riemann server
- disconnectInternal(resource);
-
- // Connect to new riemann server
- RiemannClient client = null;
- RiemannBatchClient batchClient = null;
- try {
- client = RiemannClient.tcp(host, port);
- client.connect();
- } catch (IOException e) {
- LOG.error("Error establishing a connection!", e);
-
- }
-
- if (client != null && getBatchSize() > 1) {
- try {
- batchClient = new RiemannBatchClient(_batchSize, client);
- } catch (UnknownHostException e) {
- _batchSize = 1;
- LOG.error("Could not resolve host", e);
- } catch (UnsupportedJVMException e) {
- _batchSize = 1;
- LOG.warn("Batching not enabled because of incompatible JVM", e);
- }
- }
-
- if (clientInfo == null) {
- clientInfo = new MonitoringClientInfo();
- }
-
- clientInfo._host = host;
- clientInfo._port = port;
- clientInfo._client = client;
- clientInfo._batchClient = batchClient;
- if (scheduledItem != null) {
- clientInfo._scheduledItems.add(scheduledItem);
- }
- _clientMap.put(resource, clientInfo);
-
- // We might have to reschedule tasks
- for (ScheduledItem item : clientInfo._scheduledItems) {
- client.every(item.interval, item.delay, item.unit, item.r);
- }
-
- // Send over pending event
- if (pendingEvent != null) {
- convertEvent(client, pendingEvent).send();
- }
- }
-
- /**
- * Teardown of a connection to a Riemann server
- */
- private synchronized void disconnectInternal(ResourceId resource) {
- MonitoringClientInfo clientInfo = _clientMap.get(resource);
- if (clientInfo == null) {
- return;
- }
-
- RiemannBatchClient batchClient = clientInfo._batchClient;
- RiemannClient client = clientInfo._client;
-
- clientInfo._batchClient = null;
- clientInfo._client = null;
-
- try {
- if (batchClient != null && batchClient.isConnected()) {
- batchClient.scheduler().shutdown();
- batchClient.disconnect();
- } else if (client != null && client.isConnected()) {
- client.scheduler().shutdown();
- client.disconnect();
- }
- } catch (IOException e) {
- LOG.error("Disconnection error", e);
- }
- }
-
- /**
- * Change a helix monitoring event into a Riemann event
- * @param c Riemann client
- * @param helixEvent helix event
- * @return Riemann EventDSL
- */
- private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
- EventDSL event = c.event();
- if (helixEvent.host() != null) {
- event.host(helixEvent.host());
- }
- if (helixEvent.service() != null) {
- event.service(helixEvent.service());
- }
- if (helixEvent.eventState() != null) {
- event.state(helixEvent.eventState());
- }
- if (helixEvent.description() != null) {
- event.description(helixEvent.description());
- }
- if (helixEvent.time() != null) {
- event.time(helixEvent.time());
- }
- if (helixEvent.ttl() != null) {
- event.ttl(helixEvent.ttl());
- }
- if (helixEvent.longMetric() != null) {
- event.metric(helixEvent.longMetric());
- } else if (helixEvent.floatMetric() != null) {
- event.metric(helixEvent.floatMetric());
- } else if (helixEvent.doubleMetric() != null) {
- event.metric(helixEvent.doubleMetric());
- }
- if (!helixEvent.tags().isEmpty()) {
- event.tags(helixEvent.tags());
- }
- if (!helixEvent.attributes().isEmpty()) {
- event.attributes.putAll(helixEvent.attributes());
- }
- return event;
- }
-
- /**
- * Wrapper for a task that should be run to a schedule
- */
- private static class ScheduledItem {
- long interval;
- long delay;
- TimeUnit unit;
- Runnable r;
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof ScheduledItem) {
- ScheduledItem that = (ScheduledItem) other;
- return interval == that.interval && delay == that.delay && unit == that.unit && r == that.r;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return toString().hashCode();
- }
-
- @Override
- public String toString() {
- return String.format("interval: %d|delay: %d|timeunit: %s|runnable: %s", interval, delay,
- unit.toString(), r.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
new file mode 100644
index 0000000..0c12ca3
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/ClientUtil.java
@@ -0,0 +1,69 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.MonitoringEvent;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.EventDSL;
+
+public class ClientUtil {
+  /**
+   * Translate a Helix monitoring event into a Riemann event. Only values that are present on
+   * the Helix event (non-null fields, non-empty tags and attributes) are copied over.
+   * @param c Riemann client used to create the empty event
+   * @param helixEvent Helix event to read from
+   * @return the populated Riemann EventDSL
+   */
+  public static EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) {
+    EventDSL riemannEvent = c.event();
+    copyScalars(helixEvent, riemannEvent);
+    copyMetric(helixEvent, riemannEvent);
+    copyCollections(helixEvent, riemannEvent);
+    return riemannEvent;
+  }
+
+  // Copies host, service, state, description, time and ttl, skipping any that are null
+  private static void copyScalars(MonitoringEvent source, EventDSL target) {
+    if (source.host() != null) {
+      target.host(source.host());
+    }
+    if (source.service() != null) {
+      target.service(source.service());
+    }
+    if (source.eventState() != null) {
+      target.state(source.eventState());
+    }
+    if (source.description() != null) {
+      target.description(source.description());
+    }
+    if (source.time() != null) {
+      target.time(source.time());
+    }
+    if (source.ttl() != null) {
+      target.ttl(source.ttl());
+    }
+  }
+
+  // Copies at most one metric; precedence is long, then float, then double
+  private static void copyMetric(MonitoringEvent source, EventDSL target) {
+    if (source.longMetric() != null) {
+      target.metric(source.longMetric());
+      return;
+    }
+    if (source.floatMetric() != null) {
+      target.metric(source.floatMetric());
+      return;
+    }
+    if (source.doubleMetric() != null) {
+      target.metric(source.doubleMetric());
+    }
+  }
+
+  // Copies tags and attributes when the Helix event carries any
+  private static void copyCollections(MonitoringEvent source, EventDSL target) {
+    if (!source.tags().isEmpty()) {
+      target.tags(source.tags());
+    }
+    if (!source.attributes().isEmpty()) {
+      target.attributes.putAll(source.attributes());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
new file mode 100644
index 0000000..4fe5902
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RawRiemannClient.java
@@ -0,0 +1,229 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.AbstractRiemannClient;
+import com.aphyr.riemann.client.RiemannBatchClient;
+import com.aphyr.riemann.client.RiemannClient;
+import com.aphyr.riemann.client.UnsupportedJVMException;
+
+/**
+ * Simple wrapper around RiemannClient that does auto reconnect.
+ * <p>
+ * A periodic heartbeat event is sent to the server once a connection is made; the outcome of
+ * each heartbeat moves the client between {@link State#CONNECTED} and
+ * {@link State#RECONNECTING}. Events are only sent while the state is CONNECTED.
+ */
+class RawRiemannClient {
+  private static final Logger LOG = Logger.getLogger(RawRiemannClient.class);
+
+  /** Seconds between two heartbeats, and base delay between two connection attempts */
+  private static final int HEARTBEAT_PERIOD = 10;
+
+  /** The heartbeat event's ttl is this many heartbeat periods */
+  private static final int TIMEOUT_LIMIT = 3;
+
+  enum State {
+    /** Initial state, and state after {@link RawRiemannClient#disconnect()} */
+    DISCONNECTED,
+    /** Connection established, or last heartbeat acknowledged */
+    CONNECTED,
+    /** Last heartbeat failed */
+    RECONNECTING
+  }
+
+  private RiemannClient _rclient;
+  private RiemannBatchClient _brclient;
+  private volatile State _state = State.DISCONNECTED;
+  private final String _host;
+  private final int _port;
+  private int _batchSize;
+  private Thread _reconnectThread;
+
+  /**
+   * Create a client without batching
+   * @param host Riemann server host
+   * @param port Riemann server port
+   */
+  public RawRiemannClient(String host, int port) {
+    this(host, port, 1);
+  }
+
+  /**
+   * Create a client
+   * @param host Riemann server host
+   * @param port Riemann server port
+   * @param batchSize number of events per batch; values below 1 are treated as 1 (no batching)
+   */
+  public RawRiemannClient(String host, int port, int batchSize) {
+    _host = host;
+    _port = port;
+    _batchSize = batchSize > 0 ? batchSize : 1;
+  }
+
+  /**
+   * Try once to connect and to start the heartbeat.
+   * @return true if the client is connected when this method returns
+   */
+  private synchronized boolean doConnect() {
+    if (_state == State.CONNECTED) {
+      return true;
+    }
+
+    RiemannClient candidate = null;
+    try {
+      candidate = RiemannClient.tcp(_host, _port);
+      candidate.connect();
+
+      RiemannBatchClient batch = null;
+      if (_batchSize > 1) {
+        try {
+          batch = new RiemannBatchClient(_batchSize, candidate);
+        } catch (UnknownHostException e) {
+          _batchSize = 1;
+          LOG.error("Could not resolve host", e);
+        } catch (UnsupportedJVMException e) {
+          _batchSize = 1;
+          LOG.warn("Batching not enabled because of incompatible JVM", e);
+        }
+      }
+
+      // Release whatever is left of an earlier connection before replacing it, so that its
+      // heartbeat scheduler and transport do not pile up
+      closeQuietly(_rclient);
+      _rclient = candidate;
+      _brclient = batch;
+
+      // Random initial delay spreads the heartbeats of different clients
+      Random random = new Random();
+      _rclient.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS,
+          new Runnable() {
+
+            @Override
+            public void run() {
+              try {
+                _rclient.event().service("heartbeat").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD)
+                    .sendWithAck();
+                _state = State.CONNECTED;
+              } catch (Exception e) {
+                LOG.error("Exception in send heatbeat to riemann server: " + _host + ":" + _port,
+                    e);
+                _state = State.RECONNECTING;
+              }
+            }
+          });
+      _state = State.CONNECTED;
+      return true;
+    } catch (IOException e) {
+      LOG.error("Fail to connect to riemann server: " + _host + ":" + _port, e);
+      // Do not leave the half-opened client behind; the next attempt creates a new one
+      closeQuietly(candidate);
+    }
+
+    return false;
+  }
+
+  /**
+   * Shut down a client's scheduler and close its connection; an IOException is logged, not
+   * propagated.
+   * @param client client to release, may be null
+   */
+  private void closeQuietly(RiemannClient client) {
+    if (client == null) {
+      return;
+    }
+
+    try {
+      client.scheduler().shutdown();
+      client.disconnect();
+    } catch (IOException e) {
+      LOG.error("Fail to disconnect rclient for " + _host + ":" + _port, e);
+    }
+  }
+
+  /**
+   * Make a connection to Riemann server; if fails, start a thread for retrying. At most one
+   * retry thread runs at a time.
+   */
+  public synchronized void connect() {
+    if (doConnect()) {
+      return;
+    }
+
+    if (_reconnectThread != null && _reconnectThread.isAlive()) {
+      // A previous call is still retrying; a second thread would never be interrupted
+      return;
+    }
+
+    _reconnectThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        LOG.info("Start reconnect thread");
+        Random random = new Random();
+        try {
+          while (!Thread.currentThread().isInterrupted()) {
+            if (doConnect()) {
+              break;
+            }
+
+            // nextInt(bound) is never negative, so the wait is between one and two periods
+            TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt(HEARTBEAT_PERIOD));
+          }
+        } catch (InterruptedException e) {
+          LOG.info("Reconnect thread is interrupted");
+        } finally {
+          LOG.info("Terminate reconnect thread");
+        }
+      }
+    });
+
+    _reconnectThread.start();
+  }
+
+  /**
+   * Disconnect from Riemann server and stop retrying
+   */
+  public synchronized void disconnect() {
+    if (_reconnectThread != null) {
+      _reconnectThread.interrupt();
+    }
+
+    closeQuietly(_rclient);
+    _state = State.DISCONNECTED;
+  }
+
+  /**
+   * @return true if the state is CONNECTED
+   */
+  public boolean isConnected() {
+    return _state == State.CONNECTED;
+  }
+
+  // The batch client when batching is enabled, the plain client otherwise
+  private AbstractRiemannClient client() {
+    if (isBatchEnabled()) {
+      return _brclient;
+    } else {
+      return _rclient;
+    }
+  }
+
+  /**
+   * Send an event if connected
+   * @param event event to send
+   * @return true if the event was handed to the Riemann client, false if not connected or the
+   *         send failed
+   */
+  public boolean send(MonitoringEvent event) {
+    if (!isConnected()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fail to send event: " + event + " to " + _host + ":" + _port
+            + ", because state is not connected, was: " + _state);
+      }
+      return false;
+    }
+
+    try {
+      ClientUtil.convertEvent(client(), event).send();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Fail to send event: " + event + " to " + _host + ":" + _port, e);
+    }
+    return false;
+  }
+
+  /**
+   * Flush the underlying Riemann client if connected
+   * @return true if the flush succeeded, false if not connected or the flush failed
+   */
+  public boolean flush() {
+    if (!isConnected()) {
+      return false;
+    }
+
+    try {
+      client().flush();
+      return true;
+    } catch (IOException e) {
+      LOG.error("Problem flushing the Riemann event queue", e);
+    }
+    return false;
+  }
+
+  /**
+   * Send an event and, if that succeeded, flush
+   * @param event event to send
+   * @return true only if both the send and the flush succeeded
+   */
+  public boolean sendAndFlush(MonitoringEvent event) {
+    boolean success = send(event);
+    if (success) {
+      return flush();
+    }
+
+    return false;
+  }
+
+  /**
+   * @return the batch size in effect; 1 means batching is disabled
+   */
+  public int getBatchSize() {
+    return _batchSize;
+  }
+
+  /**
+   * @return true if the batch size is greater than 1
+   */
+  public boolean isBatchEnabled() {
+    return _batchSize > 1;
+  }
+
+  /**
+   * @return the current connection state
+   */
+  public State getState() {
+    return _state;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
new file mode 100644
index 0000000..bd6517f
--- /dev/null
+++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/riemann/RiemannClientWrapper.java
@@ -0,0 +1,234 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.monitoring.MonitoringClient;
+import org.apache.helix.monitoring.MonitoringEvent;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrapper around a list of RawRiemannClients; if one client fails, try the next one in list.
+ * <p>
+ * Each event is mapped to a server by the hash of its sharding key; when that server's client
+ * is not connected the following clients in the (sorted) list are tried in order.
+ */
+public class RiemannClientWrapper implements MonitoringClient {
+  private static final Logger LOG = Logger.getLogger(RiemannClientWrapper.class);
+
+  /**
+   * A sorted list of "host:port" addresses for Riemann servers
+   */
+  private final List<String> _rsHosts;
+
+  // Both fields are written under the instance lock but read by unsynchronized senders
+  private volatile boolean _isConnected;
+  private volatile List<RawRiemannClient> _rclients;
+  private int _batchSize;
+
+  private ScheduledThreadPoolExecutor _pool;
+
+  /**
+   * Create a wrapper without batching
+   * @param rsHosts "host:port" addresses of the Riemann servers
+   */
+  public RiemannClientWrapper(List<String> rsHosts) {
+    this(rsHosts, 1);
+  }
+
+  /**
+   * Create a wrapper
+   * @param rsHosts "host:port" addresses of the Riemann servers; the list is copied, the
+   *          caller's list is left untouched
+   * @param batchSize number of events per batch; values below 1 are treated as 1 (no batching)
+   */
+  public RiemannClientWrapper(List<String> rsHosts, int batchSize) {
+    // Sort a private copy: sorting the argument would reorder the caller's list and fail on
+    // unmodifiable lists
+    _rsHosts = new ArrayList<String>(rsHosts);
+    Collections.sort(_rsHosts);
+    _batchSize = batchSize > 0 ? batchSize : 1;
+    _isConnected = false;
+  }
+
+  // Returns the pool for this client. Creates the pool on first use
+  private synchronized ScheduledThreadPoolExecutor pool() {
+    if (_pool == null) {
+      _pool = new ScheduledThreadPoolExecutor(1);
+      _pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+      _pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    }
+
+    return _pool;
+  }
+
+  /**
+   * Create and connect one raw client per configured server.
+   * @throws IllegalArgumentException if an address is not of the form "host:port" with a
+   *           numeric port; raw clients started before the bad address are disconnected
+   */
+  @Override
+  public synchronized void connect() throws Exception {
+    if (_isConnected) {
+      LOG.warn("Client already connected");
+      return;
+    }
+
+    List<RawRiemannClient> rclients = new ArrayList<RawRiemannClient>();
+    try {
+      for (String rsHost : _rsHosts) {
+        String[] splits = rsHost.split(":");
+
+        if (splits.length != 2) {
+          throw new IllegalArgumentException("Invalid Riemann server: " + rsHost);
+        }
+
+        String host = splits[0];
+        int port = Integer.parseInt(splits[1]);
+
+        RawRiemannClient rclient = new RawRiemannClient(host, port, _batchSize);
+        rclients.add(rclient);
+        rclient.connect();
+
+        /**
+         * If any Riemann client doesn't support batch, set it to 1
+         */
+        if (rclient.isConnected() && rclient.getBatchSize() == 1) {
+          _batchSize = 1;
+        }
+      }
+    } catch (RuntimeException e) {
+      // disconnect() would refuse to run because we never became connected, so release the
+      // raw clients (and their retry threads) here
+      for (RawRiemannClient rclient : rclients) {
+        rclient.disconnect();
+      }
+      throw e;
+    }
+
+    _rclients = rclients;
+    _isConnected = true;
+  }
+
+  /**
+   * Disconnect all raw clients
+   */
+  @Override
+  public synchronized void disconnect() {
+    List<RawRiemannClient> rclients = _rclients;
+    if (!_isConnected || rclients == null) {
+      LOG.warn("Client already disconnected");
+      return;
+    }
+
+    _isConnected = false;
+    _rclients = null;
+    for (RawRiemannClient rclient : rclients) {
+      rclient.disconnect();
+    }
+  }
+
+  // Snapshot of the raw clients, or null when this wrapper is not connected
+  private List<RawRiemannClient> connectedClients() {
+    List<RawRiemannClient> rclients = _rclients;
+    return _isConnected ? rclients : null;
+  }
+
+  /**
+   * Get raw client based on event's sharding key
+   * @param event event to route
+   * @param rclients snapshot of the raw clients
+   * @return the first connected client starting from the event's shard, or null if none is
+   *         connected
+   */
+  private RawRiemannClient client(MonitoringEvent event, List<RawRiemannClient> rclients) {
+    int size = rclients.size();
+    if (size == 0) {
+      return null;
+    }
+
+    // Java's % keeps the sign of the dividend; shift negative remainders into [0, size)
+    int baseIdx = event.shardingKey().hashCode() % size;
+    if (baseIdx < 0) {
+      baseIdx += size;
+    }
+
+    // find the first rclient in CONNECTED state
+    for (int i = 0; i < size; i++) {
+      RawRiemannClient rclient = rclients.get((baseIdx + i) % size);
+      if (rclient.isConnected()) {
+        return rclient;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Send an event to the server selected by its sharding key
+   * @return true if the event was sent
+   */
+  @Override
+  public boolean send(MonitoringEvent event) {
+    List<RawRiemannClient> rclients = connectedClients();
+    if (rclients == null) {
+      LOG.warn("Client is not connected. Fail to send event: " + event);
+      return false;
+    }
+
+    RawRiemannClient rclient = client(event, rclients);
+    if (rclient != null) {
+      return rclient.send(event);
+    }
+
+    LOG.error("Fail to send event: " + event + ", no rclient is available");
+    return false;
+  }
+
+  /**
+   * Run a task periodically on this client's own single-threaded pool
+   */
+  @Override
+  public void every(long interval, long delay, TimeUnit unit, Runnable r) {
+    pool().scheduleAtFixedRate(r, delay, interval, unit);
+  }
+
+  /**
+   * Send an event to the server selected by its sharding key and flush that server's client
+   * @return true only if both the send and the flush succeeded
+   */
+  @Override
+  public boolean sendAndFlush(MonitoringEvent event) {
+    List<RawRiemannClient> rclients = connectedClients();
+    if (rclients == null) {
+      LOG.warn("Client is not connected. Fail to send event: " + event);
+      return false;
+    }
+
+    RawRiemannClient rclient = client(event, rclients);
+    if (rclient != null) {
+      // The raw client logs its own send/flush failures
+      return rclient.sendAndFlush(event);
+    }
+
+    LOG.error("Fail to send event: " + event + ", no rclient is available");
+    return false;
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _isConnected;
+  }
+
+  /**
+   * @return whether the first connected raw client has batching enabled; false if not
+   *         connected
+   */
+  @Override
+  public boolean isBatchingEnabled() {
+    List<RawRiemannClient> rclients = connectedClients();
+    if (rclients == null) {
+      LOG.warn("Client is not connected");
+      return false;
+    }
+
+    /**
+     * Batch should be enabled for all or none raw clients
+     */
+    for (RawRiemannClient rclient : rclients) {
+      if (rclient.isConnected()) {
+        return rclient.isBatchEnabled();
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Return batch size if connected or 1 otherwise
+   */
+  @Override
+  public int getBatchSize() {
+    List<RawRiemannClient> rclients = connectedClients();
+    if (rclients == null) {
+      LOG.warn("Client is not connected");
+      return 1;
+    }
+
+    /**
+     * Batch size should be the same for all raw clients
+     */
+    for (RawRiemannClient rclient : rclients) {
+      if (rclient.isConnected()) {
+        return rclient.getBatchSize();
+      }
+    }
+
+    return 1;
+  }
+
+  /**
+   * Flush every raw client
+   * @return true only if every raw client reported a successful flush
+   */
+  @Override
+  public boolean flush() {
+    List<RawRiemannClient> rclients = connectedClients();
+    if (rclients == null) {
+      LOG.warn("Client is not connected");
+      return false;
+    }
+
+    boolean success = true;
+    for (RawRiemannClient rclient : rclients) {
+      success &= rclient.flush();
+    }
+    return success;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/pom.xml
----------------------------------------------------------------------
diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml
index 2cbddfd..041a390 100644
--- a/helix-monitor-server/pom.xml
+++ b/helix-monitor-server/pom.xml
@@ -54,11 +54,6 @@ under the License.
<version>0.2.4</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty.aggregate</groupId>
- <artifactId>jetty-all-server</artifactId>
- <version>8.1.14.v20131031</version>
- </dependency>
- <dependency>
<groupId>factual</groupId>
<artifactId>clj-helix</artifactId>
<version>0.1.0</version>
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
deleted file mode 100644
index 61e3d6c..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgent.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.log4j.Logger;
-
-import com.aphyr.riemann.client.RiemannClient;
-
-public class RiemannAgent {
- private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
-
- static final String STATEMODEL_NAME = "OnlineOffline";
-
- final Random _random;
- final String _zkAddr;
- final String _clusterName;
- final String _instanceName;
- final int _riemannPort;
- final HelixManager _participant;
- final RiemannClient _client;
-
- public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
- _random = new Random();
- _zkAddr = zkAddr;
- _clusterName = clusterName;
- _instanceName =
- String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
- _riemannPort = riemannPort;
- _participant =
- HelixManagerFactory.getZKHelixManager(clusterName, _instanceName, InstanceType.PARTICIPANT,
- zkAddr);
- _client = RiemannClient.tcp("localhost", riemannPort);
- }
-
- public void start() throws Exception {
- LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
- + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
-
- // Wait until riemann port is connected
- int timeout = 30 * 1000;
- long startT = System.currentTimeMillis();
- while ((System.currentTimeMillis() - startT) < timeout) {
- try {
- _client.connect();
- _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
- break;
- } catch (IOException e) {
- int sleep = _random.nextInt(3000) + 3000;
- LOG.info("Wait " + sleep + "ms for riemann server to come up");
- TimeUnit.MILLISECONDS.sleep(sleep);
- }
- }
-
- if (!_client.isConnected()) {
- String err =
- "Fail to connect to reimann server on localhost:" + _riemannPort + " in " + timeout
- + "ms";
- LOG.error(err);
- throw new RuntimeException(err);
- }
- LOG.info("RiemannAgent connected to local riemann server on port: " + _riemannPort);
-
- // Start helix participant
- _participant.getStateMachineEngine().registerStateModelFactory(STATEMODEL_NAME,
- new RiemannAgentStateModelFactory());
- _participant.connect();
-
- // Monitor riemann server
- _client.every(10, 0, TimeUnit.SECONDS, new Runnable() {
-
- @Override
- public void run() {
- try {
- // Send heartbeat metrics
- _client.event().service("heartbeat").state("running").ttl(20).sendWithAck();
- } catch (Exception e) {
- LOG.error("Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
- + _instanceName, e);
- asyncShutdown();
- }
- }
- });
-
- }
-
- /**
- * Do shutdown asynchronously
- */
- void asyncShutdown() {
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- shutdown();
- }
- }).start();
- }
-
- public void shutdown() {
- LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
- + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
-
- try {
- _client.scheduler().shutdown();
- _client.disconnect();
- } catch (IOException e) {
- LOG.error("Exception in disconnect riemann client", e);
- }
-
- _participant.disconnect();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
deleted file mode 100644
index 2e32cef..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModel.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.apache.log4j.Logger;
-
-@StateModelInfo(initialState = "OFFLINE", states = {
- "DROPPED", "OFFLINE", "ONLINE"
-})
-public class RiemannAgentStateModel extends StateModel {
- private static final Logger LOG = Logger.getLogger(RiemannAgentStateModel.class);
-
- void logTransition(Message message) {
- String toState = message.getToState();
- String fromState = message.getFromState();
- String resourceName = message.getResourceName();
- String partittionName = message.getPartitionName();
-
- LOG.info("Become " + toState + " from " + fromState + " for resource: " + resourceName
- + ", partition: " + partittionName);
- }
-
- @Transition(to = "ONLINE", from = "OFFLINE")
- public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
- logTransition(message);
- }
-
- @Transition(to = "OFFLINE", from = "ONLINE")
- public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
- logTransition(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
deleted file mode 100644
index a5865ad..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAgentStateModelFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.participant.statemachine.StateModelFactory;
-
-public class RiemannAgentStateModelFactory extends StateModelFactory<RiemannAgentStateModel> {
- @Override
- public RiemannAgentStateModel createNewStateModel(String partitionName) {
- return new RiemannAgentStateModel();
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
deleted file mode 100644
index 5fe8ebe..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannAlertProxy.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.UUID;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.alert.AlertName;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageType;
-import org.apache.log4j.Logger;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-
-/**
- * Accept alerts from local riemann server and forward it to helix-controller
- */
-public class RiemannAlertProxy {
- private static final Logger LOG = Logger.getLogger(RiemannAlertProxy.class);
-
- class RiemannAlertProxyHandler extends AbstractHandler {
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request,
- HttpServletResponse response) throws IOException, ServletException {
- // Read content-body
- InputStream inputStream = request.getInputStream();
- StringWriter writer = new StringWriter();
- IOUtils.copy(inputStream, writer, Charset.defaultCharset().toString());
- String alertNameStr = writer.toString();
- LOG.info("Handling alert: " + alertNameStr);
-
- // Send alert message to the controller of cluster being monitored
- try {
- AlertName alertName = AlertName.from(alertNameStr);
- String clusterName = alertName.getScope().getClusterId().stringify();
- HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
- message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
- message.setTgtSessionId("*");
- message.setTgtName("controller");
- accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
- } catch (Exception e) {
- LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
- }
-
- // return ok
- response.setStatus(HttpServletResponse.SC_OK);
- baseRequest.setHandled(true);
- }
- }
-
- final int _proxyPort;
- final Server _server;
- final BaseDataAccessor<ZNRecord> _baseAccessor;
- final AbstractHandler _handler;
-
- public RiemannAlertProxy(int proxyPort, BaseDataAccessor<ZNRecord> baseAccessor) {
- _proxyPort = proxyPort;
- _server = new Server(proxyPort);
- _baseAccessor = baseAccessor;
- _handler = new RiemannAlertProxyHandler();
- }
-
- public void start() throws Exception {
- LOG.info("Starting RiemannAlertProxy on port: " + _proxyPort);
- _server.setHandler(_handler);
- _server.start();
-
- }
-
- public void shutdown() {
- try {
- LOG.info("Stopping RiemannAlertProxy on port: " + _proxyPort);
- _server.stop();
- } catch (Exception e) {
- LOG.error("Fail to stop RiemannAlertProxy", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
deleted file mode 100644
index 193b763..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannConfigs.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.List;
-
-import org.apache.helix.model.MonitoringConfig;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-/**
- * Riemann configs
- */
-public class RiemannConfigs {
- private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
- private static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
- public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
-
- private final String _configDir;
- private final List<MonitoringConfig> _configs;
-
- RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
- _configDir = configDir;
- _configs = configs;
- }
-
- /**
- * persist configs to riemann config dir
- */
- public void persistConfigs() {
- // create the directory
- File dir = new File(_configDir);
- if (!dir.exists()) {
- dir.mkdir();
- }
-
- for (MonitoringConfig config : _configs) {
- String configData = config.getConfig();
- String fileName = _configDir + "/" + config.getId();
- try {
- PrintWriter writer = new PrintWriter(fileName);
- writer.println(configData);
- writer.close();
-
- // make sure this is cleaned up eventually
- File file = new File(fileName);
- file.deleteOnExit();
- } catch (FileNotFoundException e) {
- LOG.error("Could not write " + config.getId(), e);
- }
- }
- }
-
- public String getConfigDir() {
- return _configDir;
- }
-
- public static class Builder {
- private final List<MonitoringConfig> _configs;
- private final String _configDir;
-
- /**
- * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
- */
- public Builder() {
- this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
- }
-
- public Builder(String configDir) {
- _configDir = configDir;
- _configs = Lists.newArrayList();
- }
-
- public Builder addConfig(MonitoringConfig monitoringConfig) {
- _configs.add(monitoringConfig);
- return this;
- }
-
- public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
- _configs.addAll(monitoringConfigs);
- return this;
- }
-
- public RiemannConfigs build() {
- // Check default riemann config exists
- for (MonitoringConfig config : _configs) {
- if (config.getId().equals(DEFAULT_RIEMANN_CONFIG)) {
- return new RiemannConfigs(_configDir, _configs);
- }
- }
- throw new IllegalArgumentException("Missing default riemann config: "
- + DEFAULT_RIEMANN_CONFIG);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
deleted file mode 100644
index d4f11a5..0000000
--- a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/RiemannMonitoringServer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.monitoring;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-import clojure.lang.RT;
-import clojure.lang.Symbol;
-
-/**
- * A monitoring server implementation that uses Riemann
- */
-public class RiemannMonitoringServer implements MonitoringServer {
- private static final Logger LOG = Logger.getLogger(RiemannMonitoringServer.class);
-
- private volatile boolean _isStarted;
- private final RiemannConfigs _config;
-
- /**
- * Create a monitoring server
- * @param config
- */
- public RiemannMonitoringServer(RiemannConfigs config) {
- LOG.info("Construct RiemannMonitoringServer with configDir: " + config.getConfigDir());
- _config = config;
- config.persistConfigs();
- _isStarted = false;
- }
-
- @Override
- public synchronized void start() {
- LOG.info("Starting Riemann server with configDir: " + _config.getConfigDir());
-
- // start Riemann
- RT.var("clojure.core", "require").invoke(Symbol.intern("riemann.bin"));
- RT.var("clojure.core", "require").invoke(Symbol.intern(RiemannConfigs.DEFAULT_RIEMANN_CONFIG));
- RT.var("riemann.bin", "-main").invoke(_config.getConfigDir());
- _isStarted = true;
- }
-
- @Override
- public synchronized void stop() {
- if (!_isStarted) {
- LOG.error("Tried to stop Riemann when not started!");
- return;
- }
- LOG.info("Stopping Riemann server");
- RT.var("riemann.config", "stop!").invoke();
- _isStarted = false;
- }
-
- @Override
- public boolean isStarted() {
- return _isStarted;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
new file mode 100644
index 0000000..cbc209a
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/HelixAlertMessenger.java
@@ -0,0 +1,112 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.alert.AlertName;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+/**
+ * Accept alerts from local Riemann server and forward them to the helix-controller of the
+ * cluster named in the alert, throttling how often alert messages are sent.
+ * <p>
+ * Not thread-safe: {@link #onAlert(String)} updates the throttling queue without any
+ * synchronization.
+ */
+public class HelixAlertMessenger {
+  private static final Logger LOG = Logger.getLogger(HelixAlertMessenger.class);
+  private static final int DEFAULT_MAX_ALERT_COUNT = 1;
+  private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MINUTES;
+
+  private final ZkClient _zkclient;
+  private final BaseDataAccessor<ZNRecord> _baseAccessor;
+
+  /**
+   * A queue that keeps track of timestamps (in millisecond) of last N alerts sent
+   */
+  private final Queue<Long> _queue;
+  private final int _maxAlertCount;
+  private final TimeUnit _timeUnit;
+
+  /**
+   * Create a messenger with the default throttle setting (max alert count 1, time unit MINUTES)
+   * @param zkHosts zookeeper address handed to the underlying {@link ZkClient}
+   */
+  public HelixAlertMessenger(String zkHosts) {
+    this(zkHosts, DEFAULT_MAX_ALERT_COUNT, DEFAULT_TIME_UNIT);
+  }
+
+  /**
+   * Create a messenger with an explicit throttle setting
+   * @param zkHosts zookeeper address handed to the underlying {@link ZkClient}
+   * @param maxAlertCount number of sent alerts remembered for throttling; must be positive
+   * @param timeUnit unit used to compute the throttling window; must not be null
+   * @throws IllegalArgumentException if maxAlertCount is not positive or timeUnit is null
+   */
+  public HelixAlertMessenger(String zkHosts, int maxAlertCount, TimeUnit timeUnit) {
+    // Validate before opening the zookeeper connection so that a bad argument cannot leak it.
+    // A non-positive count would otherwise make onAlert() unbox a null queue head and throw NPE.
+    if (maxAlertCount <= 0) {
+      throw new IllegalArgumentException("maxAlertCount must be positive: " + maxAlertCount);
+    }
+    if (timeUnit == null) {
+      throw new IllegalArgumentException("timeUnit must not be null");
+    }
+
+    _zkclient =
+        new ZkClient(zkHosts, ZkClient.DEFAULT_SESSION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+    _queue = new LinkedList<Long>();
+    _maxAlertCount = maxAlertCount;
+    _timeUnit = timeUnit;
+  }
+
+  /**
+   * Send alert to helix controller; throttle if necessary
+   * not thread-safe
+   * <p>
+   * An alert is dropped when the queue already holds {@code maxAlertCount} timestamps and the
+   * oldest one is still inside the throttling window. Failures while sending are logged and not
+   * rethrown; a failed send is not recorded in the queue.
+   * @param alertNameStr alert name; parsed with {@link AlertName#from(String)} to find the cluster
+   */
+  public void onAlert(String alertNameStr) {
+    LOG.info("Handling alert: " + alertNameStr);
+
+    // throttling
+    long now = System.currentTimeMillis();
+    if (_queue.size() >= _maxAlertCount) {
+      // NOTE(review): the window is maxAlertCount time units long, not one time unit. With the
+      // default count of 1 the two are identical; confirm the intent for larger counts.
+      long windowMillis = _timeUnit.toMillis(_maxAlertCount);
+      if (_queue.peek() + windowMillis > now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Throttling alert: " + alertNameStr);
+        }
+        return;
+      } else {
+        // oldest recorded alert has left the window; free its slot for this one
+        _queue.remove();
+      }
+    }
+
+    // Send alert message to the controller of cluster being monitored
+    try {
+      AlertName alertName = AlertName.from(alertNameStr);
+      String clusterName = alertName.getScope().getClusterId().stringify();
+      HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      Message message = new Message(MessageType.ALERT, UUID.randomUUID().toString());
+      message.setAttribute(Message.Attributes.ALERT_NAME, alertNameStr);
+      message.setTgtSessionId("*");
+      message.setTgtName("controller");
+      accessor.setProperty(keyBuilder.controllerMessage(message.getId()), message);
+
+      // record the timestamp
+      _queue.add(now);
+
+    } catch (Exception e) {
+      LOG.error("Fail to send alert to cluster being monitored: " + alertNameStr, e);
+    }
+  }
+
+  /**
+   * Close the zookeeper connection opened by the constructor. The messenger must not be used
+   * after this call.
+   */
+  public void shutdown() {
+    _zkclient.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
new file mode 100644
index 0000000..3e7efca
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannAgent.java
@@ -0,0 +1,169 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+import com.aphyr.riemann.client.RiemannClient;
+
+/**
+ * Start a Helix participant that joins cluster and represents local Riemann server
+ * <p>
+ * The agent sends periodic "heartbeat" events to the Riemann server on localhost. When a
+ * heartbeat fails the participant is disconnected, and it is re-created on the next heartbeat
+ * that succeeds.
+ */
+public class RiemannAgent {
+  private static final Logger LOG = Logger.getLogger(RiemannAgent.class);
+  // period between heartbeats, in seconds
+  private static final int HEARTBEAT_PERIOD = 10;
+  // heartbeat events are sent with a ttl of this many heartbeat periods
+  private static final int TIMEOUT_LIMIT = 3;
+
+  private final String _zkAddr;
+  private final String _clusterName;
+  private final String _instanceName;
+  private final int _riemannPort;
+  private HelixManager _participant;
+  private final RiemannClient _client;
+  private Thread _reconnectThread;
+
+  /**
+   * Create an agent for the Riemann server listening on localhost:riemannPort. The participant
+   * instance name is "{canonicalHostName}_{riemannPort}". Nothing is connected until
+   * {@link #start()} is called.
+   * @param zkAddr zookeeper address used to create the Helix participant
+   * @param clusterName name of the cluster the participant joins
+   * @param riemannPort port of the local Riemann server
+   * @throws IOException propagated from the local host lookup or the Riemann client creation
+   */
+  public RiemannAgent(String zkAddr, String clusterName, int riemannPort) throws IOException {
+    _zkAddr = zkAddr;
+    _clusterName = clusterName;
+    _instanceName =
+        String.format("%s_%d", InetAddress.getLocalHost().getCanonicalHostName(), riemannPort);
+    _riemannPort = riemannPort;
+    _client = RiemannClient.tcp("localhost", riemannPort);
+  }
+
+  /**
+   * Connect to the local Riemann server, send a first heartbeat, connect the Helix participant
+   * and schedule the periodic heartbeat.
+   * @return true if everything was set up; false if an IOException occurred (it is logged)
+   * @throws Exception any failure other than an IOException
+   */
+  private synchronized boolean doStart() throws Exception {
+    try {
+      _client.connect();
+      _client.event().service("heartbeat").state("running").ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD)
+          .sendWithAck();
+      LOG.info("RiemannAgent connected to local riemann server on localhost:" + _riemannPort);
+      _participant =
+          HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+              InstanceType.PARTICIPANT, _zkAddr);
+      _participant.connect();
+
+      // Monitoring Riemann server
+      // the random initial delay spreads out the heartbeats of different agents
+      Random random = new Random();
+      _client.every(HEARTBEAT_PERIOD, random.nextInt(HEARTBEAT_PERIOD), TimeUnit.SECONDS,
+          new Runnable() {
+
+            // NOTE(review): this callback reads and writes _participant without holding the
+            // agent lock that shutdown() uses - confirm it cannot overlap with shutdown()
+            @Override
+            public void run() {
+              try {
+                // Send heartbeat metrics
+                _client.event().service("heartbeat").state("running")
+                    .ttl(TIMEOUT_LIMIT * HEARTBEAT_PERIOD).sendWithAck();
+                // heartbeat succeeded: bring the participant back if an earlier failure
+                // disconnected it
+                if (_participant == null) {
+                  _participant =
+                      HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
+                          InstanceType.PARTICIPANT, _zkAddr);
+                  _participant.connect();
+                }
+              } catch (Exception e) {
+                LOG.error(
+                    "Exception in send heatbeat to local riemann server, shutdown RiemannAgent: "
+                        + _instanceName, e);
+
+                // drop the participant; the next successful heartbeat re-creates it
+                if (_participant != null) {
+                  _participant.disconnect();
+                  _participant = null;
+                }
+              }
+            }
+          });
+
+      return true;
+    } catch (IOException e) {
+      // keep the cause in the log so that connection problems can be diagnosed
+      LOG.error("Fail to connect to Riemann server on localhost:" + _riemannPort, e);
+    }
+    return false;
+  }
+
+  /**
+   * Try connect local Riemann server; if fails, start a thread to retry async
+   * @throws Exception any non-IO failure of the first connection attempt
+   */
+  public synchronized void start() throws Exception {
+    LOG.info("Starting RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    boolean success = doStart();
+    if (!success) {
+      _reconnectThread = new Thread(new Runnable() {
+
+        @Override
+        public void run() {
+          LOG.info("Start reconnect thread");
+          Random random = new Random();
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              synchronized (RiemannAgent.this) {
+                // shutdown() interrupts this thread while holding the agent lock. Re-check the
+                // flag once the lock is ours so that we never connect after shutdown finished.
+                if (Thread.currentThread().isInterrupted()) {
+                  break;
+                }
+                if (doStart()) {
+                  break;
+                }
+              }
+
+              // nextInt(bound) is never negative, unlike nextInt() % bound, so the wait stays
+              // within [HEARTBEAT_PERIOD, 2 * HEARTBEAT_PERIOD) seconds
+              TimeUnit.SECONDS.sleep(HEARTBEAT_PERIOD + random.nextInt(HEARTBEAT_PERIOD));
+
+            }
+          } catch (InterruptedException e) {
+            LOG.info("Reconnect thread is interrupted");
+          } catch (Exception e) {
+            LOG.error("Fail to start RiemannAgent", e);
+          } finally {
+            LOG.info("Terminate reconnect thread");
+          }
+
+        }
+      });
+      _reconnectThread.start();
+    }
+
+  }
+
+  /**
+   * Stop the reconnect thread if any, shut down the heartbeat scheduler, disconnect from the
+   * Riemann server and disconnect the Helix participant. An IOException raised while
+   * disconnecting the Riemann client is logged and not rethrown.
+   */
+  public synchronized void shutdown() {
+    LOG.info("Shutting down RiemannAgent. zk: " + _zkAddr + ", cluster: " + _clusterName
+        + ", instance: " + _instanceName + ", riemannPort: " + _riemannPort);
+
+    if (_reconnectThread != null) {
+      _reconnectThread.interrupt();
+      _reconnectThread = null;
+    }
+
+    try {
+      _client.scheduler().shutdown();
+      _client.disconnect();
+    } catch (IOException e) {
+      LOG.error("Exception in disconnect riemann client", e);
+    }
+
+    if (_participant != null) {
+      _participant.disconnect();
+      _participant = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c0b1780d/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
----------------------------------------------------------------------
diff --git a/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
new file mode 100644
index 0000000..43b957e
--- /dev/null
+++ b/helix-monitor-server/src/main/java/org/apache/helix/monitoring/riemann/RiemannConfigs.java
@@ -0,0 +1,116 @@
+package org.apache.helix.monitoring.riemann;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.helix.model.MonitoringConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Riemann configs: a set of {@link MonitoringConfig}s together with the directory they are
+ * written to. Instances are created through {@link Builder}, which requires a config whose id is
+ * {@link #DEFAULT_RIEMANN_CONFIG}.
+ */
+public class RiemannConfigs {
+  private static final Logger LOG = Logger.getLogger(RiemannConfigs.class);
+  public static final String DEFAULT_CONFIG_DIR = "riemannconfigs";
+  public static final String DEFAULT_RIEMANN_CONFIG = "riemann.config";
+
+  private final String _configDir;
+  private final List<MonitoringConfig> _configs;
+
+  RiemannConfigs(String configDir, List<MonitoringConfig> configs) {
+    _configDir = configDir;
+    _configs = configs;
+  }
+
+  /**
+   * persist configs to riemann config dir
+   * <p>
+   * Each config is written to a file named after its id and registered for deletion on JVM exit.
+   * This is best-effort: a directory or file that cannot be written is logged and skipped, no
+   * exception is thrown.
+   */
+  public void persistConfigs() {
+    // create the directory, including any missing parent directories
+    File dir = new File(_configDir);
+    if (!dir.isDirectory() && !dir.mkdirs()) {
+      LOG.error("Could not create riemann config dir: " + _configDir);
+    }
+
+    for (MonitoringConfig config : _configs) {
+      String configData = config.getConfig();
+      File file = new File(_configDir + "/" + config.getId());
+      try {
+        PrintWriter writer = new PrintWriter(file);
+        try {
+          writer.println(configData);
+          // PrintWriter never throws on write errors; checkError() flushes and reports them
+          if (writer.checkError()) {
+            LOG.error("Could not write " + config.getId());
+          }
+        } finally {
+          writer.close();
+        }
+
+        // make sure this is cleaned up eventually
+        file.deleteOnExit();
+      } catch (FileNotFoundException e) {
+        LOG.error("Could not write " + config.getId(), e);
+      }
+    }
+  }
+
+  /**
+   * @return the directory the configs are persisted to
+   */
+  public String getConfigDir() {
+    return _configDir;
+  }
+
+  /**
+   * Collects monitoring configs and the target directory for a {@link RiemannConfigs}
+   */
+  public static class Builder {
+    private final List<MonitoringConfig> _configs;
+    private final String _configDir;
+
+    /**
+     * By default, configs will be placed in "{systemTmpDir}/riemannconfigs"
+     */
+    public Builder() {
+      this(System.getProperty("java.io.tmpdir") + "/" + DEFAULT_CONFIG_DIR);
+    }
+
+    /**
+     * @param configDir directory the configs will be persisted to
+     */
+    public Builder(String configDir) {
+      _configDir = configDir;
+      _configs = Lists.newArrayList();
+    }
+
+    /**
+     * @param monitoringConfig config to add
+     * @return this builder
+     */
+    public Builder addConfig(MonitoringConfig monitoringConfig) {
+      _configs.add(monitoringConfig);
+      return this;
+    }
+
+    /**
+     * @param monitoringConfigs configs to add
+     * @return this builder
+     */
+    public Builder addConfigs(List<MonitoringConfig> monitoringConfigs) {
+      _configs.addAll(monitoringConfigs);
+      return this;
+    }
+
+    /**
+     * @return riemann configs holding a snapshot of the configs added so far
+     * @throws IllegalArgumentException if no added config has the id "riemann.config"
+     */
+    public RiemannConfigs build() {
+      // Check default riemann config exists
+      for (MonitoringConfig config : _configs) {
+        // constant-first comparison also tolerates a config whose id is null
+        if (DEFAULT_RIEMANN_CONFIG.equals(config.getId())) {
+          // hand over a copy so that later add calls on this builder cannot change the result
+          return new RiemannConfigs(_configDir, Lists.newArrayList(_configs));
+        }
+      }
+      throw new IllegalArgumentException("Missing default riemann config: "
+          + DEFAULT_RIEMANN_CONFIG);
+    }
+  }
+}