You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/10/01 14:13:19 UTC
falcon git commit: FALCON-1476 Maintaining threshold on monitoring
entities for SLA service. Contributed by Ajay Yadava.
Repository: falcon
Updated Branches:
refs/heads/master 4783a19c4 -> 9872ce8e7
FALCON-1476 Maintaining threshold on monitoring entities for SLA service. Contributed by Ajay Yadava.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9872ce8e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9872ce8e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9872ce8e
Branch: refs/heads/master
Commit: 9872ce8e793f0d16ade52cb06e95bef50339e25a
Parents: 4783a19
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Oct 1 16:22:30 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Oct 1 16:22:30 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
common/src/main/resources/startup.properties | 4 ++
.../service/FeedSLAMonitoringService.java | 62 +++++++++++++++-----
.../falcon/service/FeedSLAMonitoringTest.java | 4 +-
src/conf/startup.properties | 4 ++
5 files changed, 61 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2104f4..7cc3efb 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
IMPROVEMENTS
+ FALCON-1476 Maintaining threshold on monitoring entities for SLA service(Ajay Yadava)
+
FALCON-592 Refactor FalconCLI to make it more manageable(Balu Vellanki via Ajay Yadava)
FALCON-1472 Improvements in SLA service(Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index e853c5a..1da7d23 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -104,6 +104,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
*.feed.sla.serialization.frequency.millis=3600000
+# Maximum number of pending instances recorded per feed per cluster. Once this limit is reached, the oldest instances are
+# removed in a FIFO fashion.
+*.feed.sla.queue.size=288
+
# Do not change unless really sure
# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
*.feed.sla.statusCheck.frequency.seconds=600
http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 193aa64..b4e0427 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -52,7 +52,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -76,6 +78,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
return SERVICE;
}
+ private int queueSize;
+
/**
* Permissions for storePath.
*/
@@ -91,7 +95,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
* Map<Pair<feedName, clusterName>, Set<instanceTime> to store
* each missing instance of a feed.
*/
- private Map<Pair<String, String>, Set<Date>> pendingInstances;
+ private Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances;
/**
@@ -206,6 +210,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
lookAheadWindowMillis = Integer.valueOf(freq);
+ String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
+ queueSize = Integer.valueOf(size);
+
try {
if (fileSystem.exists(filePath)) {
deserialize(filePath);
@@ -270,23 +277,31 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
for (String feedName : monitoredFeeds) {
-
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
for (Cluster feedCluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(feedCluster.getName())) {
Date nextInstanceTime = from;
Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
- Set<Date> instances = pendingInstances.get(key);
+ BlockingQueue<Date> instances = pendingInstances.get(key);
if (instances == null) {
- instances = new HashSet<>();
+ instances = new LinkedBlockingQueue<>(queueSize);
}
-
+ Set<Date> exists = new HashSet<>(instances);
org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+ nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
while (nextInstanceTime.before(to)) {
- nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
- instances.add(nextInstanceTime);
+ if (instances.size() >= queueSize) { // if no space, first make some space
+ LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key);
+ exists.remove(instances.peek());
+ instances.remove();
+ }
+ LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+ if (exists.add(nextInstanceTime)) {
+ instances.add(nextInstanceTime);
+ }
nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+ nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
}
pendingInstances.put(key, instances);
}
@@ -299,7 +314,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
* Checks the availability of all the pendingInstances and removes the ones which have become available.
*/
private void checkPendingInstanceAvailability() throws FalconException {
- for (Map.Entry<Pair<String, String>, Set<Date>> entry: pendingInstances.entrySet()) {
+ for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) {
for (Date date : entry.getValue()) {
boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date);
if (status) {
@@ -358,7 +373,26 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
private void deserialize(Path path) throws FalconException {
try {
Map<String, Object> state = deserializeInternal(path);
- pendingInstances = (ConcurrentHashMap<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
+ pendingInstances = new ConcurrentHashMap<>();
+ Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy =
+ (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances");
+ // queue size can change during restarts, hence copy
+ for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) {
+ BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize);
+ BlockingQueue<Date> oldValue = entry.getValue();
+ LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize);
+ while (!oldValue.isEmpty()) {
+ Date instance = oldValue.remove();
+ if (value.size() == queueSize) { // if full
+ LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(),
+ entry.getKey());
+ value.remove();
+ }
+ LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance);
+ value.add(instance);
+ }
+ pendingInstances.put(entry.getKey(), value);
+ }
lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities.
@@ -397,13 +431,13 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
* Start time and end time are both inclusive.
* @param start start time, inclusive
* @param end end time, inclusive
- * @return
+ * @return Set of pending feed instances belonging to the given range which have missed SLA
* @throws FalconException
*/
public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
- for (Map.Entry<Pair<String, String>, Set<Date>> feedInstances : pendingInstances.entrySet()) {
+ for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) {
Pair<String, String> feedClusterPair = feedInstances.getKey();
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
@@ -428,7 +462,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
* @param clusterName cluster name
* @param start start time, inclusive
* @param end end time, inclusive
- * @return
+ * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
* @throws FalconException
*/
public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
@@ -436,7 +470,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Set<SchedulableEntityInstance> result = new HashSet<>();
Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
- Set<Date> missingInstances = pendingInstances.get(feedClusterPair);
+ BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair);
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
Sla sla = FeedHelper.getSLA(cluster, feed);
@@ -452,7 +486,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
return result;
}
- Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, Set<Date> missingInstances)
+ Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date> missingInstances)
throws FalconException {
String tagCritical = "Missed SLA High";
String tagWarn = "Missed SLA Low";
http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index bc03cb5..ca55d01 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -32,6 +32,8 @@ import org.testng.annotations.Test;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Tests for FeedSLAMonitoring Service.
@@ -48,7 +50,7 @@ public class FeedSLAMonitoringTest {
Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
- Set<Date> missingInstances = new HashSet<>();
+ BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between
http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 37cb044..ee1f141 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -102,6 +102,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
*.feed.sla.serialization.frequency.millis=3600000
+# Maximum number of pending instances recorded per feed per cluster. Once this limit is reached, the oldest instances are
+# removed in a FIFO fashion.
+*.feed.sla.queue.size=288
+
# Do not change unless really sure
# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
*.feed.sla.statusCheck.frequency.seconds=600