You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/07/06 03:21:22 UTC

[1/3] falcon git commit: FALCON-2052 Process SLA monitoring

Repository: falcon
Updated Branches:
  refs/heads/master bd32b610e -> 60e2f68b8


http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 62720e3..5c35b8c 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -53,13 +53,13 @@
     </Match>
 
     <Match>
-        <Class name="org.apache.falcon.persistence.FeedSLAAlertBean" />
+        <Class name="org.apache.falcon.persistence.EntitySLAAlertBean" />
         <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
     </Match>
 
 
     <Match>
-        <Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />
+        <Class name="org.apache.falcon.persistence.MonitoredEntityBean" />
         <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
     </Match>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 5ac3d5c..78c7e1e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -54,7 +54,7 @@
 
 ##For feed SLA monitoring enable these two
 #                        org.apache.falcon.service.FalconJPAService,\
-#                        org.apache.falcon.service.FeedSLAMonitoringService,\
+#                        org.apache.falcon.service.EntitySLAMonitoringService,\
 
 ##Add if you want to send data to graphite
 #                        org.apache.falcon.metrics.MetricNotificationService\
@@ -64,7 +64,7 @@
 #*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
 #                        org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
 #                        org.apache.falcon.service.ProcessSubscriberService,\
-#                        org.apache.falcon.service.FeedSLAMonitoringService,\
+#                        org.apache.falcon.service.EntitySLAMonitoringService,\
 #                        org.apache.falcon.service.LifecyclePolicyMap,\
 #                        org.apache.falcon.service.FalconJPAService,\
 #                        org.apache.falcon.entity.store.ConfigurationStore,\
@@ -96,7 +96,7 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.ColoClusterRelation,\
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
 ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
 #                       org.apache.falcon.state.store.jdbc.JDBCStateStore


[2/3] falcon git commit: FALCON-2052 Process SLA monitoring

