You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/05 04:06:26 UTC

falcon git commit: FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity

Repository: falcon
Updated Branches:
  refs/heads/master 7c5822cf5 -> 3f6b69024


FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity

Author: sandeep <sa...@gmail.com>

Reviewers: @PracheerAgarwal, @pallavi-rao

Closes #335 from sandeepSamudrala/FALCON-2229 and squashes the following commits:

280a079 [sandeep] FALCON-2229 Removed cluster from the named query that deletes all the instances irrespective of the cluster
22a80b6 [sandeep] FALCON-2229 Incorporated review comments. Renamed the method
5cb25e8 [sandeep] FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity
3558af3 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2229
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3f6b6902
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3f6b6902
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3f6b6902

Branch: refs/heads/master
Commit: 3f6b6902407b5a8c4c600cab0d9fb92a935bd33b
Parents: 7c5822c
Author: sandeep <sa...@gmail.com>
Authored: Thu Jan 5 09:36:13 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Jan 5 09:36:13 2017 +0530

----------------------------------------------------------------------
 .../falcon/persistence/BacklogMetricBean.java   |  3 +-
 .../apache/falcon/jdbc/BacklogMetricStore.java  | 11 ++--
 .../service/BacklogMetricEmitterService.java    | 63 +++++++++-----------
 .../service/EntitySLAMonitoringService.java     | 20 +++----
 .../BacklogMetricEmitterServiceTest.java        | 28 +++++----
 5 files changed, 62 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
