You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/05 04:06:26 UTC
falcon git commit: FALCON-2229 BacklogEmitterMetricService fix for
deleting entities' instances in case of removal of SLA or deletion of entity
Repository: falcon
Updated Branches:
refs/heads/master 7c5822cf5 -> 3f6b69024
FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity
Author: sandeep <sa...@gmail.com>
Reviewers: @PracheerAgarwal, @pallavi-rao
Closes #335 from sandeepSamudrala/FALCON-2229 and squashes the following commits:
280a079 [sandeep] FALCON-2229 Removed cluster from the named query that deletes all the instances irrespective of the cluster
22a80b6 [sandeep] FALCON-2229 Incorporated review comments. Renamed the method
5cb25e8 [sandeep] FALCON-2229 BacklogEmitterMetricService fix for deleting entities' instances in case of removal of SLA or deletion of entity
3558af3 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2229
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3f6b6902
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3f6b6902
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3f6b6902
Branch: refs/heads/master
Commit: 3f6b6902407b5a8c4c600cab0d9fb92a935bd33b
Parents: 7c5822c
Author: sandeep <sa...@gmail.com>
Authored: Thu Jan 5 09:36:13 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Jan 5 09:36:13 2017 +0530
----------------------------------------------------------------------
.../falcon/persistence/BacklogMetricBean.java | 3 +-
.../apache/falcon/jdbc/BacklogMetricStore.java | 11 ++--
.../service/BacklogMetricEmitterService.java | 63 +++++++++-----------
.../service/EntitySLAMonitoringService.java | 20 +++----
.../BacklogMetricEmitterServiceTest.java | 28 +++++----
5 files changed, 62 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
index b563da7..a959fb5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
@@ -38,7 +38,8 @@ import java.util.Date;
@Entity
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
- @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+ @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.entityType = :entityType")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
index 8bb8bbb..010aefa 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -37,7 +37,7 @@ import java.util.List;
import java.util.Map;
/**
- * Backlog Metric Store for entitties.
+ * Backlog Metric Store for entities' backlog instances.
*/
public class BacklogMetricStore {
@@ -70,18 +70,19 @@ public class BacklogMetricStore {
q.setParameter("clusterName", cluster);
q.setParameter("nominalTime", nominalTime);
q.setParameter("entityType", entityType.name());
- try{
+ try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}
- public void deleteEntityInstance(String entityName){
+ public void deleteEntityBackLogInstances(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
+ q.setParameter("entityType", entityType);
try {
q.executeUpdate();
} finally {
@@ -110,7 +111,7 @@ public class BacklogMetricStore {
if (CollectionUtils.isEmpty(result)) {
return null;
}
- } finally{
+ } finally {
entityManager.close();
}
@@ -121,7 +122,7 @@ public class BacklogMetricStore {
if (!backlogMetrics.containsKey(entity)) {
backlogMetrics.put(entity, new ArrayList<MetricInfo>());
}
- List<MetricInfo> metrics = backlogMetrics.get(entity);
+ List<MetricInfo> metrics = backlogMetrics.get(entity);
MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
.format(backlogMetricBean.getNominalTime()),
backlogMetricBean.getClusterName());
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 16830f9..5a323fd 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -82,7 +81,7 @@ public final class BacklogMetricEmitterService implements FalconService,
Services.get().getService(MetricNotificationService.SERVICE_NAME);
private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
- Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+ Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
public static BacklogMetricEmitterService get() {
return SERVICE;
@@ -107,18 +106,18 @@ public final class BacklogMetricEmitterService implements FalconService,
private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
@Override
- public void onAdd(Entity entity) throws FalconException{
+ public void onAdd(Entity entity) throws FalconException {
addToBacklog(entity);
}
@Override
- public void onRemove(Entity entity) throws FalconException{
- if (entity.getEntityType() != EntityType.PROCESS){
+ public void onRemove(Entity entity) throws FalconException {
+ if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
if (process.getSla() != null) {
- backlogMetricStore.deleteEntityInstance(entity.getName());
+ backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
for (Cluster cluster : process.getClusters().getClusters()) {
@@ -127,7 +126,7 @@ public final class BacklogMetricEmitterService implements FalconService,
}
}
- public void dropMetric(String clusterName, Process process){
+ private void dropMetric(String clusterName, Process process) {
String pipelinesStr = process.getPipelines();
String metricName;
@@ -144,15 +143,15 @@ public final class BacklogMetricEmitterService implements FalconService,
}
@Override
- public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
- if (oldEntity.getEntityType() != EntityType.PROCESS){
+ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+ if (oldEntity.getEntityType() != EntityType.PROCESS) {
return;
}
Process newProcess = (Process) newEntity;
Process oldProcess = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
- if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
+ if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null) {
if (oldProcess.getSla() != null) {
- backlogMetricStore.deleteEntityInstance(newProcess.getName());
+ backlogMetricStore.deleteEntityBackLogInstances(newProcess.getName(), newEntity.getEntityType().name());
entityBacklogs.remove(newProcess);
for (Cluster cluster : oldProcess.getClusters().getClusters()) {
dropMetric(cluster.getName(), oldProcess);
@@ -164,16 +163,16 @@ public final class BacklogMetricEmitterService implements FalconService,
}
@Override
- public void onReload(Entity entity) throws FalconException{
+ public void onReload(Entity entity) throws FalconException {
addToBacklog(entity);
}
- public void addToBacklog(Entity entity) {
+ private void addToBacklog(Entity entity) {
if (entity.getEntityType() != EntityType.PROCESS) {
return;
}
Process process = (Process) entity;
- if (process.getSla() == null){
+ if (process.getSla() == null) {
return;
}
entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
@@ -277,9 +276,9 @@ public final class BacklogMetricEmitterService implements FalconService,
}
/**
- * Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
+ * Service that executes backlog evaluation and publishes metrics to Graphite for entities in parallel.
*/
- public static class BacklogMetricEmitter implements Runnable {
+ private static class BacklogMetricEmitter implements Runnable {
private ThreadPoolExecutor executor;
@Override
@@ -311,9 +310,9 @@ public final class BacklogMetricEmitterService implements FalconService,
}
/**
- * Service which calculates backlog for given entity and publish to graphite.
+ * Service that calculates the backlog for a given entity and publishes it to Graphite.
*/
- public static class BacklogCalcService implements Runnable {
+ private static class BacklogCalcService implements Runnable {
private Entity entityObj;
private List<MetricInfo> metrics;
@@ -329,18 +328,17 @@ public final class BacklogMetricEmitterService implements FalconService,
MetricInfo metricInfo = null;
HashMap<String, Long> backLogsCluster = new HashMap<>();
synchronized (metrics) {
- if (metrics.isEmpty()){
- Process process = (Process)entityObj;
+ if (metrics.isEmpty()) {
+ Process process = (Process) entityObj;
Clusters clusters = process.getClusters();
- for (Cluster cluster : clusters.getClusters()){
+ for (Cluster cluster : clusters.getClusters()) {
publishBacklog(process, cluster.getName(), 0L);
}
- }else{
+ } else {
long currentTime = System.currentTimeMillis();
- Iterator iter = metrics.iterator();
- while (iter.hasNext()) {
+ for (MetricInfo metric : metrics) {
try {
- metricInfo = (MetricInfo) iter.next();
+ metricInfo = metric;
long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
? backLogsCluster.get(metricInfo.getCluster()) : 0;
@@ -366,7 +364,7 @@ public final class BacklogMetricEmitterService implements FalconService,
}
- public static void publishBacklog(Process process, String clusterName, Long backlog){
+ private static void publishBacklog(Process process, String clusterName, Long backlog) {
String pipelinesStr = process.getPipelines();
String metricName;
@@ -382,19 +380,17 @@ public final class BacklogMetricEmitterService implements FalconService,
}
}
- public static String getMetricName(String clusterName, String processName, String pipeline){
- String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ private static String getMetricName(String clusterName, String processName, String pipeline) {
+ return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ METRIC_SEPARATOR + processName + METRIC_SEPARATOR
+ "backlogInMins";
- return metricName;
}
/**
* Service runs periodically and removes succeeded instances from backlog list.
*/
- public static class BacklogCheckService implements Runnable {
-
+ private static class BacklogCheckService implements Runnable {
@Override
public void run() {
LOG.trace("BacklogCheckService running for entities");
@@ -414,7 +410,7 @@ public final class BacklogMetricEmitterService implements FalconService,
authenticateUser(entity);
if (wfEngine.isMissing(entity)) {
LOG.info("Entity of name {} was deleted so removing instance of "
- + "nominaltime {} ", entity.getName(), nominalTimeStr);
+ + "nominal time {} ", entity.getName(), nominalTimeStr);
backlogMetricStore.deleteMetricInstance(entity.getName(),
metricInfo.getCluster(), nominalTime, entity.getEntityType());
iterator.remove();
@@ -444,7 +440,7 @@ public final class BacklogMetricEmitterService implements FalconService,
}
}
- private static void authenticateUser(Entity entity){
+ private static void authenticateUser(Entity entity) {
if (!CurrentUser.isAuthenticated()) {
if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
CurrentUser.authenticate(entity.getACL().getOwner());
@@ -453,5 +449,4 @@ public final class BacklogMetricEmitterService implements FalconService,
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 451fb95..09671d9 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -20,7 +20,7 @@ package org.apache.falcon.service;
import com.google.common.annotations.VisibleForTesting;
import java.text.ParseException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -80,12 +80,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();
- public static final String TAG_CRITICAL = "Missed-SLA-High";
- public static final String TAG_WARN = "Missed-SLA-Low";
+ static final String TAG_CRITICAL = "Missed-SLA-High";
+ static final String TAG_WARN = "Missed-SLA-Low";
private static final long MINUTE_DELAY = 60000L;
private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
- Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+ Collections.singletonList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
private EntitySLAMonitoringService() {
@@ -176,7 +176,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
}
- public Boolean checkFeedClusterSLA(Feed feed){
+ private Boolean checkFeedClusterSLA(Feed feed){
for(Cluster cluster : feed.getClusters().getClusters()){
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null){
@@ -187,7 +187,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
- public Boolean checkProcessClusterSLA(Process process){
+ private Boolean checkProcessClusterSLA(Process process){
Clusters clusters = process.getClusters();
for(org.apache.falcon.entity.v0.process.Cluster cluster : clusters.getClusters()){
org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
@@ -292,7 +292,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
}
- void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
+ private void updatePendingInstances(String entityName, List<String> slaRemovedClusters, String entityType){
for(String clusterName :slaRemovedClusters){
MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName,
entityType);
@@ -384,7 +384,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
}
- void addPendingEntityInstances(Date checkPointTime) throws FalconException {
+ private void addPendingEntityInstances(Date checkPointTime) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
@@ -611,8 +611,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
return result;
}
- Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
- Date end, List<Date> missingInstances) throws FalconException {
+ private Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
+ Date end, List<Date> missingInstances) throws FalconException {
Date now = new Date();
Frequency slaHigh = sla.getShouldEndIn();
Set<Pair<Date, String>> result = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/falcon/blob/3f6b6902/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
index 67d256e..206eced 100644
--- a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.service;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.jdbc.BacklogMetricStore;
import org.apache.falcon.metrics.MetricNotificationService;
@@ -46,9 +47,9 @@ import java.util.Map;
*/
public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
- protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
- protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
- protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+ private static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+ protected static String url = "jdbc:derby:" + dbLocation + ";create=true";
+ private static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
protected LocalFileSystem fs = new LocalFileSystem();
private static BacklogMetricStore backlogMetricStore;
@@ -56,13 +57,13 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
private static BacklogMetricEmitterService backlogMetricEmitterService;
private MetricNotificationService mockMetricNotificationService;
- protected int execDBCLICommands(String[] args) {
+ private int execDBCLICommands(String[] args) {
return new FalconStateStoreDBCLI().run(args);
}
- public void createDB(String file) {
+ private void createDB(String file) {
File sqlFile = new File(file);
- String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+ String[] argsCreate = {"create", "-sqlfile", sqlFile.getAbsolutePath(), "-run"};
int result = execDBCLICommands(argsCreate);
Assert.assertEquals(0, result);
Assert.assertTrue(sqlFile.exists());
@@ -79,7 +80,7 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
}
@BeforeClass
- public void setup() throws Exception{
+ public void setup() throws Exception {
StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
Configuration localConf = new Configuration();
fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
@@ -94,7 +95,6 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
Services.get().register(mockMetricNotificationService);
Services.get().register(BacklogMetricEmitterService.get());
backlogMetricEmitterService = BacklogMetricEmitterService.get();
-
}
@@ -102,7 +102,7 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
public void testBacklogEmitter() throws Exception {
backlogMetricEmitterService.init();
storeEntity(EntityType.PROCESS, "entity1");
- backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
+ backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
Thread.sleep(10);
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
@@ -116,7 +116,11 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
Mockito.reset(mockMetricNotificationService);
Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
Mockito.any(Long.class));
-
+ backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
+ BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
+ Thread.sleep(1000);
+ backlogMetricEmitterService.onRemove(EntityUtil.getEntity(EntityType.PROCESS, "entity1"));
+ Assert.assertNull(backlogMetricStore.getAllInstances());
}
private WorkflowExecutionContext getWorkflowExecutionContext() {
@@ -126,8 +130,6 @@ public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
- WorkflowExecutionContext workflowExecutionContext = new WorkflowExecutionContext(args);
- return workflowExecutionContext;
-
+ return new WorkflowExecutionContext(args);
}
}