Posted by aj...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
new file mode 100644
index 0000000..f931625
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import java.text.ParseException;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.MonitoredEntityBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.falcon.entity.v0.process.Process;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Service to monitor Feed SLAs.
+ */
+public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
+
+    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
+    private static final int ONE_MS = 1;
+
+    private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();
+
+    public static final String TAG_CRITICAL = "Missed-SLA-High";
+    public static final String TAG_WARN = "Missed-SLA-Low";
+
+    private EntitySLAMonitoringService() {
+
+    }
+
+    public static EntitySLAMonitoringService get() {
+        return SERVICE;
+    }
+
+    /**
+     * Permissions for storePath.
+     */
+    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+
+    /**
+     * Frequency in seconds of "status check" for pending feed instances.
+     */
+    private int statusCheckFrequencySeconds; // 10 minutes
+
+
+    /**
+     * Time Duration (in milliseconds) in future for generating pending feed instances.
+     *
+     * In every cycle pending feed instances are added for monitoring, till this time in future.
+     */
+    private int lookAheadWindowMillis; // 15 MINUTES
+
+
+    /**
+     * Filesystem used for serializing and deserializing.
+     */
+    private FileSystem fileSystem;
+
+    /**
+     * Working directory for the feed sla monitoring service.
+     */
+    private Path storePath;
+
+    /**
+     * Path to store the state of the monitoring service.
+     */
+    private Path filePath;
+
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            // currently sla service is enabled only for fileSystemStorage
+            if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) {
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        if (FeedHelper.getSLA(cluster, feed) != null) {
+                            LOG.debug("Adding feed:{} for monitoring", feed.getName());
+                            MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString());
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        if (entity.getEntityType() == EntityType.PROCESS){
+            Process process = (Process) entity;
+            if (process.getSla() != null || checkProcessClusterSLA(process)){
+                for (org.apache.falcon.entity.v0.process.Cluster  cluster : process.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        LOG.debug("Adding process:{} for monitoring", process.getName());
+                        MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
+                                EntityType.PROCESS.toString());
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    public Boolean checkFeedClusterSLA(Feed feed){
+        for(Cluster  cluster : feed.getClusters().getClusters()){
+            Sla sla =  FeedHelper.getSLA(cluster, feed);
+            if (sla != null){
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    public Boolean checkProcessClusterSLA(Process process){
+        Clusters clusters = process.getClusters();
+        for(org.apache.falcon.entity.v0.process.Cluster  cluster : clusters.getClusters()){
+            org.apache.falcon.entity.v0.process.Sla sla =  ProcessHelper.getSLA(cluster, process);
+            if (sla != null){
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            // currently sla service is enabled only for fileSystemStorage
+            if (feed.getLocations() != null) {
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
+                                EntityType.FEED.toString());
+                    }
+                }
+            }
+        }
+        if (entity.getEntityType() == EntityType.PROCESS){
+            Process process = (Process) entity;
+            if (process.getSla() != null){
+                for (org.apache.falcon.entity.v0.process.Cluster  cluster : process.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
+                                EntityType.PROCESS.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
+                                EntityType.PROCESS.toString());
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
+        if (feed.getLocations() != null) {
+            Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+            for (Cluster cluster : feed.getClusters().getClusters()) {
+                if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isSLAMonitoringEnabledInCurrentColo(Process process) {
+
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        for (org.apache.falcon.entity.v0.process.Cluster  cluster : process.getClusters().getClusters()) {
+            if (currentClusters.contains(cluster.getName()) && ProcessHelper.getSLA(cluster, process) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        if (newEntity.getEntityType() == EntityType.FEED) {
+            Feed oldFeed = (Feed) oldEntity;
+            Feed newFeed = (Feed) newEntity;
+            if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
+                onRemove(oldFeed);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
+                onAdd(newFeed);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
+                    if (FeedHelper.getSLA(oldCluster, oldFeed) != null
+                        && FeedHelper.getSLA(oldCluster, newFeed) == null) {
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newFeed.getName(), slaRemovedClusters, EntityType.FEED.toString());
+            }
+        }
+        if (newEntity.getEntityType() == EntityType.PROCESS) {
+            Process oldProcess = (Process) oldEntity;
+            Process newProcess = (Process) newEntity;
+            if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){
+                onRemove(newProcess);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){
+                onAdd(newProcess);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefined(oldProcess)){
+                    if (ProcessHelper.getSLA(oldCluster, oldProcess) != null
+                            && ProcessHelper.getSLA(oldCluster, newProcess) == null){
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newProcess.getName(), slaRemovedClusters, EntityType.PROCESS.toString());
+            }
+        }
+    }
+
+    void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
+        for(String clusterName :slaRemovedClusters){
+            MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
+                    entityType);
+        }
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        onAdd(entity);
+    }
+
+    @Override
+    public String getName() {
+        return EntitySLAMonitoringService.class.getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
+        storePath = new Path(uri);
+        filePath = new Path(storePath, "feedSLAMonitoringService");
+        fileSystem = initializeFileSystem();
+
+        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+        statusCheckFrequencySeconds = Integer.parseInt(freq);
+
+        freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
+        lookAheadWindowMillis = Integer.parseInt(freq);
+        LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
+    }
+
+    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
+        throws FalconException {
+        LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
+                clusterName, nominalTime);
+        List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName,
+                EntityType.FEED.toString()));
+        // Slas for feeds not having sla tag are not stored.
+        if (CollectionUtils.isEmpty(instances)){
+            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime,
+                    EntityType.FEED.toString());
+        }
+    }
+
+    private FileSystem initializeFileSystem() {
+        try {
+            fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+            if (!fileSystem.exists(storePath)) {
+                LOG.info("Creating directory for pending feed instances: {}", storePath);
+                // set permissions so config store dir is owned by falcon alone
+                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+    }
+
+    //Periodically update status of pending instances, add new instances and take backup.
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
+                    checkPendingInstanceAvailability(EntityType.FEED.toString());
+                    checkPendingInstanceAvailability(EntityType.PROCESS.toString());
+
+                    // add Instances from last checked time to 10 minutes from now(some buffer for status check)
+                    Date now = new Date();
+                    Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
+                    addNewPendingFeedInstances(newCheckPoint, EntityType.FEED.toString());
+                    addNewPendingFeedInstances(newCheckPoint, EntityType.PROCESS.toString());
+                }
+            } catch (Throwable e) {
+                LOG.error("Feed SLA monitoring failed: ", e);
+            }
+        }
+    }
+
+
+    void addNewPendingFeedInstances(Date to, String entityType) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        List<MonitoredEntityBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
+        for(MonitoredEntityBean monitoredEntityBean : feedsBeanList) {
+            String entityName = monitoredEntityBean.getFeedName();
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            Set<String> clusters =  EntityUtil.getClustersDefined(entity);
+            List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList();
+            for(String string : clusters){
+                cluster.add(ClusterHelper.getCluster(string));
+            }
+            for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : cluster) {
+                if (currentClusters.contains(feedCluster.getName())) {
+                    // get start of instances from the database
+                    Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName,
+                            EntityType.FEED.toString());
+                    Pair<String, String> key = new Pair<>(entity.getName(), feedCluster.getName());
+                    if (nextInstanceTime == null) {
+                        nextInstanceTime = getInitialStartTime(entity, feedCluster.getName(), entityType);
+                    } else {
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                    }
+
+                    Set<Date> instances = new HashSet<>();
+                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
+                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+                    nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                    Date endDate;
+                    if (entityType.equals(EntityType.FEED.toString())){
+                        endDate =  FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd();
+                    }else {
+                        endDate =  ProcessHelper.getClusterValidity((Process) entity,
+                                currentCluster.getName()).getEnd();
+                    }
+                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
+                        LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+                        instances.add(nextInstanceTime);
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                        nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                    }
+
+                    for(Date date:instances){
+                        MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), feedCluster.getName(), date,
+                                entityType);
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Checks the availability of all the pendingInstances and removes the ones which have become available.
+     */
+    private void checkPendingInstanceAvailability(String entityType) throws FalconException {
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
+                    pendingInstanceBean.getClusterName(), entityType)) {
+                boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
+                        pendingInstanceBean.getClusterName(), date, entityType);
+                if (status) {
+                    MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(),
+                            pendingInstanceBean.getClusterName(), date, EntityType.FEED.toString());
+                }
+            }
+        }
+    }
+
+    // checks whether a given feed instance is available or not
+    private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime,
+                                                    String entityType) throws
+        FalconException {
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+        try {
+            if (entityType.equals(EntityType.PROCESS.toString())){
+                LOG.debug("Checking instance availability status for entity:{}, cluster:{}, "
+                        + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType);
+                AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+                InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null);
+                if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){
+                    LOG.debug("Entity instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(),
+                            clusterName, nominalTime);
+                    return true;
+                }
+                return false;
+            }
+            if (entityType.equals(EntityType.FEED.toString())){
+                LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}",
+                        entity.getName(), clusterName, nominalTime);
+
+                FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus((Feed) entity,
+                        clusterName, nominalTime);
+                if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
+                        || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
+                    LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(),
+                            clusterName, nominalTime);
+                    return true;
+                }
+            }
+        } catch (Throwable e) {
+            LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}", entityName, clusterName,
+                    entityType, e);
+        }
+        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(),
+            clusterName, nominalTime);
+        return false;
+    }
+
+
+    /**
+     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
+     * slaLow or slaHigh.
+     *
+     * Only feeds which have defined sla in their definition are considered.
+     * Only the feed instances between the given time range are considered.
+     * Start time and end time are both inclusive.
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @return Set of pending feed instances belonging to the given range which have missed SLA
+     * @throws FalconException
+     */
+    public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
+        throws FalconException {
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+            Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
+                    pendingInstanceBean.getClusterName());
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
+            Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
+            Sla sla = FeedHelper.getSLA(cluster, feed);
+            if (sla != null) {
+                Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end,
+                    MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
+                        pendingInstanceBean.getClusterName(), EntityType.FEED.toString()));
+                for (Pair<Date, String> status : slaStatus) {
+                    SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
+                        feedClusterPair.second, status.first, EntityType.FEED);
+                    instance.setTags(status.second);
+                    result.add(instance);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
+     * which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
+     * @param entityName name of the feed
+     * @param clusterName cluster name
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
+     * @throws FalconException
+     */
+    public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
+                                          Date start, Date end, String entityType) throws FalconException {
+
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                entityType);
+        if (missingInstances == null || !Arrays.asList(EntityType.FEED.toString(),
+                EntityType.PROCESS.toString()).contains(entityType)){
+            return result;
+        }
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+        if (entityType.equals(EntityType.FEED.toString())) {
+            Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
+
+            if (sla != null) {
+                Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, missingInstances);
+                for (Pair<Date, String> status : slaStatus){
+                    SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
+                                status.first, EntityType.FEED);
+                    instance.setTags(status.second);
+                    result.add(instance);
+                }
+            }
+            return result;
+        } else {
+            org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process) entity);
+            if (sla != null){
+                Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end, missingInstances);
+                for (Pair<Date, String> status : slaStatus){
+                    SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
+                            status.first, EntityType.PROCESS);
+                    instance.setTags(status.second);
+                    result.add(instance);
+                }
+            }
+        }
+        return result;
+    }
+
+    Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
+        throws FalconException {
+        Date now = new Date();
+        Frequency slaLow = sla.getSlaLow();
+        Frequency slaHigh = sla.getSlaHigh();
+        Set<Pair<Date, String>> result = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            if (!nominalTime.before(start) && !nominalTime.after(end)) {
+                ExpressionHelper.setReferenceDate(nominalTime);
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
+                Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
+                Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
+                Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
+                if (slaCriticalTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_CRITICAL));
+                } else if (slaWarnTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_WARN));
+                }
+            }
+        }
+        return result;
+    }
+
+    Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
+                                                Date end, List<Date> missingInstances) throws FalconException {
+        Date now = new Date();
+        Frequency slaHigh = sla.getShouldEndIn();
+        Set<Pair<Date, String>> result = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            if (!nominalTime.before(start) && !nominalTime.after(end)) {
+                ExpressionHelper.setReferenceDate(nominalTime);
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
+                Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
+                if (slaCriticalTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_CRITICAL));
+                }
+            }
+        }
+        return result;
+    }
+
+    @VisibleForTesting
+    Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException {
+        if (entityType.equals(EntityType.PROCESS.toString())){
+            Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+                    + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName);
+            Frequency slaLow = sla.getSlaLow();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        } else{
+            org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+                        + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName);
+            Frequency slaLow = sla.getShouldEndIn();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        }
+    }
+
+    public void makeProcessInstanceAvailable(String clusterName, String entityName, String date, String entityType) {
+        Date nominalTime = null;
+        try {
+            nominalTime = DateUtil.parseDateFalconTZ(date);
+        }catch (ParseException e){
+            LOG.error("Exception while translating the date:", e);
+        }
+        if (nominalTime!= null){
+            List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                entityType));
+            if (!CollectionUtils.isEmpty(instances)){
+                MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, nominalTime,
+                    entityType);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
deleted file mode 100644
index c09c7ae..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-
-import java.util.Date;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-  * Service to know which all feeds have missed SLA.
-  */
-public final class FeedSLAAlertService implements FalconService, EntitySLAListener {
-
-    private static final String NAME = "FeedSLAAlertService";
-
-    private static final Logger LOG = LoggerFactory.getLogger(FeedSLAAlertService.class);
-
-    private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-
-    private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>();
-
-    private static final FeedSLAAlertService SERVICE = new FeedSLAAlertService();
-
-    public static FeedSLAAlertService get() {
-        return SERVICE;
-    }
-
-    private FeedSLAAlertService(){}
-
-
-    @Override
-    public String getName() {
-        return NAME;
-    }
-
-    @Override
-    public void init() throws FalconException {
-        String listenerClassNames = StartupProperties.get().
-                getProperty("feedAlert.listeners");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-
-        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
-        int statusCheckFrequencySeconds = Integer.parseInt(freq);
-
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
-    }
-
-    public void registerListener(EntitySLAListener listener) {
-        listeners.add(listener);
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-
-    }
-
-
-    private class Monitor implements Runnable {
-
-        @Override
-        public void run() {
-            processSLACandidates();
-        }
-    }
-
-    void processSLACandidates(){
-        //Get all feeds instances to be monitored
-        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
-        if (pendingInstanceBeanList.isEmpty()){
-            return;
-        }
-
-        LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
-        try{
-            for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
-
-                String feedName = pendingInstanceBean.getFeedName();
-                String clusterName = pendingInstanceBean.getClusterName();
-                Date nominalTime = pendingInstanceBean.getNominalTime();
-                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-
-                Cluster cluster =  FeedHelper.getCluster(feed, clusterName);
-
-                Set<SchedulableEntityInstance> schedulableEntityInstances= FeedSLAMonitoringService.get().
-                        getFeedSLAMissPendingAlerts(feed.getName(), cluster.getName(), nominalTime, nominalTime);
-                if (schedulableEntityInstances.isEmpty()){
-                    store.deleteFeedAlertInstance(feed.getName(), cluster.getName(), nominalTime);
-                    return;
-                }
-                List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
-                SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
-
-
-                if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_WARN)) {
-                    store.putSLAAlertInstance(feedName, clusterName, nominalTime, true, false);
-                    //Mark in DB as SLA missed
-                    LOG.info("Feed :"+ feedName
-                                + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow");
-                } else if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_CRITICAL)){
-                    store.updateSLAAlertInstance(feedName, clusterName, nominalTime);
-                    LOG.info("Feed :"+ feedName
-                            + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLAHigh");
-                    highSLAMissed(feedName, EntityType.FEED, clusterName, nominalTime);
-                }
-            }
-        } catch (FalconException e){
-            LOG.error("Exception in FeedSLAALertService:", e);
-        }
-
-    }
-
-    @Override
-    public void highSLAMissed(String feedName, EntityType entityType, String clusterName, Date nominalTime)
-        throws FalconException{
-        for (EntitySLAListener listener : listeners) {
-            listener.highSLAMissed(feedName, entityType, clusterName, nominalTime);
-        }
-        store.deleteFeedAlertInstance(feedName, clusterName, nominalTime);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
deleted file mode 100644
index ed7bb08..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.FeedInstanceStatus;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Service to monitor Feed SLAs.
- */
-public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
-    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
-
-    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
-
-    private static final int ONE_MS = 1;
-
-    private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
-
-    public static final String TAG_CRITICAL = "Missed-SLA-High";
-    public static final String TAG_WARN = "Missed-SLA-Low";
-
-    private FeedSLAMonitoringService() {
-
-    }
-
-    public static FeedSLAMonitoringService get() {
-        return SERVICE;
-    }
-
-    /**
-     * Permissions for storePath.
-     */
-    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-
-    /**
-     * Frequency in seconds of "status check" for pending feed instances.
-     */
-    private int statusCheckFrequencySeconds; // 10 minutes
-
-
-    /**
-     * Time Duration (in milliseconds) in future for generating pending feed instances.
-     *
-     * In every cycle pending feed instances are added for monitoring, till this time in future.
-     */
-    private int lookAheadWindowMillis; // 15 MINUTES
-
-
-    /**
-     * Filesystem used for serializing and deserializing.
-     */
-    private FileSystem fileSystem;
-
-    /**
-     * Working directory for the feed sla monitoring service.
-     */
-    private Path storePath;
-
-    /**
-     * Path to store the state of the monitoring service.
-     */
-    private Path filePath;
-
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        if (entity.getEntityType() == EntityType.FEED) {
-            Feed feed = (Feed) entity;
-            // currently sla service is enabled only for fileSystemStorage
-            if (feed.getLocations() != null) {
-                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-                for (Cluster cluster : feed.getClusters().getClusters()) {
-                    if (currentClusters.contains(cluster.getName())) {
-                        if (FeedHelper.getSLA(cluster, feed) != null) {
-                            LOG.debug("Adding feed:{} for monitoring", feed.getName());
-                            MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        if (entity.getEntityType() == EntityType.FEED) {
-            Feed feed = (Feed) entity;
-            // currently sla service is enabled only for fileSystemStorage
-            if (feed.getLocations() != null) {
-                Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-                for (Cluster cluster : feed.getClusters().getClusters()) {
-                    if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
-                        MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName());
-                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName());
-                    }
-                }
-            }
-        }
-    }
-
-    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
-        if (feed.getLocations() != null) {
-            Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-            for (Cluster cluster : feed.getClusters().getClusters()) {
-                if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        if (newEntity.getEntityType() == EntityType.FEED) {
-            Feed oldFeed = (Feed) oldEntity;
-            Feed newFeed = (Feed) newEntity;
-            if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
-                onRemove(oldFeed);
-            } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
-                onAdd(newFeed);
-            } else {
-                List<String> slaRemovedClusters = new ArrayList<>();
-                for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
-                    if (FeedHelper.getSLA(oldCluster, oldFeed) != null
-                        && FeedHelper.getSLA(oldCluster, newFeed) == null) {
-                        slaRemovedClusters.add(oldCluster);
-                    }
-                }
-
-                for (String clusterName : slaRemovedClusters) {
-                    MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    @Override
-    public String getName() {
-        return FeedSLAMonitoringService.class.getSimpleName();
-    }
-
-    @Override
-    public void init() throws FalconException {
-        String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
-        storePath = new Path(uri);
-        filePath = new Path(storePath, "feedSLAMonitoringService");
-        fileSystem = initializeFileSystem();
-
-        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
-        statusCheckFrequencySeconds = Integer.parseInt(freq);
-
-        freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
-        lookAheadWindowMillis = Integer.parseInt(freq);
-        LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
-    }
-
-    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
-        throws FalconException {
-        LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
-                clusterName, nominalTime);
-        List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName));
-        // Slas for feeds not having sla tag are not stored.
-        if (CollectionUtils.isEmpty(instances)){
-            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
-        }
-    }
-
-    private FileSystem initializeFileSystem() {
-        try {
-            fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
-            if (!fileSystem.exists(storePath)) {
-                LOG.info("Creating directory for pending feed instances: {}", storePath);
-                // set permissions so config store dir is owned by falcon alone
-                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
-            }
-            return fileSystem;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
-        }
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-    }
-
-    //Periodically update status of pending instances, add new instances and take backup.
-    private class Monitor implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
-                    checkPendingInstanceAvailability();
-
-                    // add Instances from last checked time to 10 minutes from now(some buffer for status check)
-                    Date now = new Date();
-                    Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
-                    addNewPendingFeedInstances(newCheckPoint);
-                }
-            } catch (Throwable e) {
-                LOG.error("Feed SLA monitoring failed: ", e);
-            }
-        }
-    }
-
-
-    void addNewPendingFeedInstances(Date to) throws FalconException {
-        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
-        for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
-            String feedName = monitoredFeedsBean.getFeedName();
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-            for (Cluster feedCluster : feed.getClusters().getClusters()) {
-                if (currentClusters.contains(feedCluster.getName())) {
-                    // get start of instances from the database
-                    Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName);
-                    Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
-                    if (nextInstanceTime == null) {
-                        nextInstanceTime = getInitialStartTime(feed, feedCluster.getName());
-                    } else {
-                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
-                    }
-
-                    Set<Date> instances = new HashSet<>();
-                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
-                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
-                    nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
-                    Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
-                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
-                        LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
-                        instances.add(nextInstanceTime);
-                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
-                        nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
-                    }
-
-                    for(Date date:instances){
-                        MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date);
-                    }
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Checks the availability of all the pendingInstances and removes the ones which have become available.
-     */
-    private void checkPendingInstanceAvailability() throws FalconException {
-        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
-            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
-                    pendingInstanceBean.getClusterName())) {
-                boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(),
-                        pendingInstanceBean.getClusterName(), date);
-                if (status) {
-                    MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(),
-                            pendingInstanceBean.getClusterName(), date);
-                }
-            }
-        }
-    }
-
-    // checks whether a given feed instance is available or not
-    private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date nominalTime) throws
-        FalconException {
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-
-        try {
-            LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", feed.getName(),
-                    clusterName, nominalTime);
-            FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus(feed, clusterName,
-                    nominalTime);
-            if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
-                    || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
-                LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", feed.getName(),
-                    clusterName, nominalTime);
-                return true;
-            }
-        } catch (Throwable e) {
-            LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName, e);
-        }
-        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", feed.getName(),
-            clusterName, nominalTime);
-        return false;
-    }
-
-
-    /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
-     * slaLow or slaHigh.
-     *
-     * Only feeds which have defined sla in their definition are considered.
-     * Only the feed instances between the given time range are considered.
-     * Start time and end time are both inclusive.
-     * @param start start time, inclusive
-     * @param end end time, inclusive
-     * @return Set of pending feed instances belonging to the given range which have missed SLA
-     * @throws FalconException
-     */
-    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
-        throws FalconException {
-        Set<SchedulableEntityInstance> result = new HashSet<>();
-        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
-            Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(),
-                    pendingInstanceBean.getClusterName());
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
-            Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
-            Sla sla = FeedHelper.getSLA(cluster, feed);
-            if (sla != null) {
-                Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end,
-                    MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
-                        pendingInstanceBean.getClusterName()));
-                for (Pair<Date, String> status : slaStatus) {
-                    SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
-                        feedClusterPair.second, status.first, EntityType.FEED);
-                    instance.setTags(status.second);
-                    result.add(instance);
-                }
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
-     * which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
-     * @param feedName name of the feed
-     * @param clusterName cluster name
-     * @param start start time, inclusive
-     * @param end end time, inclusive
-     * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
-     * @throws FalconException
-     */
-    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
-                                                                 Date start, Date end) throws FalconException {
-
-        Set<SchedulableEntityInstance> result = new HashSet<>();
-        Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
-        List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName);
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-        Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
-        Sla sla = FeedHelper.getSLA(cluster, feed);
-        if (missingInstances != null && sla != null) {
-            Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, missingInstances);
-            for (Pair<Date, String> status : slaStatus){
-                SchedulableEntityInstance instance = new SchedulableEntityInstance(feedName, clusterName, status.first,
-                        EntityType.FEED);
-                instance.setTags(status.second);
-                result.add(instance);
-            }
-        }
-        return result;
-    }
-
-    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
-        throws FalconException {
-        Date now = new Date();
-        Frequency slaLow = sla.getSlaLow();
-        Frequency slaHigh = sla.getSlaHigh();
-        Set<Pair<Date, String>> result = new HashSet<>();
-        for (Date nominalTime : missingInstances) {
-            if (!nominalTime.before(start) && !nominalTime.after(end)) {
-                ExpressionHelper.setReferenceDate(nominalTime);
-                ExpressionHelper evaluator = ExpressionHelper.get();
-                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
-                Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
-                Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
-                Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
-                if (slaCriticalTime.before(now)) {
-                    result.add(new Pair<>(nominalTime, TAG_CRITICAL));
-                } else if (slaWarnTime.before(now)) {
-                    result.add(new Pair<>(nominalTime, TAG_WARN));
-                }
-            }
-        }
-        return result;
-    }
-
-    @VisibleForTesting
-    Date getInitialStartTime(Feed feed, String clusterName) throws FalconException {
-        Sla sla = FeedHelper.getSLA(clusterName, feed);
-        if (sla == null) {
-            throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
-                + feed.getName() + " and cluster: " + clusterName + " does not have any sla");
-        }
-        Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName);
-        Frequency slaLow = sla.getSlaLow();
-        Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
-        return startTime.before(slaTime) ? startTime : slaTime;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index a4a95be..8cf2b2d 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -25,6 +25,7 @@ import javax.persistence.Query;
 
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.tools.FalconStateStoreDBCLI;
@@ -85,40 +86,44 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
 
     @Test
     public void testInsertRetrieveAndUpdate() throws Exception {
-        monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
-        monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
-        Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString());
+        Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
+                EntityType.FEED.toString()).getFeedName());
         Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
 
