You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/07/29 03:11:02 UTC

falcon git commit: FALCON-2059 BacklogMetricEmitter Service for Falcon Processes

Repository: falcon
Updated Branches:
  refs/heads/master e94dd72fa -> c7996deb7


FALCON-2059 BacklogMetricEmitter Service for Falcon Processes

Author: pavan.kolamuri <pa...@gmail.com>
Author: Pavan Kolamuri <pa...@appdynamics.com>

Reviewers: @pallavi-rao

Closes #212 from pavankumar526/master and squashes the following commits:

ad84f0f [Pavan Kolamuri] Fixed checkstyle issues
dea8f93 [pavan.kolamuri] Added doc in startup.properties
dbe3a7f [pavan.kolamuri] Added more log statements
d72d228 [pavan.kolamuri] Exception changed to throwable
46dcef8 [pavan.kolamuri] Fixed bug in oozieworkflowengine FALCON-2059
e92d3bc [pavan.kolamuri] Add isMissing method FALCON-2059
6d6cf81 [pavan.kolamuri] Handled when entity was deleted FALCON-2059
6c03701 [pavan.kolamuri] Fixed User authentication issue in oozie
81f0b03 [pavan.kolamuri] Rebased the patch
e3dbe88 [pavan.kolamuri] Handled multiple pipelines processes FALCON-2059
b5d9e70 [pavan.kolamuri] Addressed based on comments FALCON-2059
fb78fba [pavan.kolamuri] Refactored changes based on EntitySLAAlert service
80c015a [pavan.kolamuri] FALCON-2059 BacklogMetricEmitter Service for Falcon Processes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c7996deb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c7996deb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c7996deb

Branch: refs/heads/master
Commit: c7996deb76819fc7ef98dd28fd4dff19474b7602
Parents: e94dd72
Author: pavan.kolamuri <pa...@gmail.com>
Authored: Fri Jul 29 08:40:49 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jul 29 08:40:49 2016 +0530

----------------------------------------------------------------------
 .../falcon/persistence/BacklogMetricBean.java   | 116 ++++++
 .../persistence/PersistenceConstants.java       |   2 +
 .../falcon/tools/FalconStateStoreDBCLI.java     |   1 +
 .../workflow/engine/AbstractWorkflowEngine.java |   2 +
 .../src/main/resources/META-INF/persistence.xml |   9 +-
 common/src/main/resources/startup.properties    |   4 +
 .../entity/parser/ProcessEntityParserTest.java  |   2 +-
 .../resources/config/process/process-0.1.xml    |   2 +-
 .../workflow/engine/OozieWorkflowEngine.java    |  26 +-
 .../apache/falcon/jdbc/BacklogMetricStore.java  | 121 +++++++
 .../falcon/resource/channel/HTTPChannel.java    |   5 +-
 .../service/BacklogMetricEmitterService.java    | 356 +++++++++++++++++++
 .../falcon/service/EntitySLAAlertService.java   |   6 +-
 .../falcon/service/EntitySLAListener.java       |   3 +-
 .../java/org/apache/falcon/util/MetricInfo.java |  79 ++++
 .../BacklogMetricEmitterServiceTest.java        | 133 +++++++
 prism/src/test/resources/startup.properties     | 338 ++++++++++++++++++
 .../workflow/engine/FalconWorkflowEngine.java   |   5 +
 src/build/findbugs-exclude.xml                  |   6 +
 src/conf/startup.properties                     |  16 +
 20 files changed, 1217 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
