You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/04/06 12:26:06 UTC
falcon git commit: FALCON-1886 Feed SLA monitoring does not work across restarts.
Repository: falcon
Updated Branches:
refs/heads/master 08399d038 -> 14b1bb8f4
FALCON-1886 Feed SLA monitoring does not work across restarts.
Removed redundant fields.
Added unit tests.
Author: Ajay Yadava <aj...@gmail.com>
Reviewers: Praveen Adlakha
Closes #86 from ajayyadava/1886
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/14b1bb8f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/14b1bb8f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/14b1bb8f
Branch: refs/heads/master
Commit: 14b1bb8f4dc6cf5c85957ea0981cbaff0b490ff3
Parents: 08399d0
Author: Ajay Yadava <aj...@gmail.com>
Authored: Wed Apr 6 15:55:18 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Apr 6 15:55:18 2016 +0530
----------------------------------------------------------------------
.../falcon/persistence/PendingInstanceBean.java | 11 +-
.../persistence/PersistenceConstants.java | 1 +
.../falcon/jdbc/MonitoringJdbcStateStore.java | 20 ++--
.../service/FeedSLAMonitoringService.java | 113 +++++++------------
.../jdbc/MonitoringJdbcStateStoreTest.java | 55 +++++++--
.../falcon/service/FeedSLAMonitoringTest.java | 6 +-
6 files changed, 113 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 038244a..108001d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -35,11 +35,12 @@ import java.util.Date;
* */
@Entity
@NamedQueries({
- @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
- @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
- @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
- @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
- @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ")
+ @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ")
})
@Table(name = "PENDING_INSTANCES")
//RESUME CHECKSTYLE CHECK LineLengthCheck
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index e554581..44edc7c 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -53,4 +53,5 @@ public final class PersistenceConstants {
public static final String GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER";
public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
+ public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 39e2562..6345d44 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -94,6 +94,15 @@ public class MonitoringJdbcStateStore {
return result;
}
+ public Date getLastInstanceTime(String feedName) throws ResultNotFoundException {
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
+ q.setParameter("feedName", feedName);
+ Date result = (Date)q.getSingleResult();
+ entityManager.close();
+ return result;
+ }
+
public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
@@ -133,21 +142,16 @@ public class MonitoringJdbcStateStore {
commitAndCloseTransaction(entityManager);
}
- public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{
+ public List<Date> getNominalInstances(String feedName, String clusterName) {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
q.setParameter("feedName", feedName);
q.setParameter("clusterName", clusterName);
List result = q.getResultList();
- try{
- if (CollectionUtils.isEmpty(result)) {
- throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found");
- }
- } finally {
- entityManager.close();
- }
+ entityManager.close();
return result;
}
+
public List<PendingInstanceBean> getAllInstances(){
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index b5a2569..9de4463 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -17,6 +17,14 @@
*/
package org.apache.falcon.service;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
@@ -35,24 +43,18 @@ import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
import org.apache.falcon.persistence.MonitoredFeedsBean;
import org.apache.falcon.persistence.PendingInstanceBean;
import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
/**
* Service to monitor Feed SLAs.
@@ -62,8 +64,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
- private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
-
private static final int ONE_MS = 1;
private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
@@ -76,17 +76,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
return SERVICE;
}
- protected int queueSize;
-
/**
* Permissions for storePath.
*/
private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
- /**
- * Used to store the last time when pending instances were checked for SLA.
- */
- private Date lastCheckedAt;
/**
* Frequency in seconds of "status check" for pending feed instances.
@@ -103,11 +97,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
/**
- * Frequency in milliseconds of serializing(for backup) monitoring service's state.
- */
- private int serializationFrequencyMillis;
-
- /**
* Filesystem used for serializing and deserializing.
*/
private FileSystem fileSystem;
@@ -212,21 +201,12 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
filePath = new Path(storePath, "feedSLAMonitoringService");
fileSystem = initializeFileSystem();
- String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis", ONE_HOUR);
- serializationFrequencyMillis = Integer.parseInt(freq);
-
- freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+ String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
statusCheckFrequencySeconds = Integer.parseInt(freq);
freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
lookAheadWindowMillis = Integer.parseInt(freq);
-
- String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
- queueSize = Integer.parseInt(size);
-
LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
- initializeService();
-
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
}
@@ -272,8 +252,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
// add Instances from last checked time to 10 minutes from now(some buffer for status check)
Date now = new Date();
Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
- addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
- lastCheckedAt = newCheckPoint;
+ addNewPendingFeedInstances(newCheckPoint);
}
} catch (Throwable e) {
LOG.error("Feed SLA monitoring failed: ", e);
@@ -282,7 +261,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
}
- void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
+ void addNewPendingFeedInstances(Date to) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
@@ -290,41 +269,27 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
for (Cluster feedCluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(feedCluster.getName())) {
- Date nextInstanceTime = from;
+ // get start of instances from the database
+ Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName);
Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
- BlockingQueue<Date> instances = new LinkedBlockingQueue<>(
- MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName()));
- if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName,
- feedCluster.getName()))) {
- instances = new LinkedBlockingQueue<>(queueSize);
- Date feedStartTime = feedCluster.getValidity().getStart();
- Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster);
- ExpressionHelper evaluator = ExpressionHelper.get();
- ExpressionHelper.setReferenceDate(new Date());
- Date retention = new Date(evaluator.evaluate(retentionFrequency.toString(), Long.class));
- if (feedStartTime.before(retention)) {
- feedStartTime = retention;
- }
- nextInstanceTime = feedStartTime;
+ if (nextInstanceTime == null) {
+ nextInstanceTime = getInitialStartTime(feed, feedCluster.getName());
+ } else {
+ nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
}
- Set<Date> exists = new HashSet<>(instances);
+
+ Set<Date> instances = new HashSet<>();
org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
- if (instances.size() >= queueSize) { // if no space, first make some space
- LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key);
- exists.remove(instances.peek());
- instances.remove();
- }
LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
- if (exists.add(nextInstanceTime)) {
- instances.add(nextInstanceTime);
- }
+ instances.add(nextInstanceTime);
nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
}
+
for(Date date:instances){
MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date);
}
@@ -376,10 +341,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
}
- protected void initializeService() {
- lastCheckedAt = new Date();
- }
-
/**
* Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
* slaLow or slaHigh.
@@ -403,11 +364,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null) {
Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end,
- new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances(
- pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName())));
- for (Pair<Date, String> status : slaStatus){
+ MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
+ pendingInstanceBean.getClusterName()));
+ for (Pair<Date, String> status : slaStatus) {
SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
- feedClusterPair.second, status.first, EntityType.FEED);
+ feedClusterPair.second, status.first, EntityType.FEED);
instance.setTags(status.second);
result.add(instance);
}
@@ -431,8 +392,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Set<SchedulableEntityInstance> result = new HashSet<>();
Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
- BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE.
- getNominalInstances(feedName, clusterName));
+ List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName);
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
Sla sla = FeedHelper.getSLA(cluster, feed);
@@ -448,7 +408,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
return result;
}
- Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date> missingInstances)
+ Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
throws FalconException {
String tagCritical = "Missed SLA High";
String tagWarn = "Missed SLA Low";
@@ -473,4 +433,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
}
return result;
}
+
+ @VisibleForTesting
+ Date getInitialStartTime(Feed feed, String clusterName) throws FalconException {
+ Sla sla = FeedHelper.getSLA(clusterName, feed);
+ if (sla == null) {
+ throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+ + feed.getName() + " and cluster: " + clusterName + " does not have any sla");
+ }
+ Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName);
+ Frequency slaLow = sla.getSlaLow();
+ Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+ return startTime.before(slaTime) ? startTime : slaTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index aa32167..b43025d 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -17,6 +17,12 @@
*/
package org.apache.falcon.jdbc;
+import java.io.File;
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -26,14 +32,12 @@ import org.apache.falcon.util.StateStoreProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.File;
-import java.util.Date;
-import java.util.Random;
-
/**
*Unit test for MonitoringJdbcStateStore.
* */
@@ -45,7 +49,7 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
protected LocalFileSystem fs = new LocalFileSystem();
- private static Random randomValGenerator = new Random();
+ private static MonitoringJdbcStateStore monitoringJdbcStateStore;
private static FalconJPAService falconJPAService = FalconJPAService.get();
protected int execDBCLICommands(String[] args) {
@@ -71,12 +75,16 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
falconJPAService.init();
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
this.conf = dfsCluster.getConf();
+ monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+ }
+
+ @BeforeMethod
+ public void init() {
+ clear();
}
@Test
public void testInsertRetrieveAndUpdate() throws Exception {
-
- MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore();
monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
@@ -94,4 +102,37 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1);
monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
}
+
+ @Test
+ public void testEmptyLatestInstance() throws Exception {
+ MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+ store.putMonitoredFeed("test-feed1");
+ store.putMonitoredFeed("test-feed2");
+ Assert.assertNull(store.getLastInstanceTime("test-feed1"));
+
+ Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+ Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
+
+ store.putPendingInstances("test-feed1", "test_cluster", dateTwo);
+ store.putPendingInstances("test-feed1", "test_cluster", dateOne);
+ store.putPendingInstances("test-feed2", "test_cluster", dateOne);
+
+ Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1")));
+ Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2")));
+
+ }
+
+ private void clear() {
+ EntityManager em = FalconJPAService.get().getEntityManager();
+ em.getTransaction().begin();
+ try {
+ Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
+ query.executeUpdate();
+ query = em.createNativeQuery("delete from PENDING_INSTANCES");
+ query.executeUpdate();
+ } finally {
+ em.getTransaction().commit();
+ em.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index b739037..dbe0cf4 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -21,12 +21,12 @@ package org.apache.falcon.service;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.TimeZone;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
@@ -65,7 +65,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
- BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
+ List<Date> missingInstances = new ArrayList<>();
missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between