-        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1");
-        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2");
+        monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString());
+        monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString());
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
         Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
-        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne);
-        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo);
-
-        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2);
-        monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne);
-        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1);
-        monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne, EntityType.FEED.toString());
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo, EntityType.FEED.toString());
+
+        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster",
+                EntityType.FEED.toString()).size(), 2);
+        monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne,
+                EntityType.FEED.toString());
+        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster",
+                EntityType.FEED.toString()).size(), 1);
+        monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster", EntityType.FEED.toString());
     }
 
     @Test
     public void testEmptyLatestInstance() throws Exception {
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-        store.putMonitoredFeed("test-feed1");
-        store.putMonitoredFeed("test-feed2");
-        Assert.assertNull(store.getLastInstanceTime("test-feed1"));
+        store.putMonitoredEntity("test-feed1", EntityType.FEED.toString());
+        store.putMonitoredEntity("test-feed2", EntityType.FEED.toString());
+        Assert.assertNull(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString()));
 
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
         Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
 
-        store.putPendingInstances("test-feed1", "test_cluster", dateTwo);
-        store.putPendingInstances("test-feed1", "test_cluster", dateOne);
-        store.putPendingInstances("test-feed2", "test_cluster", dateOne);
+        store.putPendingInstances("test-feed1", "test_cluster", dateTwo, EntityType.FEED.toString());
+        store.putPendingInstances("test-feed1", "test_cluster", dateOne, EntityType.FEED.toString());
+        store.putPendingInstances("test-feed2", "test_cluster", dateOne, EntityType.FEED.toString());
 