index b563da7..a959fb5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
@@ -38,7 +38,8 @@ import java.util.Date;
 @Entity
 @NamedQueries({
         @NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select  OBJECT(a) from BacklogMetricBean a "),
-        @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+        @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
 })
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
index 8bb8bbb..010aefa 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -37,7 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Backlog Metric Store for entitties.
+ * Backlog Metric Store for entities' backlog instances.
  */
 public class BacklogMetricStore {
 
@@ -70,18 +70,19 @@ public class BacklogMetricStore {
         q.setParameter("clusterName", cluster);
         q.setParameter("nominalTime", nominalTime);
         q.setParameter("entityType", entityType.name());
-        try{
+        try {
             q.executeUpdate();
         } finally {
             commitAndCloseTransaction(entityManager);
         }
     }
 
-    public void deleteEntityInstance(String entityName){
+    public void deleteEntityBackLogInstances(String entityName, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
         q.setParameter("entityName", entityName);
+        q.setParameter("entityType", entityType);
         try {
             q.executeUpdate();
         } finally {
@@ -110,7 +111,7 @@ public class BacklogMetricStore {
             if (CollectionUtils.isEmpty(result)) {
                 return null;
             }
-        } finally{
+        } finally {
             entityManager.close();
         }
 
@@ -121,7 +122,7 @@ public class BacklogMetricStore {
             if (!backlogMetrics.containsKey(entity)) {
                 backlogMetrics.put(entity, new ArrayList<MetricInfo>());
             }
-            List<MetricInfo> metrics =  backlogMetrics.get(entity);
+            List<MetricInfo> metrics = backlogMetrics.get(entity);
             MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
                     .format(backlogMetricBean.getNominalTime()),
                     backlogMetricBean.getClusterName());

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 16830f9..5a323fd 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -82,7 +81,7 @@ public final class BacklogMetricEmitterService implements FalconService,
             Services.get().getService(MetricNotificationService.SERVICE_NAME);
 
     private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
-            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+            Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
 
     public static BacklogMetricEmitterService get() {
         return SERVICE;
@@ -107,18 +106,18 @@ public final class BacklogMetricEmitterService implements FalconService,
     private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
 
     @Override
-    public void onAdd(Entity entity) throws FalconException{
+    public void onAdd(Entity entity) throws FalconException {
         addToBacklog(entity);
     }
 
     @Override
-    public void onRemove(Entity entity) throws FalconException{
-        if (entity.getEntityType() != EntityType.PROCESS){
+    public void onRemove(Entity entity) throws FalconException {
+        if (entity.getEntityType() != EntityType.PROCESS) {
             return;
         }
         Process process = (Process) entity;
         if (process.getSla() != null) {
-            backlogMetricStore.deleteEntityInstance(entity.getName());
+            backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
             entityBacklogs.remove(entity);
             process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
             for (Cluster cluster : process.getClusters().getClusters()) {
@@ -127,7 +126,7 @@ public final class BacklogMetricEmitterService implements FalconService,
         }
     }
 
-    public void dropMetric(String clusterName, Process process){
+    private void dropMetric(String clusterName, Process process) {
         String pipelinesStr = process.getPipelines();
         String metricName;
 
@@ -144,15 +143,15 @@ public final class BacklogMetricEmitterService implements FalconService,
     }
 
     @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
-        if (oldEntity.getEntityType() != EntityType.PROCESS){
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        if (oldEntity.getEntityType() != EntityType.PROCESS) {
             return;
         }
         Process newProcess = (Process) newEntity;
         Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
-        if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
+        if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null) {
             if (oldProcess.getSla() != null) {
-                backlogMetricStore.deleteEntityInstance(newProcess.getName());
+                backlogMetricStore.deleteEntityBackLogInstances(newProcess.getName(), newEntity.getEntityType().name());
                 entityBacklogs.remove(newProcess);
                 for (Cluster cluster : oldProcess.getClusters().getClusters()) {
                     dropMetric(cluster.getName(), oldProcess);
@@ -164,16 +163,16 @@ public final class BacklogMetricEmitterService implements FalconService,
     }
 
     @Override
-    public void onReload(Entity entity) throws FalconException{
+    public void onReload(Entity entity) throws FalconException {
         addToBacklog(entity);
     }
 
-    public void addToBacklog(Entity entity) {
+    private void addToBacklog(Entity entity) {
         if (entity.getEntityType() != EntityType.PROCESS) {
             return;
         }
         Process process = (Process) entity;
-        if (process.getSla() == null){
+        if (process.getSla() == null) {
             return;
         }
         entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
@@ -277,9 +276,9 @@ public final class BacklogMetricEmitterService implements FalconService,
     }
 
     /**
-     * Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
+     * Service that executes backlog evaluation and publishes metrics to Graphite for entities in parallel.
      */
-    public static class BacklogMetricEmitter implements Runnable {
+    private static class BacklogMetricEmitter implements Runnable {
         private ThreadPoolExecutor executor;
 
         @Override
@@ -311,9 +310,9 @@ public final class BacklogMetricEmitterService implements FalconService,
     }
 
     /**
-     * Service which calculates backlog for given entity and publish to graphite.
+     * Service that calculates backlog for given entity and publishes them to graphite.
      */
-    public static class BacklogCalcService implements Runnable {
+    private static class BacklogCalcService implements Runnable {
 
         private Entity entityObj;
         private List<MetricInfo> metrics;
@@ -329,18 +328,17 @@ public final class BacklogMetricEmitterService implements FalconService,
             MetricInfo metricInfo = null;
             HashMap<String, Long> backLogsCluster = new HashMap<>();
             synchronized (metrics) {
-                if (metrics.isEmpty()){
-                    Process process = (Process)entityObj;
+                if (metrics.isEmpty()) {
+                    Process process = (Process) entityObj;
                     Clusters clusters = process.getClusters();
-                    for (Cluster cluster : clusters.getClusters()){
+                    for (Cluster cluster : clusters.getClusters()) {
                         publishBacklog(process, cluster.getName(), 0L);
                     }
-                }else{
+                } else {
                     long currentTime = System.currentTimeMillis();
-                    Iterator iter = metrics.iterator();
-                    while (iter.hasNext()) {
+                    for (MetricInfo metric : metrics) {
                         try {
-                            metricInfo = (MetricInfo) iter.next();
+                            metricInfo = metric;
                             long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
                             long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
                                     ? backLogsCluster.get(metricInfo.getCluster()) : 0;
@@ -366,7 +364,7 @@ public final class BacklogMetricEmitterService implements FalconService,
     }
 
 
-    public static void publishBacklog(Process process, String clusterName, Long backlog){
+    private static void publishBacklog(Process process, String clusterName, Long backlog) {
         String pipelinesStr = process.getPipelines();
         String metricName;
 
@@ -382,19 +380,17 @@ public final class BacklogMetricEmitterService implements FalconService,
         }
     }
 
-    public static String getMetricName(String clusterName, String processName, String pipeline){
-        String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+    private static String getMetricName(String clusterName, String processName, String pipeline) {
+        return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
                 + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
                 + METRIC_SEPARATOR + processName + METRIC_SEPARATOR
                 + "backlogInMins";
-        return metricName;
     }
 
     /**
      * Service runs periodically and removes succeeded instances from backlog list.
      */
-    public static class BacklogCheckService implements Runnable {
-
+    private static class BacklogCheckService implements Runnable {
         @Override
         public void run() {
             LOG.trace("BacklogCheckService running for entities");
@@ -414,7 +410,7 @@ public final class BacklogMetricEmitterService implements FalconService,
                                     authenticateUser(entity);
                                     if (wfEngine.isMissing(entity)) {
                                         LOG.info("Entity of name {} was deleted so removing instance of "
-                                                + "nominaltime {} ", entity.getName(), nominalTimeStr);
+                                                + "nominal time {} ", entity.getName(), nominalTimeStr);
                                         backlogMetricStore.deleteMetricInstance(entity.getName(),
                                                 metricInfo.getCluster(), nominalTime, entity.getEntityType());
                                         iterator.remove();
@@ -444,7 +440,7 @@ public final class BacklogMetricEmitterService implements FalconService,
         }
     }
 
-    private static void authenticateUser(Entity entity){
+    private static void authenticateUser(Entity entity) {
         if (!CurrentUser.isAuthenticated()) {
             if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
                 CurrentUser.authenticate(entity.getACL().getOwner());
@@ -453,5 +449,4 @@ public final class BacklogMetricEmitterService implements FalconService,
             }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 451fb95..09671d9 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -20,7 +20,7 @@ package org.apache.falcon.service;
 import com.google.common.annotations.VisibleForTesting;
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -80,12 +80,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
     private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();
 
-    public static final String TAG_CRITICAL = "Missed-SLA-High";
-    public static final String TAG_WARN = "Missed-SLA-Low";
+    static final String TAG_CRITICAL = "Missed-SLA-High";
+    static final String TAG_WARN = "Missed-SLA-Low";
     private static final long MINUTE_DELAY = 60000L;
 
     private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
-            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+            Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
 
     private EntitySLAMonitoringService() {
 
@@ -176,7 +176,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         }
     }
 
-    public Boolean checkFeedClusterSLA(Feed feed){
+    private Boolean checkFeedClusterSLA(Feed feed){
         for(Cluster  cluster : feed.getClusters().getClusters()){
             Sla sla =  FeedHelper.getSLA(cluster, feed);
             if (sla != null){
@@ -187,7 +187,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     }
 
 
-    public Boolean checkProcessClusterSLA(Process process){
+    private Boolean checkProcessClusterSLA(Process process){
         Clusters clusters = process.getClusters();
         for(org.apache.falcon.entity.v0.process.Cluster  cluster : clusters.getClusters()){
             org.apache.falcon.entity.v0.process.Sla sla =  ProcessHelper.getSLA(cluster, process);
@@ -292,7 +292,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         }
     }
 
-    void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
+    private void updatePendingInstances(String entityName, List<String> slaRemovedClusters, String entityType){
         for(String clusterName :slaRemovedClusters){
             MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
                     entityType);
@@ -384,7 +384,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         }
     }
 
-    void addPendingEntityInstances(Date checkPointTime) throws FalconException {
+    private void addPendingEntityInstances(Date checkPointTime) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
         List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
         for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
@@ -611,8 +611,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         return result;
     }
 
-    Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
-                                                Date end, List<Date> missingInstances) throws FalconException {
+    private Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
+                                                        Date end, List<Date> missingInstances) throws FalconException {
         Date now = new Date();
         Frequency slaHigh = sla.getShouldEndIn();
         Set<Pair<Date, String>> result = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
index 67d256e..206eced 100644
--- a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.service;
 
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.jdbc.BacklogMetricStore;
 import org.apache.falcon.metrics.MetricNotificationService;
@@ -46,9 +47,9 @@ import java.util.Map;
  */
 public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
     private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
-    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
-    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
-    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    private static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:" + dbLocation + ";create=true";
+    private static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
     protected LocalFileSystem fs = new LocalFileSystem();
 
     private static BacklogMetricStore backlogMetricStore;
@@ -56,13 +57,13 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
     private static BacklogMetricEmitterService backlogMetricEmitterService;
     private MetricNotificationService mockMetricNotificationService;
 
-    protected int execDBCLICommands(String[] args) {
+    private int execDBCLICommands(String[] args) {
         return new FalconStateStoreDBCLI().run(args);
     }
 
-    public void createDB(String file) {
+    private void createDB(String file) {
         File sqlFile = new File(file);
-        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        String[] argsCreate = {"create", "-sqlfile", sqlFile.getAbsolutePath(), "-run"};
         int result = execDBCLICommands(argsCreate);
         Assert.assertEquals(0, result);
         Assert.assertTrue(sqlFile.exists());
@@ -79,7 +80,7 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
     }
 
     @BeforeClass
-    public void setup() throws Exception{
+    public void setup() throws Exception {
         StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
         Configuration localConf = new Configuration();
         fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
@@ -94,7 +95,6 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
         Services.get().register(mockMetricNotificationService);
         Services.get().register(BacklogMetricEmitterService.get());
         backlogMetricEmitterService = BacklogMetricEmitterService.get();
-
     }
 
 
@@ -102,7 +102,7 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
     public void testBacklogEmitter() throws Exception {
         backlogMetricEmitterService.init();
         storeEntity(EntityType.PROCESS, "entity1");
-        backlogMetricEmitterService.highSLAMissed("entity1",  "cluster1", EntityType.PROCESS,
+        backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
                 BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
         Thread.sleep(10);
         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
@@ -116,7 +116,11 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
         Mockito.reset(mockMetricNotificationService);
         Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
                 Mockito.any(Long.class));
-
+        backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
+                BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
+        Thread.sleep(1000);
+        backlogMetricEmitterService.onRemove(EntityUtil.getEntity(EntityType.PROCESS, "entity1"));
+        Assert.assertNull(backlogMetricStore.getAllInstances());
     }
 
     private WorkflowExecutionContext getWorkflowExecutionContext() {
@@ -126,8 +130,6 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
         args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
         args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
         args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
-        WorkflowExecutionContext workflowExecutionContext = new WorkflowExecutionContext(args);
-        return workflowExecutionContext;
-
+        return new WorkflowExecutionContext(args);
     }
 }