You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/10/01 14:13:19 UTC

falcon git commit: FALCON-1476 Maintaining threshold on monitoring entities for SLA service. Contributed by Ajay Yadava.

Repository: falcon
Updated Branches:
  refs/heads/master 4783a19c4 -> 9872ce8e7


FALCON-1476 Maintaining threshold on monitoring entities for SLA service. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9872ce8e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9872ce8e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9872ce8e

Branch: refs/heads/master
Commit: 9872ce8e793f0d16ade52cb06e95bef50339e25a
Parents: 4783a19
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Oct 1 16:22:30 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Oct 1 16:22:30 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 common/src/main/resources/startup.properties    |  4 ++
 .../service/FeedSLAMonitoringService.java       | 62 +++++++++++++++-----
 .../falcon/service/FeedSLAMonitoringTest.java   |  4 +-
 src/conf/startup.properties                     |  4 ++
 5 files changed, 61 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2104f4..7cc3efb 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1476 Maintaining threshold on monitoring entities for SLA service(Ajay Yadava)
+
     FALCON-592 Refactor FalconCLI to make it more manageable(Balu Vellanki via Ajay Yadava)
 
     FALCON-1472 Improvements in SLA service(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index e853c5a..1da7d23 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -104,6 +104,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
 *.feed.sla.serialization.frequency.millis=3600000
 
+# Maximum number of pending instances recorded per <feed, cluster> pair. Once this limit is reached, the oldest
+# instances are evicted in FIFO order.
+*.feed.sla.queue.size=288
+
 # Do not change unless really sure
 # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
 *.feed.sla.statusCheck.frequency.seconds=600

http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 193aa64..b4e0427 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -52,7 +52,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -76,6 +78,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return SERVICE;
     }
 
+    private int queueSize;
+
     /**
      * Permissions for storePath.
      */
@@ -91,7 +95,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
      * Map<Pair<feedName, clusterName>, Set<instanceTime> to store
      * each missing instance of a feed.
      */
-    private Map<Pair<String, String>, Set<Date>> pendingInstances;
+    private Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances;
 
 
     /**
@@ -206,6 +210,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
         lookAheadWindowMillis = Integer.valueOf(freq);
 
+        String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
+        queueSize = Integer.valueOf(size);
+
         try {
             if (fileSystem.exists(filePath)) {
                 deserialize(filePath);
@@ -270,23 +277,31 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
         for (String feedName : monitoredFeeds) {
-
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
             for (Cluster feedCluster : feed.getClusters().getClusters()) {
                 if (currentClusters.contains(feedCluster.getName())) {
                     Date nextInstanceTime = from;
                     Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
-                    Set<Date> instances = pendingInstances.get(key);
+                    BlockingQueue<Date> instances = pendingInstances.get(key);
                     if (instances == null) {
-                        instances = new HashSet<>();
+                        instances = new LinkedBlockingQueue<>(queueSize);
                     }
-
+                    Set<Date> exists = new HashSet<>(instances);
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
                             EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+                    nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
                     while (nextInstanceTime.before(to)) {
-                        nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
-                        instances.add(nextInstanceTime);
+                        if (instances.size() >= queueSize) { // if no space, first make some space
+                            LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key);
+                            exists.remove(instances.peek());
+                            instances.remove();
+                        }
+                        LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+                        if (exists.add(nextInstanceTime)) {
+                            instances.add(nextInstanceTime);
+                        }
                         nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                        nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
                     }
                     pendingInstances.put(key, instances);
                 }
@@ -299,7 +314,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
      * Checks the availability of all the pendingInstances and removes the ones which have become available.
      */
     private void checkPendingInstanceAvailability() throws FalconException {
-        for (Map.Entry<Pair<String, String>, Set<Date>> entry: pendingInstances.entrySet()) {
+        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) {
             for (Date date : entry.getValue()) {
                 boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date);
                 if (status) {
@@ -358,7 +373,26 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     private void deserialize(Path path) throws FalconException {
         try {
             Map<String, Object> state = deserializeInternal(path);
-            pendingInstances = (ConcurrentHashMap<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
+            pendingInstances = new ConcurrentHashMap<>();
+            Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy =
+                    (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances");
+            // queue size can change during restarts, hence copy
+            for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) {
+                BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize);
+                BlockingQueue<Date> oldValue = entry.getValue();
+                LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize);
+                while (!oldValue.isEmpty()) {
+                    Date instance = oldValue.remove();
+                    if (value.size() == queueSize) { // if full
+                        LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(),
+                            entry.getKey());
+                        value.remove();
+                    }
+                    LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance);
+                    value.add(instance);
+                }
+                pendingInstances.put(entry.getKey(), value);
+            }
             lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
             lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
             monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities.
@@ -397,13 +431,13 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
      * Start time and end time are both inclusive.
      * @param start start time, inclusive
      * @param end end time, inclusive
-     * @return
+     * @return Set of pending feed instances belonging to the given range which have missed SLA
      * @throws FalconException
      */
     public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
         throws FalconException {
         Set<SchedulableEntityInstance> result = new HashSet<>();
-        for (Map.Entry<Pair<String, String>, Set<Date>> feedInstances : pendingInstances.entrySet()) {
+        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) {
             Pair<String, String> feedClusterPair = feedInstances.getKey();
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
             Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
@@ -428,7 +462,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
      * @param clusterName cluster name
      * @param start start time, inclusive
      * @param end end time, inclusive
-     * @return
+     * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
      * @throws FalconException
      */
     public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
@@ -436,7 +470,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
         Set<SchedulableEntityInstance> result = new HashSet<>();
         Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
-        Set<Date> missingInstances = pendingInstances.get(feedClusterPair);
+        BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair);
         Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
         Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
         Sla sla = FeedHelper.getSLA(cluster, feed);
@@ -452,7 +486,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return result;
     }
 
-    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, Set<Date> missingInstances)
+    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date> missingInstances)
         throws FalconException {
         String tagCritical = "Missed SLA High";
         String tagWarn = "Missed SLA Low";

http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index bc03cb5..ca55d01 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -32,6 +32,8 @@ import org.testng.annotations.Test;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Tests for FeedSLAMonitoring Service.
@@ -48,7 +50,7 @@ public class FeedSLAMonitoringTest {
         Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
         Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
 
-        Set<Date> missingInstances = new HashSet<>();
+        BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
         missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
         missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
         missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between

http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 37cb044..ee1f141 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -102,6 +102,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
 *.feed.sla.serialization.frequency.millis=3600000
 
+# Maximum number of pending instances recorded per <feed, cluster> pair. Once this limit is reached, the oldest
+# instances are evicted in FIFO order.
+*.feed.sla.queue.size=288
+
 # Do not change unless really sure
 # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
 *.feed.sla.statusCheck.frequency.seconds=600