-        Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1")));
-        Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2")));
+        Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString())));
+        Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2", EntityType.FEED.toString())));
 
     }
 
@@ -126,14 +131,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
     public void testputSLALowCandidate() throws Exception{
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
-        store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE);
-        Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLALowMissed());
-        Assert.assertTrue(dateOne.equals(store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getNominalTime()));
-        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
-        Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLAHighMissed());
+        store.putSLAAlertInstance("test-feed1", "test-cluster", EntityType.FEED.toString(),
+                dateOne, Boolean.TRUE, Boolean.FALSE);
+        Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLALowMissed());
+        Assert.assertTrue(dateOne.equals(store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, EntityType.FEED.toString()).getNominalTime()));
+        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne, EntityType.FEED.toString());
+        Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLAHighMissed());
     }
 
     @Test
@@ -141,21 +147,22 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
 
-        store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE);
-        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
-        Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLAHighMissed());
+        store.putSLAAlertInstance("test-process", "test-cluster", EntityType.PROCESS.toString(),
+                dateOne, Boolean.TRUE, Boolean.FALSE);
+        store.updateSLAAlertInstance("test-process", "test-cluster", dateOne, EntityType.PROCESS.toString());
+        Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-process",
+                "test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed());
     }
 
     private void clear() {
         EntityManager em = FalconJPAService.get().getEntityManager();
         em.getTransaction().begin();
         try {
-            Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
+            Query query = em.createNativeQuery("delete from PENDING_INSTANCES");
             query.executeUpdate();
-            query = em.createNativeQuery("delete from PENDING_INSTANCES");
+            query = em.createNativeQuery("delete from MONITORED_ENTITY");
             query.executeUpdate();
-            query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
+            query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS");
             query.executeUpdate();
 
         } finally {

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
new file mode 100644
index 0000000..c8b4f5e
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.File;
+import java.util.Date;
+
+/**
+ * Test for EntitySLAMonitoringService.
+ */
+public class EntitySLAAlertServiceTest extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/persistancedb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static MonitoringJdbcStateStore monitoringJdbcStateStore;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+    }
+
+    @BeforeMethod
+    public void init() {
+        clear();
+    }
+
+    private void clear() {
+        EntityManager em = FalconJPAService.get().getEntityManager();
+        em.getTransaction().begin();
+        try {
+            Query query = em.createNativeQuery("delete from PENDING_INSTANCES");
+            query.executeUpdate();
+            query = em.createNativeQuery("delete from MONITORED_ENTITY");
+            query.executeUpdate();
+            query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS");
+            query.executeUpdate();
+
+        } finally {
+            em.getTransaction().commit();
+            em.close();
+        }
+    }
+
+    @Test
+    public static void processSLALowCandidates() throws FalconException, InterruptedException{
+
+        Date dateOne =  new Date(System.currentTimeMillis()-100000);
+        monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne, EntityType.FEED.toString());
+        org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new org.apache.falcon.entity.v0.cluster.Cluster();
+        cluster1.setName("test-cluster");
+        Cluster testCluster = new Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Feed mockEntity = new Feed();
+        mockEntity.setName("test-feed");
+        mockEntity.setClusters(cluster);
+        cluster1.setColo("test-cluster");
+
+
+        if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, cluster1.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1);
+        }
+        Sla sla = new Sla();
+        Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
+        Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
+        sla.setSlaLow(frequencyLow);
+        sla.setSlaHigh(frequencyHigh);
+        mockEntity.setSla(sla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10*1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", "test-cluster",
+                dateOne, EntityType.FEED.toString()).getIsSLALowMissed());
+    }
+
+    @Test
+    public static void processSLACandidateProcess() throws FalconException, InterruptedException{
+        Date dateOne =  new Date(System.currentTimeMillis()-130000);
+
+        monitoringJdbcStateStore.putPendingInstances("test-process", "test-cluster", dateOne,
+                EntityType.PROCESS.name());
+        EntitySLAAlertService.get().init();
+        org.apache.falcon.entity.v0.process.Clusters cluster = new org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster processCluster = new org.apache.falcon.entity.v0.cluster.Cluster();
+        processCluster.setName("test-cluster");
+        org.apache.falcon.entity.v0.process.Cluster testCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Process process =  new Process();
+        process.setName("test-process");
+        process.setClusters(cluster);
+        processCluster.setColo("test-cluster");
+
+        if (ConfigurationStore.get().get(EntityType.PROCESS, process.getName()) == null){
+            ConfigurationStore.get().publish(EntityType.PROCESS, process);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, processCluster.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, processCluster);
+        }
+        org.apache.falcon.entity.v0.process.Sla sla = new org.apache.falcon.entity.v0.process.Sla();
+        Frequency processFrequency = new Frequency("1", Frequency.TimeUnit.minutes);
+        sla.setShouldEndIn(processFrequency);
+        process.setSla(sla);
+
+
+        Thread.sleep(10*1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-process", "test-cluster", dateOne,
+                EntityType.PROCESS.name()).getIsSLAHighMissed());
+
+    }
+
+    @Test(expectedExceptions = javax.persistence.NoResultException.class)
+    public static void processSLAHighCandidates() throws FalconException, InterruptedException{
+
+        Date dateOne =  new Date(System.currentTimeMillis()-130000);
+        monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne, EntityType.FEED.toString());
+        org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new org.apache.falcon.entity.v0.cluster.Cluster();
+        cluster1.setName("test-cluster");
+        Cluster testCluster = new Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Feed mockEntity = new Feed();
+        mockEntity.setName("test-feed");
+        mockEntity.setClusters(cluster);
+        cluster1.setColo("test-cluster");
+        if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, cluster1.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1);
+        }
+        Sla sla = new Sla();
+        Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
+        Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
+        sla.setSlaLow(frequencyLow);
+        sla.setSlaHigh(frequencyHigh);
+        mockEntity.setSla(sla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10*1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", "test-cluster",
+                dateOne, EntityType.FEED.toString()).getIsSLAHighMissed());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
deleted file mode 100644
index 7c886c1..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.tools.FalconStateStoreDBCLI;
-import org.apache.falcon.util.StateStoreProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-import java.io.File;
-import java.util.Date;
-
-/**
- * Test for FeedSLAMonitoringService.
- */
-public class FeedSLAAlertServiceTest extends AbstractTestBase {
-    private static final String DB_BASE_DIR = "target/test-data/persistancedb";
-    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
-    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
-    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
-    protected LocalFileSystem fs = new LocalFileSystem();
-
-    private static MonitoringJdbcStateStore monitoringJdbcStateStore;
-    private static FalconJPAService falconJPAService = FalconJPAService.get();
-
-    protected int execDBCLICommands(String[] args) {
-        return new FalconStateStoreDBCLI().run(args);
-    }
-
-    public void createDB(String file) {
-        File sqlFile = new File(file);
-        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
-        int result = execDBCLICommands(argsCreate);
-        Assert.assertEquals(0, result);
-        Assert.assertTrue(sqlFile.exists());
-
-    }
-
-    @BeforeClass
-    public void setup() throws Exception{
-        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
-        Configuration localConf = new Configuration();
-        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
-        fs.mkdirs(new Path(DB_BASE_DIR));
-        createDB(DB_SQL_FILE);
-        falconJPAService.init();
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
-        this.conf = dfsCluster.getConf();
-        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
-    }
-
-    @BeforeMethod
-    public void init() {
-        clear();
-    }
-
-    private void clear() {
-        EntityManager em = FalconJPAService.get().getEntityManager();
-        em.getTransaction().begin();
-        try {
-            Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
-            query.executeUpdate();
-            query = em.createNativeQuery("delete from PENDING_INSTANCES");
-            query.executeUpdate();
-            query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
-            query.executeUpdate();
-
-        } finally {
-            em.getTransaction().commit();
-            em.close();
-        }
-    }
-
-    @Test
-    public static void processSLALowCandidates() throws FalconException, InterruptedException{
-
-        Date dateOne =  new Date(System.currentTimeMillis()-100000);
-        monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne);
-        org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
-        Cluster testCluster = new Cluster();
-        testCluster.setName("test-cluster");
-        cluster.getClusters().add(testCluster);
-        Feed mockEntity = new Feed();
-        mockEntity.setName("test-feed");
-        mockEntity.setClusters(cluster);
-        if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
-            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
-        }
-        Sla sla = new Sla();
-        Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
-        Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
-        sla.setSlaLow(frequencyLow);
-        sla.setSlaHigh(frequencyHigh);
-        mockEntity.setSla(sla);
-
-        FeedSLAAlertService.get().init();
-        Thread.sleep(10*1000);
-        Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster",
-                dateOne).getIsSLALowMissed());
-    }
-
-    @Test(expectedExceptions = javax.persistence.NoResultException.class)
-    public static void processSLAHighCandidates() throws FalconException, InterruptedException{
-
-        Date dateOne =  new Date(System.currentTimeMillis()-130000);
-        monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne);
-        org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
-        Cluster testCluster = new Cluster();
-        testCluster.setName("test-cluster");
-        cluster.getClusters().add(testCluster);
-        Feed mockEntity = new Feed();
-        mockEntity.setName("test-feed");
-        mockEntity.setClusters(cluster);
-        if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
-            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
-        }
-        Sla sla = new Sla();
-        Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
-        Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
-        sla.setSlaLow(frequencyLow);
-        sla.setSlaHigh(frequencyHigh);
-        mockEntity.setSla(sla);
-
-        FeedSLAAlertService.get().init();
-        Thread.sleep(10*1000);
-        Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster",
-                dateOne).getIsSLAHighMissed());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index 97cc459..9cf50c2 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -54,7 +54,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
     private static final String CLUSTER_NAME = "testCluster";
     private static final String FEED_NAME = "testFeed";
     private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-    private static final String TAG_CRITICAL = FeedSLAMonitoringService.get().TAG_CRITICAL;