new file mode 100644
index 0000000..b563da7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * JPA entity for one backlog metric instance (entity name, cluster, nominal time, entity type); table BACKLOG_METRIC.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select  OBJECT(a) from BacklogMetricBean a "),
+        @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+
+@Table(name = "BACKLOG_METRIC")
+public class BacklogMetricBean {
+
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id; // primary key; generated value (GenerationType.AUTO)
+
+    @Basic
+    @NotNull
+    @Index
+    @Column(name = "entity_name")
+    private String entityName; // indexed; one of the four fields matched by the delete query
+
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName; // matched by the delete query but, unlike the other three fields, not indexed
+
+    @Basic
+    @NotNull
+    @Index
+    @Column(name = "nominal_time")
+    private Date nominalTime; // indexed; nominal time of the backlogged instance
+
+    @Basic
+    @NotNull
+    @Index
+    @Column(name = "entity_type")
+    private String entityType; // indexed; kept as a String rather than an enum
+
+
+    public String getId() {
+        return id;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    public void setEntityType(String entityType) {
+        this.entityType = entityType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 7c2479d..5c3de51 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -61,5 +61,7 @@ public final class PersistenceConstants {
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
     public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
     public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
+    public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE";
+    public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES";
     public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 102b986..9c6e8b3 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -246,6 +246,7 @@ public class FalconStateStoreDBCLI {
         args.add("org.apache.falcon.persistence.PendingInstanceBean");
         args.add("org.apache.falcon.persistence.MonitoredEntityBean");
         args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
+        args.add("org.apache.falcon.persistence.BacklogMetricBean");
         return args.toArray(new String[args.size()]);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 4d8402a..0db7e9b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -72,6 +72,8 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract boolean isCompleted(Entity entity) throws FalconException;
 
+    public abstract boolean isMissing(Entity entity) throws FalconException;
+
     public abstract InstancesResult getRunningInstances(Entity entity,
                                                         List<LifeCycle> lifeCycles) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index ac2f397..d58e21c 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -28,6 +28,7 @@
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
         <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
         <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
+        <class>org.apache.falcon.persistence.BacklogMetricBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -60,7 +61,7 @@
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
         <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
         <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
-
+        <class>org.apache.falcon.persistence.BacklogMetricBean</class>
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
 
@@ -91,7 +92,7 @@
         <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
         <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
-
+        <class>org.apache.falcon.persistence.BacklogMetricBean</class>
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
 
@@ -100,8 +101,8 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
-
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
+                org.apache.falcon.persistence.BacklogMetricBean)"/>
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
             <property name="openjpa.ReadLockLevel" value="read"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index de24621..4b692a2 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -332,3 +332,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 #*.falcon.graphite.port=2003
 #*.falcon.graphite.frequency=1
 #*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
+#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index c4bfff6..3398c26 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -103,7 +103,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
 
         Assert.assertEquals(process.getTags(),
                 "consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting");
-        Assert.assertEquals(process.getPipelines(), "testPipeline,dataReplication_Pipeline");
+        Assert.assertEquals(process.getPipelines(), "testPipeline");
 
         Assert.assertEquals(process.getInputs().getInputs().get(0).getName(), "impression");
         Assert.assertEquals(process.getInputs().getInputs().get(0).getFeed(), "impressionFeed");

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 4ce7ad1..1550101 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -18,7 +18,7 @@
   -->
 <process name="sample" version="0" xmlns="uri:falcon:process:0.1">
     <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
-    <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
+    <pipelines>testPipeline</pipelines>
     <clusters>
         <cluster name="testCluster">
             <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 9a09f18..38a6c00 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -272,6 +272,24 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 || isBundleInState(bundles, BundleStatus.KILLED));
     }
 
+    @Override
+    public boolean isMissing(Entity entity) throws FalconException {
+        // Reports whether the entity has no active bundle on any cluster.
+        Map<String, BundleJob> latestBundles = findLatestBundle(entity);
+        List<String> clustersWithoutBundle = new ArrayList<>();
+        for (String cluster : latestBundles.keySet()) {
+            // A MISSING value means there is no active bundle for this cluster.
+            if (latestBundles.get(cluster) == MISSING) {
+                clustersWithoutBundle.add(cluster);
+            }
+        }
+        // Drop those placeholder entries; the entity is missing when nothing else is left.
+        for (String cluster : clustersWithoutBundle) {
+            latestBundles.remove(cluster);
+        }
+        return latestBundles.isEmpty();
+    }
+
     private enum BundleStatus {
         ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED
     }
@@ -1216,9 +1234,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private boolean isCoordApplicable(String appName, List<LifeCycle> lifeCycles) {
-        for (LifeCycle lifeCycle : lifeCycles) {
-            if (appName.contains(lifeCycle.getTag().name())) {
-                return true;
+        if (lifeCycles != null && !lifeCycles.isEmpty()) {
+            for (LifeCycle lifeCycle : lifeCycles) {
+                if (appName.contains(lifeCycle.getTag().name())) {
+                    return true;
+                }
             }
         }
         return false;

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
new file mode 100644
index 0000000..ef9a396
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.persistence.BacklogMetricBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.service.BacklogMetricEmitterService;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.MetricInfo;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Backlog metric store for entities, backed by the entity manager of FalconJPAService.
+ */
+public class BacklogMetricStore {
+
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+    /** Persists one backlog instance in its own transaction; a failed insert is rolled back. */
+    public void addInstance(String entityName, String cluster, Date nominalTime, EntityType entityType) {
+        BacklogMetricBean backlogMetricBean = new BacklogMetricBean();
+        backlogMetricBean.setClusterName(cluster);
+        backlogMetricBean.setEntityName(entityName);
+        backlogMetricBean.setNominalTime(nominalTime);
+        backlogMetricBean.setEntityType(entityType.name());
+        EntityManager entityManager = getEntityManager();
+        try {
+            entityManager.getTransaction().begin();
+            entityManager.persist(backlogMetricBean);
+            entityManager.getTransaction().commit();
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /** Runs the DELETE_BACKLOG_METRIC_INSTANCE named query for the given instance in its own transaction. */
+    public synchronized void deleteMetricInstance(String entityName, String cluster, Date nominalTime,
+                                                  EntityType entityType) {
+        EntityManager entityManager = getEntityManager();
+        try { // begin and query creation sit inside try so a failure there still releases the entity manager
+            entityManager.getTransaction().begin();
+            Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE);
+            q.setParameter("entityName", entityName);
+            q.setParameter("clusterName", cluster);
+            q.setParameter("nominalTime", nominalTime);
+            q.setParameter("entityType", entityType.name());
+            q.executeUpdate();
+            entityManager.getTransaction().commit();
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    // Rolls back a transaction that a failure left uncommitted, then closes the manager even if rollback throws.
+    private void rollbackIfActiveAndClose(EntityManager entityManager) {
+        try {
+            if (entityManager.getTransaction().isActive()) {
+                entityManager.getTransaction().rollback();
+            }
+        } finally {
+            entityManager.close();
+        }
+    }
+
+    /** Loads all stored backlog instances grouped by entity; returns null when the store holds none. */
+    public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
+        EntityManager entityManager = getEntityManager();
+        List<BacklogMetricBean> result;
+        try {
+            // Query inside try so the entity manager is closed even when the query throws.
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_BACKLOG_INSTANCES);
+            result = q.getResultList();
+        } finally {
+            entityManager.close();
+        }
+        if (CollectionUtils.isEmpty(result)) {
+            return null; // null (not an empty map) is kept for existing callers
+        }
+        Map<Entity, List<MetricInfo>> backlogMetrics = new HashMap<>();
+        for (BacklogMetricBean backlogMetricBean : result) {
+            Entity entity = EntityUtil.getEntity(backlogMetricBean.getEntityType(),
+                    backlogMetricBean.getEntityName());
+            if (!backlogMetrics.containsKey(entity)) {
+                backlogMetrics.put(entity, new ArrayList<MetricInfo>());
+            }
+            String nominalTime = BacklogMetricEmitterService.DATE_FORMAT.get()
+                    .format(backlogMetricBean.getNominalTime());
+            backlogMetrics.get(entity).add(new MetricInfo(nominalTime, backlogMetricBean.getClusterName()));
+        }
+        return backlogMetrics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index a63ae63..187d6c7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status.Family;
 import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -138,8 +139,8 @@ public class HTTPChannel extends AbstractChannel {
                 if (incomingRequest != null) {
                     incomingRequest.getInputStream().reset();
                 }
-            } catch (Exception ignore) {
-                // nothing to be done;
+            } catch (IOException e) {
+                LOG.error("Error in HTTPChannel", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
new file mode 100644
index 0000000..801ab36
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.BacklogMetricStore;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.MetricInfo;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine;
+
+/**
+ * Backlog Metric Emitter Service to publish metrics to Graphite.
+ */
+public final class BacklogMetricEmitterService implements FalconService,
+        EntitySLAListener, WorkflowExecutionListener {
+
+    private static final String METRIC_PREFIX = "falcon";
+    private static final String METRIC_SEPARATOR = ".";
+    private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
+    private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
+            + "recheck.interval.millisecs";
+    private static final String DEFAULT_PIPELINE = "DEFAULT";
+
+    private static final Logger LOG = LoggerFactory.getLogger(BacklogMetricEmitterService.class);
+
+    private static BacklogMetricStore backlogMetricStore = new BacklogMetricStore();
+
+    private static final BacklogMetricEmitterService SERVICE = new BacklogMetricEmitterService();
+
+    private static MetricNotificationService metricNotificationService =
+            Services.get().getService(MetricNotificationService.SERVICE_NAME);
+
+    public static BacklogMetricEmitterService get() {
+        return SERVICE; // the single instance created by the static field initializer
+    }
+
+    private BacklogMetricEmitterService() { // private: the only instance is SERVICE, handed out by get()
+    }
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1);
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1);
+
+
+    public static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm'Z'");
+            format.setTimeZone(TimeZone.getTimeZone("UTC"));
+            return format;
+        }
+    };
+
+    private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
+
+    @Override
+    public void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
+        throws FalconException {
+        // Adds a missed-SLA process instance to the backlog (store and memory); other entity types are ignored.
+        if (entityType != EntityType.PROCESS) {
+            return;
+        }
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+        List<MetricInfo> created = Collections.synchronizedList(new ArrayList<MetricInfo>());
+        List<MetricInfo> existing = entityBacklogs.putIfAbsent(entity, created); // null when ours was inserted
+        List<MetricInfo> metricInfoList = (existing != null) ? existing : created;
+        MetricInfo metricInfo = new MetricInfo(DATE_FORMAT.get().format(nominalTime), clusterName);
+        synchronized (metricInfoList) { // check and add under one lock so an instance is never recorded twice
+            if (!metricInfoList.contains(metricInfo)) {
+                backlogMetricStore.addInstance(entityName, clusterName, nominalTime, entityType);
+                metricInfoList.add(metricInfo);
+            }
+        }
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName(); // the service is named after its class
+    }
+
+    @Override
+    public void init() throws FalconException {
+        initInstances();
+        // Millisecond periods, default 60000. NOTE(review): sample startup.properties shows 600000 for recheck.
+        String emitMillis = StartupProperties.get().getProperty(BACKLOG_METRIC_EMIT_INTERVAL, "60000");
+        String recheckMillis = StartupProperties.get().getProperty(BACKLOG_METRIC_RECHECK_INTERVAL, "60000");
+        int emitPeriod = Integer.parseInt(emitMillis);
+        int recheckPeriod = Integer.parseInt(recheckMillis);
+        TimeUnit unit = TimeUnit.MILLISECONDS;
+        scheduledThreadPoolExecutor1.scheduleAtFixedRate(new BacklogMetricEmitter(), 1, emitPeriod, unit);
+        scheduledThreadPoolExecutor2.scheduleAtFixedRate(new BacklogCheckService(), 1, recheckPeriod, unit);
+    }
+
+    private void initInstances() throws FalconException {
+        LOG.info("Initializing backlog instances from state store");
+        Map<Entity, List<MetricInfo>> stored = backlogMetricStore.getAllInstances();
+        if (stored == null || stored.isEmpty()) {
+            return; // the store returned nothing to load
+        }
+        for (Map.Entry<Entity, List<MetricInfo>> storedEntry : stored.entrySet()) {
+            List<MetricInfo> backlog = Collections.synchronizedList(storedEntry.getValue());
+            entityBacklogs.put(storedEntry.getKey(), backlog);
+            LOG.debug("Backlog of entity " + storedEntry.getKey().getName() + " for instances " + backlog);
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        scheduledThreadPoolExecutor1.shutdown(); // executor that init() gave the BacklogMetricEmitter task
+        scheduledThreadPoolExecutor2.shutdown(); // executor that init() gave the BacklogCheckService task
+    }
+
+    /**
+     * Removes a succeeded process instance from the backlog, both from the backlog metric store
+     * and from the in-memory list; drops the entity from {@code entityBacklogs} once its list is empty.
+     * Entities that are not processes, or that have no backlog entry, are ignored.
+     *
+     * @param context execution context of the succeeded workflow instance
+     * @throws FalconException propagated from the entity lookup or date handling
+     */
+    @Override
+    public synchronized void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
+        if (entity.getEntityType() != EntityType.PROCESS) {
+            return;
+        }
+        // One lookup instead of containsKey() followed by get(): a missing entry is handled here
+        // rather than surfacing as a NullPointerException on the synchronized block below.
+        List<MetricInfo> metrics = entityBacklogs.get(entity);
+        if (metrics == null) {
+            return;
+        }
+        String clusterName = context.getClusterName();
+        synchronized (metrics) {
+            Date date = SchemaHelper.parseDateUTC(context.getNominalTimeAsISO8601());
+            backlogMetricStore.deleteMetricInstance(entity.getName(), clusterName,
+                    date, entity.getEntityType());
+            metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), clusterName));
+            if (metrics.isEmpty()) {
+                entityBacklogs.remove(entity);
+            }
+        }
+    }
+
+    /**
+     * Intentionally a no-op: this listener takes no action when a workflow instance fails.
+     */
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    /**
+     * Intentionally a no-op: this listener takes no action when a workflow instance starts.
+     */
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    /**
+     * Intentionally a no-op: this listener takes no action when a workflow instance is suspended.
+     */
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    /**
+     * Intentionally a no-op: this listener takes no action when a workflow instance is waiting.
+     */
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    /**
+     * Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
+     */
+    public static class BacklogMetricEmitter implements Runnable {
+        private ThreadPoolExecutor executor;
+
+        @Override
+        public void run() {
+            LOG.debug("BacklogMetricEmitter running for entities");
+            executor = new ScheduledThreadPoolExecutor(10);
+            List<Future> futures = new ArrayList<>();
+            try {
+                for (Entity entity : entityBacklogs.keySet()) {
+                    futures.add(executor.submit(new BacklogCalcService(entity, entityBacklogs.get(entity))));
+                }
+                waitForFuturesToComplete(futures);
+            } finally {
+                executor.shutdown();
+            }
+        }
+
+        private void waitForFuturesToComplete(List<Future> futures) {
+            try {
+                for (Future future : futures) {
+                    future.get();
+                }
+            } catch (InterruptedException e) {
+                LOG.error("Interruption while executing tasks " + e);
+            } catch (ExecutionException e) {
+                LOG.error("Error in executing threads " + e);
+            }
+        }
+    }
+
+    /**
+     * Service which calculates backlog for given entity and publish to graphite.
+     */
+    public static class BacklogCalcService implements Runnable {
+
+        private Entity entityObj;
+        private List<MetricInfo> metrics;
+
+        BacklogCalcService(Entity entity, List<MetricInfo> metricInfoList) {
+            this.entityObj = entity;
+            this.metrics = metricInfoList;
+        }
+
+        @Override
+        public void run() {
+
+            MetricInfo metricInfo = null;
+            HashMap<String, Long> backLogsCluster = new HashMap<>();
+            synchronized (metrics) {
+                long currentTime = System.currentTimeMillis();
+                Iterator iter = metrics.iterator();
+                while (iter.hasNext()) {
+                    try {
+                        metricInfo = (MetricInfo) iter.next();
+                        long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
+                        long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
+                                ? backLogsCluster.get(metricInfo.getCluster()) : 0;
+                        backlog += (currentTime - time);
+                        backLogsCluster.put(metricInfo.getCluster(), backlog);
+                    } catch (ParseException e) {
+                        LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
+                    }
+                }
+
+            }
+            org.apache.falcon.entity.v0.process.Process process = (Process) entityObj;
+
+            if (backLogsCluster != null && !backLogsCluster.isEmpty()) {
+                for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
+                    String clusterName = entry.getKey();
+                    String pipelinesStr = process.getPipelines();
+                    String metricName;
+                    Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes
+                    if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
+                        String[] pipelines = pipelinesStr.split(",");
+                        for (String pipeline : pipelines) {
+                            metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+                                    + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+                                    + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
+                                    + "backlogInMins";
+                            metricNotificationService.publish(metricName, backlog);
+                        }
+                    } else {
+                        metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+                                + DEFAULT_PIPELINE + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+                                + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
+                                + "backlogInMins";
+                        metricNotificationService.publish(metricName, backlog);
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Service runs periodically and removes succeeded instances from backlog list.
+     */
+    public static class BacklogCheckService implements Runnable {
+
+        @Override
+        public void run() {
+            LOG.debug("BacklogCheckService running for entities");
+            try {
+                AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+                for (Entity entity : entityBacklogs.keySet()) {
+                    List<MetricInfo> metrics = entityBacklogs.get(entity);
+                    if (!metrics.isEmpty()) {
+                        synchronized (metrics) {
+                            Iterator iterator = metrics.iterator();
+                            while (iterator.hasNext()) {
+                                MetricInfo metricInfo = (MetricInfo) iterator.next();
+                                String nominalTimeStr = metricInfo.getNominalTime();
+                                Date nominalTime;
+                                try {
+                                    nominalTime = DATE_FORMAT.get().parse(nominalTimeStr);
+                                    if (entity.getACL().getOwner() != null && !entity.getACL().getOwner().isEmpty()) {
+                                        CurrentUser.authenticate(entity.getACL().getOwner());
+                                    } else {
+                                        CurrentUser.authenticate(System.getProperty("user.name"));
+                                    }
+                                    if (wfEngine.isMissing(entity)) {
+                                        LOG.info("Entity of name {} was deleted so removing instance of "
+                                                + "nominaltime {} ", entity.getName(), nominalTimeStr);
+                                        backlogMetricStore.deleteMetricInstance(entity.getName(),
+                                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
+                                        iterator.remove();
+                                        continue;
+                                    }
+                                    InstancesResult status = wfEngine.getStatus(entity, nominalTime,
+                                            nominalTime, null, null);
+                                    if (status.getInstances().length > 0
+                                            && status.getInstances()[0].status == InstancesResult.
+                                            WorkflowStatus.SUCCEEDED) {
+                                        LOG.debug("Instance of nominaltime {} of entity {} was succeeded, removing "
+                                                + "from backlog entries", nominalTimeStr, entity.getName());
+                                        backlogMetricStore.deleteMetricInstance(entity.getName(),
+                                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
+                                        iterator.remove();
+                                    }
+                                } catch (ParseException e) {
+                                    LOG.error("Unable to parse date " + nominalTimeStr);
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (Throwable e) {
+                LOG.error("Error while checking backlog metrics" + e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 57e46b7..a7cafeb 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -150,7 +150,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                     LOG.info("Entity :"+ entityName
                             + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType
                             + "missed SLAHigh");
-                    highSLAMissed(entityName, clusterName, entityType, nominalTime);
+                    highSLAMissed(entityName, clusterName, EntityType.valueOf(entityType), nominalTime);
                 }
             }
         } catch (FalconException e){
@@ -160,12 +160,12 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
     }
 
     @Override
-    public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+    public void highSLAMissed(String entityName, String clusterName, EntityType entityType , Date nominalTime
                               ) throws FalconException {
         LOG.debug("Listners called...");
         for (EntitySLAListener listener : listeners) {
             listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
-            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 421ea38..73d383b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.service;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.Date;
 
@@ -25,6 +26,6 @@ import java.util.Date;
  * Interface for FeedSLAAlert to be used by Listeners.
  */
 public interface EntitySLAListener {
-    void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
+    void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
         throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
new file mode 100644
index 0000000..694bb87
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.util;
+
+/**
+ * A single backlog entry: the nominal time of an instance, as text, and the cluster it belongs to.
+ * Two entries are equal when both the nominal time and the cluster are equal.
+ */
+public class MetricInfo {
+
+    private String nominalTime;
+    private String cluster;
+
+    public MetricInfo(String nominalTimeStr, String clusterName) {
+        this.nominalTime = nominalTimeStr;
+        this.cluster = clusterName;
+    }
+
+    public String getNominalTime() {
+        return nominalTime;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    /** Null-safe string comparison: two nulls are equal, a null never equals a non-null. */
+    private static boolean sameText(String left, String right) {
+        if (left == null) {
+            return right == null;
+        }
+        return left.equals(right);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || !o.getClass().equals(this.getClass())) {
+            return false;
+        }
+        MetricInfo that = (MetricInfo) o;
+        return sameText(this.getNominalTime(), that.getNominalTime())
+                && sameText(this.getCluster(), that.getCluster());
+    }
+
+    @Override
+    public int hashCode() {
+        int timeHash = nominalTime == null ? 0 : nominalTime.hashCode();
+        int clusterHash = cluster == null ? 0 : cluster.hashCode();
+        return 31 * timeHash + clusterHash;
+    }
+
+    @Override
+    public String toString() {
+        return "Nominaltime: " + this.getNominalTime() + " cluster: " + this.getCluster();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
new file mode 100644
index 0000000..67d256e
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.BacklogMetricStore;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for {@link BacklogMetricEmitterService}.
+ */
+public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
+    private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static BacklogMetricStore backlogMetricStore;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private static BacklogMetricEmitterService backlogMetricEmitterService;
+    private MetricNotificationService mockMetricNotificationService;
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    /** Creates the state store schema by running the DB CLI "create" command against the given SQL file. */
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        // TestNG's assertEquals takes (actual, expected); the reverse order yields a misleading message.
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        backlogMetricStore = new BacklogMetricStore();
+        mockMetricNotificationService = Mockito.mock(MetricNotificationService.class);
+        Mockito.when(mockMetricNotificationService.getName()).thenReturn("MetricNotificationService");
+        Services.get().register(mockMetricNotificationService);
+        Services.get().register(BacklogMetricEmitterService.get());
+        backlogMetricEmitterService = BacklogMetricEmitterService.get();
+
+    }
+
+
+    /**
+     * Verifies that a missed-SLA instance is published as a backlog metric and that nothing is
+     * published any more once the instance has succeeded.
+     */
+    @Test
+    public void testBacklogEmitter() throws Exception {
+        backlogMetricEmitterService.init();
+        storeEntity(EntityType.PROCESS, "entity1");
+        backlogMetricEmitterService.highSLAMissed("entity1",  "cluster1", EntityType.PROCESS,
+                BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
+        Thread.sleep(10);
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> valueCaptor = ArgumentCaptor.forClass(Long.class);
+        Mockito.verify(mockMetricNotificationService, Mockito.atLeastOnce()).publish(captor.capture(),
+                valueCaptor.capture());
+        Assert.assertEquals(captor.getValue(), "falcon.cluster1.testPipeline.EXECUTION.entity1.backlogInMins");
+        WorkflowExecutionContext workflowExecutionContext = getWorkflowExecutionContext();
+        backlogMetricEmitterService.onSuccess(workflowExecutionContext);
+        // Let any emitter run that was already in flight finish before clearing the recorded calls.
+        Thread.sleep(100);
+        Mockito.reset(mockMetricNotificationService);
+        // Verifying straight after reset() could never fail; leave a window in which a
+        // still-present backlog entry would be published again.
+        Thread.sleep(100);
+        Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
+                Mockito.any(Long.class));
+
+    }
+
+    /** Builds the execution context of a succeeded run of entity1 on cluster1 at 2016-06-30-00-00. */
+    private WorkflowExecutionContext getWorkflowExecutionContext() {
+        Map<WorkflowExecutionArgs, String> args = new HashMap<>();
+        args.put(WorkflowExecutionArgs.ENTITY_TYPE, "process");
+        args.put(WorkflowExecutionArgs.CLUSTER_NAME, "cluster1");
+        args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
+        args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
+        args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
+        WorkflowExecutionContext workflowExecutionContext = new WorkflowExecutionContext(args);
+        return workflowExecutionContext;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties
new file mode 100644
index 0000000..d72dbba
--- /dev/null
+++ b/prism/src/test/resources/startup.properties
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+
+######### Implementation classes #########
+## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
+
+*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
+*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
+*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
+*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
+*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
+*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
+##### Falcon Services #####
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+                        org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
+                        org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.extensions.ExtensionService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
+                        org.apache.falcon.service.LifecyclePolicyMap,\
+                        org.apache.falcon.entity.store.ConfigurationStore,\
+                        org.apache.falcon.rerun.service.RetryService,\
+                        org.apache.falcon.rerun.service.LateRunService,\
+                        org.apache.falcon.metadata.MetadataMappingService,\
+                        org.apache.falcon.service.LogCleanupService,\
+                        org.apache.falcon.service.GroupsService,\
+                        org.apache.falcon.service.ProxyUserService,\
+                        org.apache.falcon.service.FalconJPAService
+##Add if you want to send data to graphite
+#                        org.apache.falcon.metrics.MetricNotificationService\
+## Add if you want to use Falcon Azure integration ##
+#                        org.apache.falcon.adfservice.ADFProviderService
+## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
+#                        org.apache.falcon.notification.service.impl.JobCompletionService,\
+#                        org.apache.falcon.notification.service.impl.SchedulerService,\
+#                        org.apache.falcon.notification.service.impl.AlarmService,\
+#                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+#                        org.apache.falcon.execution.FalconExecutionService,\
+
+
+
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+##### Falcon Configuration Store Change listeners #####
+*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
+                        org.apache.falcon.entity.ColoClusterRelation,\
+                        org.apache.falcon.group.FeedGroupMap,\
+                        org.apache.falcon.entity.store.FeedLocationStore,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
+                        org.apache.falcon.service.SharedLibraryHostingService
+## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
+#                       org.apache.falcon.state.store.jdbc.JdbcStateStore
+
+## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here
+*.feedAlert.listeners=
+
+##### JMS MQ Broker Implementation class #####
+*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
+
+##### List of shared libraries for Falcon workflows #####
+*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
+
+##### Workflow Job Execution Completion listeners #####
+*.workflow.execution.listeners=
+
+######### Implementation classes #########
+
+
+######### System startup parameters #########
+
+# Location of libraries that is shipped to Hadoop
+*.system.lib.location=${FALCON_HOME}/sharedlibs
+
+# Location to store user entity configurations
+
+#Configurations used in UTs
+debug.config.store.uri=file://${user.dir}/target/store
+#Location to store state of Feed SLA monitoring service
+debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
+debug.config.oozie.conf.uri=${user.dir}/target/oozie
+debug.system.lib.location=${system.lib.location}
+debug.broker.url=vm://localhost
+debug.retry.recorder.path=${user.dir}/target/retry
+debug.libext.feed.retention.paths=${falcon.libext}
+debug.libext.feed.replication.paths=${falcon.libext}
+debug.libext.process.paths=${falcon.libext}
+
+debug.extension.store.uri=file://${user.dir}/target/extension/store
+
+#Configurations used in ITs
+it.config.store.uri=file://${user.dir}/target/store
+it.config.oozie.conf.uri=${user.dir}/target/oozie
+it.system.lib.location=${system.lib.location}
+it.broker.url=tcp://localhost:61616
+it.retry.recorder.path=${user.dir}/target/retry
+it.libext.feed.retention.paths=${falcon.libext}
+it.libext.feed.replication.paths=${falcon.libext}
+it.libext.process.paths=${falcon.libext}
+it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
+*.falcon.cleanup.service.frequency=minutes(5)
+
+######### Properties for Feed SLA Monitoring #########
+# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
+*.feed.sla.serialization.frequency.millis=3600000
+
+# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in
+# a FIFO fashion.
+*.feed.sla.queue.size=288
+
+# Do not change unless really sure
+# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
+*.feed.sla.statusCheck.frequency.seconds=600
+
+# Do not change unless really sure
+# Time Duration (in milliseconds) in future for generating pending feed instances.
+# In every cycle pending feed instances are added for monitoring, till this time in future.
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
+*.feed.sla.lookAheadWindow.millis=900000
+
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# default time-to-live for a JMS message 3 days (time in minutes)
+*.broker.ttlInMins=4320
+*.entity.topic=FALCON.ENTITY.TOPIC
+*.max.retry.failure.count=1
+*.retry.recorder.path=${user.dir}/logs/retry
+
+######### Properties for configuring iMon client and metric #########
+*.internal.queue.size=1000
+
+
+######### Graph Database Properties #########
+# Graph implementation
+*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
+
+# Graph Storage
+# IMPORTANT:   Please enable one of the graph db backend: hbase or berkeleydb, per instructions below.
+
+# Enable the following for Berkeley DB.  Make sure je-5.0.73.jar is downloaded and available
+# under Falcon webapp directory or under falcon server classpath.
+#*.falcon.graph.storage.backend=berkeleyje
+#*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb
+#*.falcon.graph.serialize.path=${user.dir}/target/graphdb
+
+# Enable the following for HBase
+#*.falcon.graph.storage.backend=hbase
+# For standalone mode , set hostname to localhost; for distributed mode, set to the zookeeper quorum
+# @see http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+#*.falcon.graph.storage.hostname=localhost
+#*.falcon.graph.storage.hbase.table=falcon_titan
+
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
+# Uncomment and override the following properties to enable metrics for Titan DB and push them to Graphite. You
+# can also use other reporters, such as Ganglia.
+# Refer to http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring to find the
+# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph."
+# *.falcon.graph.storage.enable-basic-metrics = true
+# Required; IP or hostname string
+# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1
+# Required; specify logging interval in milliseconds
+# *.falcon.graph.metrics.graphite.interval = 60000
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The Kerberos name rules used to resolve Kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########
+
+
+######### Authorization Properties #########
+
+# Authorization Enabled flag: false (default)|true
+*.falcon.security.authorization.enabled=false
+
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa
+
+# Admin Group Membership, comma separated groups
+*.falcon.security.authorization.admin.groups=falcon,staff
+
+# Authorization Provider Implementation Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
+
+######### Authorization Properties #########
+
+######### ADF Configurations start #########
+
+# A String object that represents the namespace
+*.microsoft.windowsazure.services.servicebus.namespace=
+
+# Request and status queues on the namespace
+*.microsoft.windowsazure.services.servicebus.requestqueuename=
+*.microsoft.windowsazure.services.servicebus.statusqueuename=
+
+# A String object that contains the SAS key name
+*.microsoft.windowsazure.services.servicebus.sasKeyName=
+
+# A String object that contains the SAS key
+*.microsoft.windowsazure.services.servicebus.sasKey=
+
+# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
+# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
+*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=
+
+# Service bus polling frequency
+*.microsoft.windowsazure.services.servicebus.polling.frequency=
+
+# Super user
+*.microsoft.windowsazure.services.servicebus.superuser=
+
+######### ADF Configurations end ###########
+
+######### SMTP Properties ########
+
+# Setting SMTP hostname
+#*.falcon.email.smtp.host=localhost
+
+# Setting SMTP port number
+#*.falcon.email.smtp.port=25
+
+# Setting email from address
+#*.falcon.email.from.address=falcon@localhost
+
+# Setting email Auth
+#*.falcon.email.smtp.auth=false
+
+#Setting user name
+#*.falcon.email.smtp.user=""
+
+#Setting password
+#*.falcon.email.smtp.password=""
+
+# Setting monitoring plugin, if SMTP parameters are defined
+#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
+#                     org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema exists, this is a NOP.
+## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+#*.falcon.statestore.create.db.schema=true
+
+# Graphite properties
+*.falcon.graphite.hostname=localhost
+*.falcon.graphite.port=2003
+*.falcon.graphite.frequency=1
+*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+*.falcon.backlog.metricservice.emit.interval.millisecs=10
+*.falcon.backlog.metricservice.recheck.interval.millisecs=1000

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 6dbec0c..7b7da0a 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -128,6 +128,11 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
+    public boolean isMissing(Entity entity) throws FalconException {
+        return !STATE_STORE.entityExists(new EntityID(entity));
+    }
+
+    @Override
     public String suspend(Entity entity) throws FalconException {
         EXECUTION_SERVICE.suspend(entity);
         return "SUCCESS";

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 5c35b8c..346583d 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -57,6 +57,12 @@
         <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
     </Match>
 
+    <Match>
+        <Class name="org.apache.falcon.persistence.BacklogMetricBean" />
+        <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
+    </Match>
+
+
 
     <Match>
         <Class name="org.apache.falcon.persistence.MonitoredEntityBean" />

http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index a107eca..ef07e57 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -58,6 +58,15 @@
 
 ##Add if you want to send data to graphite
 #                        org.apache.falcon.metrics.MetricNotificationService\
+
+##Add if you want to enable BacklogMetricService
+#                        org.apache.falcon.service.FalconJPAService,\
+#                        org.apache.falcon.metrics.MetricNotificationService,\
+#                        org.apache.falcon.service.EntitySLAMonitoringService,\
+#                        org.apache.falcon.service.EntitySLAAlertService,\
+#                        org.apache.falcon.service.BacklogMetricEmitterService
+
+
 ## Add if you want to use Falcon Azure integration ##
 #                        org.apache.falcon.adfservice.ADFProviderService
 ## If you wish to use Falcon native scheduler uncomment out below  application services and comment out above application services ##
@@ -160,6 +169,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
 *.feed.sla.lookAheadWindow.millis=900000
 
+##Add if you want to enable BacklogMetricService
+#*.feedAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService
+
 ######### Properties for configuring JMS provider - activemq #########
 # Default Active MQ url
 *.broker.url=tcp://localhost:61616
@@ -337,3 +349,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 #*.falcon.graphite.port=2003
 #*.falcon.graphite.frequency=1
 #*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
+#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000