+    private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL;
 
     @Test
     public void testSLAStatus() throws FalconException {
@@ -74,7 +74,8 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
         missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time
         missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time
 
-        Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances);
+        Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end,
+                missingInstances);
         Set<Pair<Date, String>> expected = new HashSet<>();
         expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL));
         expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));


[3/3] falcon git commit: FALCON-2052 Process SLA monitoring

Posted by aj...@apache.org.
FALCON-2052 Process SLA monitoring

Author: Praveen Adlakha <ad...@gmail.com>

Reviewers: Ajay Yadava <aj...@apache.org>, Pallavi Rao

Closes #202 from PraveenAdlakha/2052


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/60e2f68b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/60e2f68b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/60e2f68b

Branch: refs/heads/master
Commit: 60e2f68b867476f600ba43dc4dafb97971a78b2e
Parents: bd32b61
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Wed Jul 6 08:51:03 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Jul 6 08:51:03 2016 +0530

----------------------------------------------------------------------
 client/src/main/resources/process-0.1.xsd       |   4 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  36 ++
 .../falcon/persistence/EntitySLAAlertBean.java  | 168 +++++
 .../falcon/persistence/FeedSLAAlertBean.java    | 134 ----
 .../falcon/persistence/MonitoredEntityBean.java | 103 +++
 .../falcon/persistence/MonitoredFeedsBean.java  |  73 ---
 .../falcon/persistence/PendingInstanceBean.java |  58 +-
 .../persistence/PersistenceConstants.java       |  10 +-
 .../falcon/tools/FalconStateStoreDBCLI.java     |   4 +-
 .../src/main/resources/META-INF/persistence.xml |  18 +-
 common/src/main/resources/startup.properties    |   4 +-
 .../apache/falcon/entity/AbstractTestBase.java  |   2 +-
 .../entity/store/FeedLocationStoreTest.java     |   2 +-
 docs/src/site/twiki/FalconNativeScheduler.twiki |   2 +-
 docs/src/site/twiki/FeedSLAMonitoring.twiki     |   2 +-
 .../falcon/handler/SLAMonitoringHandler.java    |  13 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 134 ++--
 .../AbstractSchedulableEntityManager.java       |   8 +-
 .../proxy/SchedulableEntityManagerProxy.java    |   2 +-
 .../falcon/service/EntitySLAAlertService.java   | 168 +++++
 .../falcon/service/EntitySLAListener.java       |   3 +-
 .../service/EntitySLAMonitoringService.java     | 644 +++++++++++++++++++
 .../falcon/service/FeedSLAAlertService.java     | 163 -----
 .../service/FeedSLAMonitoringService.java       | 450 -------------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  77 ++-
 .../service/EntitySLAAlertServiceTest.java      | 213 ++++++
 .../falcon/service/FeedSLAAlertServiceTest.java | 162 -----
 .../falcon/service/FeedSLAMonitoringTest.java   |   5 +-
 src/build/findbugs-exclude.xml                  |   4 +-
 src/conf/startup.properties                     |   6 +-
 30 files changed, 1544 insertions(+), 1128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 0d01e33..7ed8474 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -204,8 +204,7 @@
             </xs:documentation>
         </xs:annotation>
         <xs:sequence>
-            <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1">
-            </xs:element>
+            <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1"/>
         </xs:sequence>
     </xs:complexType>
 
@@ -218,6 +217,7 @@
         </xs:annotation>
         <xs:sequence>
             <xs:element type="validity" name="validity"/>
+            <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:int" name="version" use="optional" default="0"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index bbfca68..e563d18 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -26,6 +26,8 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Validity;
+import  org.apache.falcon.entity.v0.process.Sla;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.resource.SchedulableEntityInstance;
@@ -185,4 +187,38 @@ public final class ProcessHelper {
         }
         return result;
     }
+
+    public static Validity getClusterValidity(Process process, String clusterName) throws FalconException {
+        org.apache.falcon.entity.v0.process.Cluster cluster = getCluster(process, clusterName);
+        if (cluster == null) {
+            throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+        }
+        return cluster.getValidity();
+    }
+
+    public static Sla getSLA(String clusterName, Process process) throws FalconException{
+        Cluster cluster = getCluster(process, clusterName);
+        if (cluster == null){
+            throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+        }
+        return getSLA(cluster, process);
+    }
+
+    public static Sla getSLA(Cluster cluster, Process process) {
+        final Sla clusterSla = cluster.getSla();
+        if (clusterSla != null) {
+            return clusterSla;
+        }
+        return process.getSla();
+    }
+
+    public static Date getProcessValidityStart(Process process, String clusterName) throws FalconException {
+        Cluster processCluster = getCluster(process, clusterName);
+        if (processCluster != null) {
+            return processCluster.getValidity().getStart();
+        } else {
+            throw new FalconException("No matching cluster " + clusterName
+                    + "found for process " + process.getName());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
new file mode 100644
index 0000000..e2096fe
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import java.util.Date;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Feed SLA monitoring.
+ * */
+@Entity
+@NamedQueries({
+@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.entityType = :entityType"),
+@NamedQuery(name = PersistenceConstants.GET_ALL_ENTITY_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "),
+@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from EntitySLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
+    @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update EntitySLAAlertBean a set a.isSLAHighMissed = true where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERT_INSTANCE, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE, query = "delete from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+})
+@Table(name = "ENTITY_SLA_ALERTS")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class EntitySLAAlertBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName;
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    @Basic
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    @Basic
+    @Column(name = "sla_low_missed")
+    private Boolean isSLALowMissed = false;
+
+    @Basic
+    @Column(name = "sla_high_missed")
+    private Boolean isSLAHighMissed = false;
+
+    @Basic
+    @Column(name = "sla_low_alert_sent")
+    private Boolean slaLowAlertSent;
+
+
+    @Basic
+    @Column(name = "sla_high_alert_sent")
+    private Boolean slaHighAlertSent;
+
+    public Date getNominalTime() {
+        return new Date(nominalTime.getTime());
+    }
+
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = new Date(nominalTime.getTime());
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    public Boolean getIsSLALowMissed() {
+        return isSLALowMissed;
+    }
+
+    public void setIsSLALowMissed(Boolean isSLALowMissed) {
+        this.isSLALowMissed = isSLALowMissed;
+    }
+
+    public Boolean getIsSLAHighMissed() {
+        return isSLAHighMissed;
+    }
+
+    public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
+        this.isSLAHighMissed = isSLAHighMissed;
+    }
+
+    public static final String ENTITYNAME = "entityName";
+
+    public static final String CLUSTERNAME = "clusterName";
+
+    public static final String ENTITYTYPE = "entityType";
+
+    public static final String NOMINALTIME = "nominalTime";
+
+    void checkEntityType(String entityType)throws FalconException{
+        if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
+            return;
+        } else {
+            throw new FalconException("EntityType"+ entityType
+                    + " is not valid,Feed and Process are the valid input type.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
deleted file mode 100644
index 4ea3454..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.persistence;
-
-import java.util.Date;
-
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Feed SLA monitoring.
- * */
-@Entity
-@NamedQueries({
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERTS, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName"),
-@NamedQuery(name = PersistenceConstants.GET_ALL_FEED_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "),
-@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from FeedSLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
-    @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update FeedSLAAlertBean a set a.isSLAHighMissed = true where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERT_INSTANCE, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime "),
- @NamedQuery(name = PersistenceConstants.DELETE_FEED_ALERT_INSTANCE, query = "delete from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
-})
-@Table(name = "FEED_SLA_ALERTS")
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-public class FeedSLAAlertBean {
-    @NotNull
-    @GeneratedValue(strategy = GenerationType.AUTO)
-    @Id
-    private String id;
-
-    @Basic
-    @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
-
-    @Basic
-    @NotNull
-    @Column(name = "cluster_name")
-    private String clusterName;
-
-    @Basic
-    @NotNull
-    @Column(name = "nominal_time")
-    private Date nominalTime;
-
-    @Basic
-    @Column(name = "sla_low_missed")
-    private Boolean isSLALowMissed = false;
-
-    @Basic
-    @Column(name = "sla_high_missed")
-    private Boolean isSLAHighMissed = false;
-
-    @Basic
-    @Column(name = "sla_low_alert_sent")
-    private Boolean slaLowAlertSent;
-
-
-    @Basic
-    @Column(name = "sla_high_alert_sent")
-    private Boolean slaHighAlertSent;
-
-    public Date getNominalTime() {
-        return new Date(nominalTime.getTime());
-    }
-
-    public void setNominalTime(Date nominalTime) {
-        this.nominalTime = new Date(nominalTime.getTime());
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
-    }
-
-    public Boolean getIsSLALowMissed() {
-        return isSLALowMissed;
-    }
-
-    public void setIsSLALowMissed(Boolean isSLALowMissed) {
-        this.isSLALowMissed = isSLALowMissed;
-    }
-
-    public Boolean getIsSLAHighMissed() {
-        return isSLAHighMissed;
-    }
-
-    public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
-        this.isSLAHighMissed = isSLAHighMissed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
new file mode 100644
index 0000000..20ce537
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The Feeds that are to be monitered will be stored in the db.
+* */
+
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+                + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
+                + "a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+                + "from MonitoredEntityBean a")
+})
+@Table(name="MONITORED_ENTITY")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class MonitoredEntityBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    public String getFeedName() {
+        return entityName;
+    }
+
+    public void setEntityName(String feedName) {
+        this.entityName = feedName;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public static final String ENTITYNAME = "entityName";
+
+    public static final String ENTITYTYPE = "entityType";
+
+    void checkEntityType(String entityType)throws FalconException {
+        if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
+            return;
+        } else {
+            throw new FalconException("EntityType"+ entityType
+                    + " is not valid,Feed and Process are the valid input type.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
deleted file mode 100644
index 2b48569..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.persistence;
-
-import javax.persistence.Entity;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Column;
-import javax.persistence.Basic;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
-* The Feeds that are to be monitered will be stored in the db.
-* */
-
-@Entity
-@NamedQueries({
-        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
-                + "MonitoredFeedsBean a where a.feedName = :feedName"),
-        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
-                + "a where a.feedName = :feedName"),
-        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
-                + "from MonitoredFeedsBean a")
-})
-@Table(name="MONITORED_FEEDS")
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-public class MonitoredFeedsBean {
-    @NotNull
-    @GeneratedValue(strategy = GenerationType.AUTO)
-    @Id
-    private String id;
-
-    @Basic
-    @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 41eb048..863abdc 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -17,6 +17,9 @@
  */
 package org.apache.falcon.persistence;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
 import javax.persistence.Entity;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
@@ -35,13 +38,13 @@ import java.util.Date;
 * */
 @Entity
 @NamedQueries({
-    @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"),
-    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
-    @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
-    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
-    @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+    @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
     @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a "),
-    @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
+    @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
 })
 @Table(name = "PENDING_INSTANCES")
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
@@ -53,8 +56,8 @@ public class PendingInstanceBean {
 
     @Basic
     @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
+    @Column(name = "entity_name")
+    private String entityName;
 
     @Basic
     @NotNull
@@ -66,6 +69,20 @@ public class PendingInstanceBean {
     @Column(name = "nominal_time")
     private Date nominalTime;
 
+    public String getEntityType() {
+        return entityType;
+    }
+
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
     public Date getNominalTime() {
         return nominalTime;
     }
@@ -90,11 +107,28 @@ public class PendingInstanceBean {
         this.clusterName = clusterName;
     }
 
-    public String getFeedName() {
-        return feedName;
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
     }
 
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
+    public static final String ENTITYNAME = "entityName";
+
+    public static final String CLUSTERNAME = "clusterName";
+
+    public static final String NOMINALTIME = "nominalTime";
+
+    public static final String ENTITYTYPE = "entityType";
+
+    void checkEntityType(String entityType)throws FalconException {
+        if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
+            return;
+        } else {
+            throw new FalconException("EntityType"+ entityType
+                    + " is not valid,Feed and Process are the valid input type.");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 72c382e..f9aa1f5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -30,7 +30,7 @@ public final class PersistenceConstants {
     public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
     public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
     public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
-    public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+    public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY";
     public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
     public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
     public static final String GET_ENTITY = "GET_ENTITY";
@@ -55,10 +55,10 @@ public final class PersistenceConstants {
     public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
     public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
     public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
-    public static final String GET_FEED_ALERTS = "GET_FEED_ALERTS";
-    public static final String GET_ALL_FEED_ALERTS = "GET_ALL_FEED_ALERTS";
+    public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
+    public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
     public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
-    public static final String GET_FEED_ALERT_INSTANCE = "GET_FEED_ALERT_INSTANCE";
-    public static final String DELETE_FEED_ALERT_INSTANCE = "DELETE_FEED_ALERT_INSTANCE";
+    public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
+    public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 1bdfc25..102b986 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -244,8 +244,8 @@ public class FalconStateStoreDBCLI {
         args.add("org.apache.falcon.persistence.EntityBean");
         args.add("org.apache.falcon.persistence.InstanceBean");
         args.add("org.apache.falcon.persistence.PendingInstanceBean");
-        args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
-        args.add("org.apache.falcon.persistence.FeedSLAAlertBean");
+        args.add("org.apache.falcon.persistence.MonitoredEntityBean");
+        args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
         return args.toArray(new String[args.size()]);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index c9b444d..ac2f397 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -26,8 +26,8 @@
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -37,7 +37,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
@@ -58,8 +58,8 @@
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -69,7 +69,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
@@ -88,9 +88,9 @@
 
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -100,7 +100,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 374ff17..de24621 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -35,7 +35,7 @@
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.extensions.ExtensionService,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
@@ -67,7 +67,7 @@
                         org.apache.falcon.entity.ColoClusterRelation,\
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
 ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
 #                       org.apache.falcon.state.store.jdbc.JdbcStateStore

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3817056..afd9307 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -79,7 +79,7 @@ public class AbstractTestBase {
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
         listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
-        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+        listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
         StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
index 40c077e..d13769e 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
@@ -63,7 +63,7 @@ public class FeedLocationStoreTest extends AbstractTestBase {
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
         listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
-        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+        listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
         StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 1f51739..b15fd5b 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -27,7 +27,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FeedSLAMonitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FeedSLAMonitoring.twiki b/docs/src/site/twiki/FeedSLAMonitoring.twiki
index 88132ce..469c0aa 100644
--- a/docs/src/site/twiki/FeedSLAMonitoring.twiki
+++ b/docs/src/site/twiki/FeedSLAMonitoring.twiki
@@ -6,7 +6,7 @@ Feed SLA monitoring service requires FalconJPAService to be up.Following are the
 In startup.properties :
 
 *.application.services= org.apache.falcon.state.store.service.FalconJPAService,
-                        org.apache.falcon.service.FeedSLAMonitoringService
+                        org.apache.falcon.service.EntitySLAMonitoringService
 
 These properties are required for FalconJPAService in statestore.properties:
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
index df2a1e0..56376fc 100644
--- a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
+++ b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
@@ -26,7 +26,7 @@ import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowExecutionListener;
 import org.apache.hadoop.fs.Path;
@@ -45,8 +45,14 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
     @Override
     public void onSuccess(WorkflowExecutionContext context) throws FalconException {
         if (context.hasWorkflowSucceeded()) {
-            updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
+            if (context.getEntityType().toString().equals(EntityType.FEED.name())){
+                updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
                     context.getOutputFeedInstancePathsList());
+            }
+            if (context.getEntityType().toString().equals(EntityType.PROCESS.name())){
+                EntitySLAMonitoringService.get().makeProcessInstanceAvailable(context.getClusterName(),
+                        context.getEntityName(), context.getNominalTimeAsISO8601(), context.getEntityType());
+            }
         }
     }
 
@@ -60,7 +66,8 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
                 String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
                 Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]),
                     EntityUtil.getTimeZone(feed));
-                FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date);
+                EntitySLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index],
+                        clusterName, date);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 4fd1b53..c1f818a 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -18,11 +18,13 @@
 package org.apache.falcon.jdbc;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.FeedSLAAlertBean;
-import org.apache.falcon.persistence.PersistenceConstants;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.MonitoredEntityBean;
 import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
 import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.persistence.EntitySLAAlertBean;
 import org.apache.falcon.service.FalconJPAService;
 
 import javax.persistence.EntityManager;
@@ -32,7 +34,7 @@ import java.util.Date;
 import java.util.List;
 
 /**
-* StateStore for MonitoringFeeds and PendingFeedInstances.
+* StateStore for MonitoringEntity and PendingEntityInstances.
 */
 
 public class MonitoringJdbcStateStore {
@@ -42,23 +44,25 @@ public class MonitoringJdbcStateStore {
     }
 
 
-    public void putMonitoredFeed(String feedName){
+    public void putMonitoredEntity(String entityName, String entityType) throws FalconException{
 
-        MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
-        monitoredFeedsBean.setFeedName(feedName);
+        MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean();
+        monitoredEntityBean.setEntityName(entityName);
+        monitoredEntityBean.setEntityType(entityType);
         EntityManager entityManager = getEntityManager();
         try {
             beginTransaction(entityManager);
-            entityManager.persist(monitoredFeedsBean);
+            entityManager.persist(monitoredEntityBean);
         } finally {
             commitAndCloseTransaction(entityManager);
         }
     }
 
-    public MonitoredFeedsBean getMonitoredFeed(String feedName){
+    public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
-        q.setParameter("feedName", feedName);
+        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         try {
             if (result.isEmpty()) {
@@ -67,14 +71,15 @@ public class MonitoringJdbcStateStore {
         } finally {
             entityManager.close();
         }
-        return ((MonitoredFeedsBean)result.get(0));
+        return ((MonitoredEntityBean)result.get(0));
     }
 
-    public void deleteMonitoringFeed(String feedName) {
+    public void deleteMonitoringEntity(String entityName, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
-        q.setParameter("feedName", feedName);
+        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -82,7 +87,7 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
         List result = q.getResultList();
@@ -90,22 +95,24 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
-    public Date getLastInstanceTime(String feedName) throws ResultNotFoundException {
+    public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
-        q.setParameter("feedName", feedName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         Date result = (Date)q.getSingleResult();
         entityManager.close();
         return result;
     }
 
-    public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+    public void deletePendingInstance(String entityName, String clusterName , Date nominalTime, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -113,12 +120,13 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void deletePendingInstances(String feedName, String clusterName){
+    public void deletePendingInstances(String entityName, String clusterName, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -126,23 +134,26 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+    public void putPendingInstances(String entity, String clusterName, Date nominalTime, String entityType)
+        throws FalconException{
         EntityManager entityManager = getEntityManager();
         PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
-        pendingInstanceBean.setFeedName(feed);
+        pendingInstanceBean.setEntityName(entity);
         pendingInstanceBean.setClusterName(clusterName);
         pendingInstanceBean.setNominalTime(nominalTime);
+        pendingInstanceBean.setEntityType(entityType);
 
         beginTransaction(entityManager);
         entityManager.persist(pendingInstanceBean);
         commitAndCloseTransaction(entityManager);
     }
 
-    public List<Date> getNominalInstances(String feedName, String clusterName) {
+    public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -168,15 +179,17 @@ public class MonitoringJdbcStateStore {
         entityManager.close();
     }
 
-    public PendingInstanceBean getPendingInstance(String feedName, String clusterName, Date nominalTime) {
+    public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime,
+                                                  String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE,
                             PendingInstanceBean.class);
-        q.setParameter("feedName", feedName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
 
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try {
             return q.getSingleResult();
         } finally {
@@ -184,14 +197,16 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public FeedSLAAlertBean getFeedAlertInstance(String feedName, String clusterName, Date nominalTime) {
+    public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime,
+                                                     String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        TypedQuery<FeedSLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_FEED_ALERT_INSTANCE,
-                FeedSLAAlertBean.class);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.
+                GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try {
             return q.getSingleResult();
         } finally {
@@ -199,30 +214,32 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void putSLAAlertInstance(String feedName, String cluster, Date nominalTime, Boolean isSLALowMissed,
-                                    Boolean isSLAHighMissed) {
+    public void putSLAAlertInstance(String entityName, String cluster, String entityType, Date nominalTime,
+                                    Boolean isSLALowMissed, Boolean isSLAHighMissed) throws FalconException{
         EntityManager entityManager = getEntityManager();
-        FeedSLAAlertBean feedSLAAlertBean = new FeedSLAAlertBean();
-        feedSLAAlertBean.setFeedName(feedName);
-        feedSLAAlertBean.setClusterName(cluster);
-        feedSLAAlertBean.setNominalTime(nominalTime);
-        feedSLAAlertBean.setIsSLALowMissed(isSLALowMissed);
-        feedSLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+        EntitySLAAlertBean entitySLAAlertBean = new EntitySLAAlertBean();
+        entitySLAAlertBean.setEntityName(entityName);
+        entitySLAAlertBean.setClusterName(cluster);
+        entitySLAAlertBean.setNominalTime(nominalTime);
+        entitySLAAlertBean.setIsSLALowMissed(isSLALowMissed);
+        entitySLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+        entitySLAAlertBean.setEntityType(entityType);
         try {
             beginTransaction(entityManager);
-            entityManager.persist(feedSLAAlertBean);
+            entityManager.persist(entitySLAAlertBean);
         } finally {
             commitAndCloseTransaction(entityManager);
         }
     }
 
-    public void updateSLAAlertInstance(String feedName, String clusterName, Date nominalTime) {
+    public void updateSLAAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -230,13 +247,14 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void deleteFeedAlertInstance(String feedName, String clusterName, Date nominalTime){
+    public void deleteEntityAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_FEED_ALERT_INSTANCE);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -245,7 +263,7 @@ public class MonitoringJdbcStateStore {
     }
 
 
-    public List<FeedSLAAlertBean> getSLAHighCandidates() {
+    public List<EntitySLAAlertBean> getSLAHighCandidates() {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES);

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index c6903a4..895f8b2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -31,7 +31,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.UnschedulableEntityException;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.monitors.Dimension;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -162,11 +162,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
             Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
 
             if (StringUtils.isBlank(feedName)) {
-                instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end));
+                instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end));
             } else {
                 for (String clusterName : DeploymentUtil.getCurrentClusters()) {
-                    instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName,
-                            clusterName, start, end));
+                    instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName,
+                            clusterName, start, end, EntityType.FEED.toString()));
                 }
             }
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 53a9de1..249c273 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -134,7 +134,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName,
+                return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName,
                         start, end, colo);
             }
         }.execute();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
new file mode 100644
index 0000000..f023c35
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+  * Service to know which all feeds have missed SLA.
+  */
+public final class EntitySLAAlertService implements FalconService, EntitySLAListener {
+
+    private static final String NAME = "EntitySLAAlertService";
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntitySLAAlertService.class);
+
+    private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+
+    private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>();
+
+    private static final EntitySLAAlertService SERVICE = new EntitySLAAlertService();
+
+    public static EntitySLAAlertService get() {
+        return SERVICE;
+    }
+
+    private EntitySLAAlertService(){}
+
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public void init() throws FalconException {
+        String listenerClassNames = StartupProperties.get().
+                getProperty("feedAlert.listeners");
+        for (String listenerClassName : listenerClassNames.split(",")) {
+            listenerClassName = listenerClassName.trim();
+            if (listenerClassName.isEmpty()) {
+                continue;
+            }
+            EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
+            registerListener(listener);
+        }
+
+        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+        int statusCheckFrequencySeconds = Integer.parseInt(freq);
+
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
+    }
+
+    public void registerListener(EntitySLAListener listener) {
+        listeners.add(listener);
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+
+    }
+
+
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            processSLACandidates();
+        }
+    }
+
+    void processSLACandidates(){
+        //Get all feeds instances to be monitored
+        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
+        if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
+            return;
+        }
+
+        LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
+        try{
+            for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
+
+                String entityName = pendingInstanceBean.getEntityName();
+                String clusterName = pendingInstanceBean.getClusterName();
+                Date nominalTime = pendingInstanceBean.getNominalTime();
+                String entityType = pendingInstanceBean.getEntityType();
+
+                org.apache.falcon.entity.v0.cluster.Cluster cluster = ClusterHelper.getCluster(clusterName);
+
+                Set<SchedulableEntityInstance> schedulableEntityInstances= EntitySLAMonitoringService.get().
+                        getEntitySLAMissPendingAlerts(entityName, cluster.getName(), nominalTime, nominalTime
+                                , entityType);
+                if (schedulableEntityInstances.isEmpty()){
+                    store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime,
+                            entityType);
+                    return;
+                }
+                List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
+                SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
+
+
+                if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_WARN)) {
+                    store.putSLAAlertInstance(entityName, clusterName, entityType,
+                            nominalTime, true, false);
+                    //Mark in DB as SLA missed
+                    LOG.info("Feed :"+ entityName
+                                + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow");
+                } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){
+                    if (entityType.equals(EntityType.PROCESS.name())){
+                        store.putSLAAlertInstance(entityName, clusterName, entityType,
+                                nominalTime, true, false);
+                    }
+                    store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType);
+                    LOG.info("Entity :"+ entityName
+                            + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType
+                            + "missed SLAHigh");
+                    highSLAMissed(entityName, clusterName, entityType, nominalTime);
+                }
+            }
+        } catch (FalconException e){
+            LOG.error("Exception in FeedSLAALertService:", e);
+        }
+
+    }
+
+    @Override
+    public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+                              ) throws FalconException {
+        for (EntitySLAListener listener : listeners) {
+            listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
+            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 991052f..421ea38 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,7 +18,6 @@
 package org.apache.falcon.service;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.Date;
 
@@ -26,6 +25,6 @@ import java.util.Date;
  * Interface for FeedSLAAlert to be used by Listeners.
  */
 public interface EntitySLAListener {
-    void highSLAMissed(String enityName, EntityType entityType, String clusterName, Date nominalTime)
+    void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
         throws FalconException;
 }