You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/09/06 09:23:55 UTC
falcon git commit: FALCON-2077 Api support for Process SLA
Repository: falcon
Updated Branches:
refs/heads/master c3697de62 -> 91c0a9926
FALCON-2077 Api support for Process SLA
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: @pallavi-rao
Closes #278 from PraveenAdlakha/2077_v1 and squashes the following commits:
3b89e83 [Praveen Adlakha] changed method in EntityType
a4c3686 [Praveen Adlakha] comments addressed
860d055 [Praveen Adlakha] FALCON-2077 Api support for Process SLA
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/91c0a992
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/91c0a992
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/91c0a992
Branch: refs/heads/master
Commit: 91c0a9926f157e2883ad976004230d863258ae62
Parents: c3697de
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Tue Sep 6 14:53:44 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Sep 6 14:53:44 2016 +0530
----------------------------------------------------------------------
.../org/apache/falcon/entity/v0/EntityType.java | 10 ++
.../org/apache/falcon/entity/EntityUtil.java | 1 +
.../falcon/hadoop/HadoopClientFactory.java | 4 +-
.../falcon/persistence/EntitySLAAlertBean.java | 15 +-
.../falcon/persistence/MonitoredEntityBean.java | 15 +-
.../falcon/persistence/PendingInstanceBean.java | 15 +-
common/src/main/resources/startup.properties | 10 +-
.../workflow/engine/OozieClientFactory.java | 2 +-
.../falcon/jdbc/MonitoringJdbcStateStore.java | 22 +--
.../AbstractSchedulableEntityManager.java | 33 ++--
.../proxy/SchedulableEntityManagerProxy.java | 4 +-
.../falcon/service/EntitySLAAlertService.java | 17 +-
.../service/EntitySLAMonitoringService.java | 94 +++++-----
.../service/EntitySLAAlertServiceTest.java | 4 +-
.../falcon/service/EntitySLAMonitoringTest.java | 174 +++++++++++++++++++
.../falcon/service/FeedSLAMonitoringTest.java | 164 -----------------
prism/src/test/resources/startup.properties | 8 +-
src/conf/startup.properties | 10 +-
.../resource/SchedulableEntityManager.java | 6 +-
19 files changed, 314 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
index 3d55547..29dbc7a 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java
@@ -101,6 +101,16 @@ public enum EntityType {
return ((this != EntityType.CLUSTER) && (this != EntityType.DATASOURCE));
}
+ public static void assertSchedulable(String entityType){
+ EntityType type = EntityType.getEnum(entityType);
+ if (type.isSchedulable()){
+ return;
+ } else {
+ throw new IllegalArgumentException("EntityType "+ entityType
+ + " is not valid,Feed and Process are the valid input type.");
+ }
+ }
+
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"})
public String[] getImmutableProperties() {
return immutableProperties;
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index aef1fd5..8fe316c 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -1204,4 +1204,5 @@ public final class EntityUtil {
return false;
}
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index e30f51e..f32df6d 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -202,12 +202,12 @@ public final class HadoopClientFactory {
// prevent falcon impersonating falcon, no need to use doas
final String proxyUserName = ugi.getShortUserName();
if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
- LOG.info("Creating FS for the login user {}, impersonation not required",
+ LOG.trace("Creating FS for the login user {}, impersonation not required",
proxyUserName);
return FileSystem.get(uri, conf);
}
- LOG.info("Creating FS impersonating user {}", proxyUserName);
+ LOG.trace("Creating FS impersonating user {}", proxyUserName);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws Exception {
return FileSystem.get(uri, conf);
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
index e2096fe..1419f48 100644
--- a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
@@ -18,7 +18,6 @@
package org.apache.falcon.persistence;
-import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
import java.util.Date;
@@ -69,9 +68,9 @@ public class EntitySLAAlertBean {
return entityType;
}
- public void setEntityType(String entityType) throws FalconException {
- checkEntityType(entityType);
- this.entityType = entityType;
+ public void setEntityType(String entityType) {
+ EntityType.assertSchedulable(entityType);
+ this.entityType = entityType.toLowerCase();
}
@Basic
@@ -157,12 +156,4 @@ public class EntitySLAAlertBean {
public static final String NOMINALTIME = "nominalTime";
- void checkEntityType(String entityType)throws FalconException{
- if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
- return;
- } else {
- throw new FalconException("EntityType"+ entityType
- + " is not valid,Feed and Process are the valid input type.");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
index 1db3d04..c620e45 100644
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -17,7 +17,6 @@
*/
package org.apache.falcon.persistence;
-import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
import javax.persistence.Entity;
@@ -64,9 +63,9 @@ public class MonitoredEntityBean {
return entityType;
}
- public void setEntityType(String entityType) throws FalconException {
- checkEntityType(entityType);
- this.entityType = entityType;
+ public void setEntityType(String entityType) {
+ EntityType.assertSchedulable(entityType);
+ this.entityType = entityType.toLowerCase();
}
@Basic
@@ -94,12 +93,4 @@ public class MonitoredEntityBean {
public static final String ENTITYTYPE = "entityType";
- void checkEntityType(String entityType)throws FalconException {
- if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
- return;
- } else {
- throw new FalconException("EntityType"+ entityType
- + " is not valid,Feed and Process are the valid input type.");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 863abdc..43b6b8e 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -17,7 +17,6 @@
*/
package org.apache.falcon.persistence;
-import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
import javax.persistence.Entity;
@@ -73,9 +72,9 @@ public class PendingInstanceBean {
return entityType;
}
- public void setEntityType(String entityType) throws FalconException {
- checkEntityType(entityType);
- this.entityType = entityType;
+ public void setEntityType(String entityType) {
+ EntityType.assertSchedulable(entityType);
+ this.entityType = entityType.toLowerCase();
}
@Basic
@@ -123,12 +122,4 @@ public class PendingInstanceBean {
public static final String ENTITYTYPE = "entityType";
- void checkEntityType(String entityType)throws FalconException {
- if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){
- return;
- } else {
- throw new FalconException("EntityType"+ entityType
- + " is not valid,Feed and Process are the valid input type.");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 8d64c54..3beab62 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -73,7 +73,7 @@
# org.apache.falcon.state.store.jdbc.JdbcStateStore
## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here
-*.feedAlert.listeners=
+*.entityAlert.listeners=
##### JMS MQ Broker Implementation class #####
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
@@ -98,7 +98,7 @@
#Configurations used in UTs
debug.config.store.uri=file://${user.dir}/target/store
#Location to store state of Feed SLA monitoring service
-debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
+debug.entity.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingentityinstances
debug.config.oozie.conf.uri=${user.dir}/target/oozie
debug.system.lib.location=${system.lib.location}
debug.broker.url=vm://localhost
@@ -122,7 +122,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
*.falcon.cleanup.service.frequency=minutes(5)
-######### Properties for Feed SLA Monitoring #########
+######### Properties for Entity SLA Monitoring #########
# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
*.feed.sla.serialization.frequency.millis=3600000
@@ -132,13 +132,13 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Do not change unless really sure
# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
-*.feed.sla.statusCheck.frequency.seconds=600
+*.entity.sla.statusCheck.frequency.seconds=600
# Do not change unless really sure
# Time Duration (in milliseconds) in future for generating pending feed instances.
# In every cycle pending feed instances are added for monitoring, till this time in future.
# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
-*.feed.sla.lookAheadWindow.millis=900000
+*.entity.sla.lookAheadWindow.millis=900000
######### Properties for configuring JMS provider - activemq #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index ae5c5fa..3380b1a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -47,7 +47,7 @@ public final class OozieClientFactory {
assert cluster != null : "Cluster cant be null";
String oozieUrl = ClusterHelper.getOozieUrl(cluster);
- LOG.info("Creating Oozie client object for {}", oozieUrl);
+ LOG.trace("Creating Oozie client object for {}", oozieUrl);
return getClientRef(oozieUrl);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 6a38b0a..c479940 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -62,7 +62,7 @@ public class MonitoringJdbcStateStore {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
- q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
+ q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase());
List result = q.getResultList();
try {
if (result.isEmpty()) {
@@ -79,7 +79,7 @@ public class MonitoringJdbcStateStore {
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
- q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
+ q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase());
try{
q.executeUpdate();
} finally {
@@ -98,7 +98,7 @@ public class MonitoringJdbcStateStore {
public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
List result = q.getResultList();
entityManager.close();
return result;
@@ -108,7 +108,7 @@ public class MonitoringJdbcStateStore {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
Date result = (Date)q.getSingleResult();
entityManager.close();
return result;
@@ -121,7 +121,7 @@ public class MonitoringJdbcStateStore {
q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
try{
q.executeUpdate();
} finally {
@@ -135,7 +135,7 @@ public class MonitoringJdbcStateStore {
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY);
q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
try{
q.executeUpdate();
} finally {
@@ -162,7 +162,7 @@ public class MonitoringJdbcStateStore {
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
List result = q.getResultList();
entityManager.close();
return result;
@@ -198,7 +198,7 @@ public class MonitoringJdbcStateStore {
q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
- q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
try {
return q.getSingleResult();
} finally {
@@ -215,7 +215,7 @@ public class MonitoringJdbcStateStore {
q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
- q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
try {
return q.getSingleResult();
} finally {
@@ -248,7 +248,7 @@ public class MonitoringJdbcStateStore {
q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
- q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
try{
q.executeUpdate();
} finally {
@@ -263,7 +263,7 @@ public class MonitoringJdbcStateStore {
q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
- q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
try{
q.executeUpdate();
} finally {
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 895f8b2..3bdeb99 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -111,16 +111,27 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
}
}
+ /**
+ * Validates the parameters whether SLA is supported or not.
+ *
+ * @param entityType currently two entityTypes are supported Process and Feed
+ * @param entityName name of the entity
+ * @param start startDate from which SLA is to be looked at
+ * @param end endDate upto which SLA is to be looked at.
+ * @param colo colo in which entity is to be looked into
+ * @throws FalconException if the validation fails
+ * **/
+
public static void validateSlaParams(String entityType, String entityName, String start, String end,
String colo) throws FalconException {
EntityType type = EntityType.getEnum(entityType);
- if (type != EntityType.FEED) {
+ if (!type.isSchedulable()){
throw new ValidationException("SLA monitoring is not supported for: " + type);
}
- // validate valid feed name.
+ // validate valid entity name.
if (StringUtils.isNotBlank(entityName)) {
- EntityUtil.getEntity(EntityType.FEED, entityName);
+ EntityUtil.getEntity(entityType, entityName);
}
Date startTime, endTime;
@@ -146,14 +157,14 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
}
/**
- * Returns the feed instances which are not yet available and have missed either slaLow or slaHigh.
- * This api doesn't return the feeds which missed SLA but are now available. Purpose of this api is to show feed
- * instances which you need to attend to.
+ * Returns the entity instances which are not yet available and have missed either slaLow or slaHigh.
+ * This api doesn't return the entitites which missed SLA but are now available. Purpose of this api is to
+ * show entity instances which you need to attend to.
* @param startStr startTime in
* @param endStr
*/
- public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(String feedName, String startStr, String endStr,
- String colo) {
+ public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(String entityName, String entityType,
+ String startStr, String endStr, String colo) {
Set<SchedulableEntityInstance> instances = new HashSet<>();
try {
@@ -161,12 +172,12 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
Date start = EntityUtil.parseDateUTC(startStr);
Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
- if (StringUtils.isBlank(feedName)) {
+ if (StringUtils.isBlank(entityName)) {
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end));
} else {
for (String clusterName : DeploymentUtil.getCurrentClusters()) {
- instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName,
- clusterName, start, end, EntityType.FEED.toString()));
+ instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName,
+ clusterName, start, end, entityType));
}
}
} catch (FalconException e) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 249c273..07334d6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -113,8 +113,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@GET
@Path("sla-alert/{type}")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
- @Monitored(event = "feed-sla-misses")
- public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(
+ @Monitored(event = "entity-sla-misses")
+ public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(
@Dimension("entityType") @PathParam("type") final String entityType,
@Dimension("entityName") @QueryParam("name") final String entityName,
@Dimension("start") @QueryParam("start") final String start,
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index a7cafeb..bcf11e3 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Service to know which all feeds have missed SLA.
+ * Service to know which all entities have missed SLA.
*/
public final class EntitySLAAlertService implements FalconService, EntitySLAListener {
@@ -68,7 +68,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
@Override
public void init() throws FalconException {
String listenerClassNames = StartupProperties.get().
- getProperty("feedAlert.listeners");
+ getProperty("entityAlert.listeners");
if (listenerClassNames != null && !listenerClassNames.isEmpty()) {
for (String listenerClassName : listenerClassNames.split(",")) {
listenerClassName = listenerClassName.trim();
@@ -80,7 +80,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
}
}
- String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+ String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600");
int statusCheckFrequencySeconds = Integer.parseInt(freq);
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
@@ -106,7 +106,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
}
void processSLACandidates(){
- //Get all feeds instances to be monitored
+ //Get all entity instances to be monitored
List<PendingInstanceBean> pendingInstanceBeanList = store.getAllPendingInstances();
if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
return;
@@ -139,17 +139,16 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
store.putSLAAlertInstance(entityName, clusterName, entityType,
nominalTime, true, false);
//Mark in DB as SLA missed
- LOG.info("Feed :"+ entityName
- + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow");
+ LOG.info("Entity : {} Cluster : {} Nominal Time : {} missed SLALow", entityName, entityType,
+ clusterName, nominalTime);
} else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){
if (entityType.equals(EntityType.PROCESS.name())){
store.putSLAAlertInstance(entityName, clusterName, entityType,
nominalTime, true, false);
}
store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType);
- LOG.info("Entity :"+ entityName
- + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType
- + "missed SLAHigh");
+ LOG.info("Entity :{} EntityType : {} Cluster: {} Nominal Time: {} missed SLAHigh", entityName,
+ entityType , clusterName , nominalTime);
highSLAMissed(entityName, clusterName, EntityType.valueOf(entityType), nominalTime);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 185e087..1e20a2b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -20,7 +20,6 @@ package org.apache.falcon.service;
import java.text.ParseException;
import java.util.HashSet;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
@@ -72,7 +71,7 @@ import com.google.common.annotations.VisibleForTesting;
* Service to monitor Feed SLAs.
*/
public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService {
- private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
+ private static final Logger LOG = LoggerFactory.getLogger(EntitySLAMonitoringService.class);
private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
@@ -98,15 +97,15 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
/**
- * Frequency in seconds of "status check" for pending feed instances.
+ * Frequency in seconds of "status check" for pending entity instances.
*/
private int statusCheckFrequencySeconds; // 10 minutes
/**
- * Time Duration (in milliseconds) in future for generating pending feed instances.
+ * Time Duration (in milliseconds) in future for generating pending entity instances.
*
- * In every cycle pending feed instances are added for monitoring, till this time in future.
+ * In every cycle pending entity instances are added for monitoring, till this time in future.
*/
private int lookAheadWindowMillis; // 15 MINUTES
@@ -117,7 +116,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
private FileSystem fileSystem;
/**
- * Working directory for the feed sla monitoring service.
+ * Working directory for the entity sla monitoring service.
*/
private Path storePath;
@@ -294,15 +293,15 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
@Override
public void init() throws FalconException {
- String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
+ String uri = StartupProperties.get().getProperty("entity.sla.service.store.uri");
storePath = new Path(uri);
- filePath = new Path(storePath, "feedSLAMonitoringService");
+ filePath = new Path(storePath, "entitySLAMonitoringService");
fileSystem = initializeFileSystem();
- String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+ String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600");
statusCheckFrequencySeconds = Integer.parseInt(freq);
- freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
+ freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000");
lookAheadWindowMillis = Integer.parseInt(freq);
LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
@@ -326,13 +325,13 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
try {
fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
if (!fileSystem.exists(storePath)) {
- LOG.info("Creating directory for pending feed instances: {}", storePath);
+ LOG.info("Creating directory for pending entity instances: {}", storePath);
// set permissions so config store dir is owned by falcon alone
HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
}
return fileSystem;
} catch (Exception e) {
- throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
+ throw new RuntimeException("Unable to bring up entity sla store for path: " + storePath, e);
}
}
@@ -353,8 +352,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
// add Instances from last checked time to 10 minutes from now(some buffer for status check)
Date now = new Date();
Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
- addNewPendingFeedInstances(newCheckPoint, EntityType.FEED.toString());
- addNewPendingFeedInstances(newCheckPoint, EntityType.PROCESS.toString());
+ addNewPendingEntityInstances(newCheckPoint, EntityType.FEED.toString());
+ addNewPendingEntityInstances(newCheckPoint, EntityType.PROCESS.toString());
}
} catch (Throwable e) {
LOG.error("Feed SLA monitoring failed: ", e);
@@ -363,7 +362,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
- void addNewPendingFeedInstances(Date to, String entityType) throws FalconException {
+ void addNewPendingEntityInstances(Date to, String entityType) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.
getAllMonitoredEntityForEntity(entityType);
@@ -439,7 +438,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
}
}
- // checks whether a given feed instance is available or not
+ // checks whether a given entity instance is available or not
private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime,
String entityType) throws FalconException {
Entity entity = EntityUtil.getEntity(entityType, entityName);
@@ -451,8 +450,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null);
if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){
- LOG.debug("Entity instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(),
- clusterName, nominalTime);
+ LOG.debug("Entity instance(Process:{}, cluster:{}, instanceTime:{}) is available.",
+ entity.getName(), clusterName, nominalTime);
return true;
}
return false;
@@ -481,35 +480,54 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
/**
- * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
- * slaLow or slaHigh.
+ * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} and {@link org.apache.falcon.entity.v0.process.Process}
+ * instances between given time range which have missed slaLow or slaHigh.
*
- * Only feeds which have defined sla in their definition are considered.
- * Only the feed instances between the given time range are considered.
+ * Only entities which have defined sla in their definition are considered.
+ * Only the entity instances between the given time range are considered.
* Start time and end time are both inclusive.
* @param start start time, inclusive
* @param end end time, inclusive
- * @return Set of pending feed instances belonging to the given range which have missed SLA
+ * @return Set of pending entity instances belonging to the given range which have missed SLA
* @throws FalconException
*/
public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
- Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
+ Pair<String, String> entityClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
pendingInstanceBean.getClusterName());
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
- Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
- Sla sla = FeedHelper.getSLA(cluster, feed);
- if (sla != null) {
- Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end,
- MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
- pendingInstanceBean.getClusterName(), EntityType.FEED.toString()));
- for (Pair<Date, String> status : slaStatus) {
- SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
- feedClusterPair.second, status.first, EntityType.FEED);
- instance.setTags(status.second);
- result.add(instance);
+ String entityType = pendingInstanceBean.getEntityType();
+ if (entityType.equalsIgnoreCase(EntityType.FEED.toString())){
+ Feed feed = EntityUtil.getEntity(entityType, entityClusterPair.first);
+ Cluster cluster = FeedHelper.getCluster(feed, entityClusterPair.second);
+ Sla sla = FeedHelper.getSLA(cluster, feed);
+ if (sla != null) {
+ Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end,
+ MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first,
+ entityClusterPair.second, entityType));
+ for (Pair<Date, String> status : slaStatus) {
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first,
+ entityClusterPair.second, status.first, EntityType.FEED);
+ instance.setTags(status.second);
+ result.add(instance);
+ }
+ }
+ } else {
+ Process process = EntityUtil.getEntity(entityType, entityClusterPair.first);
+ org.apache.falcon.entity.v0.process.Cluster cluster = ProcessHelper.getCluster(process,
+ entityClusterPair.second);
+ org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process);
+ if (sla != null){
+ Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end,
+ MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first,
+ entityClusterPair.second, entityType));
+ for (Pair<Date, String> status : slaStatus) {
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first,
+ entityClusterPair.second, status.first, EntityType.PROCESS);
+ instance.setTags(status.second);
+ result.add(instance);
+ }
}
}
}
@@ -528,12 +546,10 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
*/
public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
Date start, Date end, String entityType) throws FalconException {
-
Set<SchedulableEntityInstance> result = new HashSet<>();
List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
entityType);
- if (missingInstances == null || !Arrays.asList(EntityType.FEED.toString(),
- EntityType.PROCESS.toString()).contains(entityType)){
+ if (missingInstances == null){
return result;
}
Entity entity = EntityUtil.getEntity(entityType, entityName);
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
index c8b4f5e..8b51354 100644
--- a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
@@ -105,7 +105,7 @@ public class EntitySLAAlertServiceTest extends AbstractTestBase {
}
}
- @Test
+ @Test(expectedExceptions = javax.persistence.NoResultException.class)
public static void processSLALowCandidates() throws FalconException, InterruptedException{
Date dateOne = new Date(System.currentTimeMillis()-100000);
@@ -141,7 +141,7 @@ public class EntitySLAAlertServiceTest extends AbstractTestBase {
dateOne, EntityType.FEED.toString()).getIsSLALowMissed());
}
- @Test
+ @Test(expectedExceptions = javax.persistence.NoResultException.class)
public static void processSLACandidateProcess() throws FalconException, InterruptedException{
Date dateOne = new Date(System.currentTimeMillis()-130000);
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java
new file mode 100644
index 0000000..2bc4cbf
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityNotRegisteredException;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for EntitySLAMonitoring Service.
+ */
+public class EntitySLAMonitoringTest extends AbstractTestBase {
+ private static final String CLUSTER_NAME = "testCluster";
+ private static final String FEED_NAME = "testFeed";
+ private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+ private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL;
+
+ @Test
+ public void testSLAStatus() throws FalconException {
+ // sla, start, end, missingInstances
+ Sla sla = new Sla();
+ sla.setSlaLow(new Frequency("days(1)"));
+ sla.setSlaHigh(new Frequency("days(2)"));
+
+ Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
+ Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
+
+ List<Date> missingInstances = new ArrayList<>();
+ missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
+ missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
+ missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between
+ missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"));
+ missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time
+ missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time
+
+ Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end,
+ missingInstances);
+ Set<Pair<Date, String>> expected = new HashSet<>();
+ expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL));
+ expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));
+ expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), TAG_CRITICAL));
+ expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), TAG_CRITICAL));
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test(expectedExceptions = EntityNotRegisteredException.class,
+ expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*")
+ public void testInvalidFeedName() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("feed",
+ "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+ }
+
+ @Test(expectedExceptions = EntityNotRegisteredException.class,
+ expectedExceptionsMessageRegExp = ".*\\(PROCESS\\) not found.*")
+ public void testInvalidProcessName() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("process",
+ "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string")
+ public void testInvalidStart() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process", null,
+ "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*");
+ }
+
+ @Test(expectedExceptions = ValidationException.class,
+ expectedExceptionsMessageRegExp = "start can not be after end")
+ public void testInvalidRange() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("feed",
+ null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process",
+ null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*");
+ }
+
+ @Test
+ public void testOptionalName() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z",
+ "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z",
+ "*");
+ }
+
+ @Test
+ public void testOptionalEnd() throws FalconException {
+ AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", "", "*");
+ AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", null, "*");
+ }
+
+ private Cluster publishCluster() throws FalconException {
+ Cluster cluster = new Cluster();
+ cluster.setName(CLUSTER_NAME);
+ cluster.setColo("default");
+ getStore().publish(EntityType.CLUSTER, cluster);
+ return cluster;
+
+ }
+
+ private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+ throws FalconException, ParseException {
+ Feed feed = new Feed();
+ feed.setName(FEED_NAME);
+ Frequency f = new Frequency(frequency);
+ feed.setFrequency(f);
+ feed.setTimezone(UTC);
+ Clusters fClusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ fCluster.setType(ClusterType.SOURCE);
+ fCluster.setName(cluster.getName());
+ fCluster.setValidity(getFeedValidity(start, end));
+ fClusters.getClusters().add(fCluster);
+ feed.setClusters(fClusters);
+ getStore().publish(EntityType.FEED, feed);
+ return feed;
+ }
+
+ private Validity getFeedValidity(String start, String end) throws ParseException {
+ Validity validity = new Validity();
+ validity.setStart(getDate(start));
+ validity.setEnd(getDate(end));
+ return validity;
+ }
+
+ private Date getDate(String dateString) throws ParseException {
+ DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+ return format.parse(dateString);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
deleted file mode 100644
index 9cf50c2..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TimeZone;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.parser.ValidationException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Clusters;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.entity.v0.feed.Validity;
-import org.apache.falcon.resource.AbstractSchedulableEntityManager;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * Tests for FeedSLAMonitoring Service.
- */
-public class FeedSLAMonitoringTest extends AbstractTestBase {
- private static final String CLUSTER_NAME = "testCluster";
- private static final String FEED_NAME = "testFeed";
- private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
- private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL;
-
- @Test
- public void testSLAStatus() throws FalconException {
- // sla, start, end, missingInstances
- Sla sla = new Sla();
- sla.setSlaLow(new Frequency("days(1)"));
- sla.setSlaHigh(new Frequency("days(2)"));
-
- Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
- Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
-
- List<Date> missingInstances = new ArrayList<>();
- missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
- missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
- missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between
- missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"));
- missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time
- missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time
-
- Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end,
- missingInstances);
- Set<Pair<Date, String>> expected = new HashSet<>();
- expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL));
- expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));
- expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), TAG_CRITICAL));
- expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), TAG_CRITICAL));
- Assert.assertEquals(result, expected);
- }
-
- @Test(expectedExceptions = ValidationException.class,
- expectedExceptionsMessageRegExp = "SLA monitoring is not supported for: PROCESS")
- public void testInvalidType() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("process",
- "in", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
- }
-
- @Test(expectedExceptions = EntityNotRegisteredException.class,
- expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*")
- public void testInvalidName() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("feed",
- "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string")
- public void testInvalidStart() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*");
- }
-
- @Test(expectedExceptions = ValidationException.class,
- expectedExceptionsMessageRegExp = "start can not be after end")
- public void testInvalidRange() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("feed",
- null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*");
- }
-
- @Test
- public void testOptionalName() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
- AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
- }
-
- @Test
- public void testOptionalEnd() throws FalconException {
- AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*");
- AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*");
- }
-
- private Cluster publishCluster() throws FalconException {
- Cluster cluster = new Cluster();
- cluster.setName(CLUSTER_NAME);
- cluster.setColo("default");
- getStore().publish(EntityType.CLUSTER, cluster);
- return cluster;
-
- }
-
- private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
- throws FalconException, ParseException {
- Feed feed = new Feed();
- feed.setName(FEED_NAME);
- Frequency f = new Frequency(frequency);
- feed.setFrequency(f);
- feed.setTimezone(UTC);
- Clusters fClusters = new Clusters();
- org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
- fCluster.setType(ClusterType.SOURCE);
- fCluster.setName(cluster.getName());
- fCluster.setValidity(getFeedValidity(start, end));
- fClusters.getClusters().add(fCluster);
- feed.setClusters(fClusters);
- getStore().publish(EntityType.FEED, feed);
- return feed;
- }
-
- private Validity getFeedValidity(String start, String end) throws ParseException {
- Validity validity = new Validity();
- validity.setStart(getDate(start));
- validity.setEnd(getDate(end));
- return validity;
- }
-
- private Date getDate(String dateString) throws ParseException {
- DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
- return format.parse(dateString);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties
index d72dbba..5258b96 100644
--- a/prism/src/test/resources/startup.properties
+++ b/prism/src/test/resources/startup.properties
@@ -73,7 +73,7 @@
# org.apache.falcon.state.store.jdbc.JdbcStateStore
## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here
-*.feedAlert.listeners=
+*.entityAlert.listeners=
##### JMS MQ Broker Implementation class #####
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
@@ -97,7 +97,7 @@
#Configurations used in UTs
debug.config.store.uri=file://${user.dir}/target/store
#Location to store state of Feed SLA monitoring service
-debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
+debug.entity.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingentityinstances
debug.config.oozie.conf.uri=${user.dir}/target/oozie
debug.system.lib.location=${system.lib.location}
debug.broker.url=vm://localhost
@@ -131,13 +131,13 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Do not change unless really sure
# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
-*.feed.sla.statusCheck.frequency.seconds=600
+*.entity.sla.statusCheck.frequency.seconds=600
# Do not change unless really sure
# Time Duration (in milliseconds) in future for generating pending feed instances.
# In every cycle pending feed instances are added for monitoring, till this time in future.
# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
-*.feed.sla.lookAheadWindow.millis=900000
+*.entity.sla.lookAheadWindow.millis=900000
######### Properties for configuring JMS provider - activemq #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index b663f04..6d82516 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -137,7 +137,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
*.extension.store.uri=file://${falcon.home}/extensions/
#Location to store state of Feed SLA monitoring service
-*.feed.sla.service.store.uri = file://${falcon.home}/data/sla/pendingfeedinstances
+*.entity.sla.service.store.uri = file://${falcon.home}/data/sla/pendingentityinstances
# Location of libraries that is shipped to Hadoop
*.system.lib.location=${falcon.home}/server/webapp/${falcon.app.type}/WEB-INF/lib
@@ -152,7 +152,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Default timeout in minutes to load entities
*.config.store.start.timeout.minutes=30
-######### Properties for Feed SLA Monitoring #########
+######### Properties for Entity SLA Monitoring #########
# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
*.feed.sla.serialization.frequency.millis=3600000
@@ -162,16 +162,16 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Do not change unless really sure
# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
-*.feed.sla.statusCheck.frequency.seconds=600
+*.entity.sla.statusCheck.frequency.seconds=600
# Do not change unless really sure
# Time Duration (in milliseconds) in future for generating pending feed instances.
# In every cycle pending feed instances are added for monitoring, till this time in future.
# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
-*.feed.sla.lookAheadWindow.millis=900000
+*.entity.sla.lookAheadWindow.millis=900000
##Add if you want to enable BacklogMetricService
-#*.feedAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService
+#*.entityAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService
######### Properties for configuring JMS provider - activemq #########
# Default Active MQ url
http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 657ef9e..5525207 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -168,8 +168,8 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@GET
@Path("sla-alert/{type}")
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
- @Monitored(event = "feed-sla-misses")
- public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(
+ @Monitored(event = "entity-sla-misses")
+ public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(
@Dimension("entityType") @PathParam("type") String entityType,
@Dimension("entityName") @QueryParam("name") String entityName,
@Dimension("start") @QueryParam("start") String start,
@@ -177,7 +177,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@Dimension("colo") @QueryParam("colo") final String colo) {
try {
validateSlaParams(entityType, entityName, start, end, colo);
- return super.getFeedSLAMissPendingAlerts(entityName, start, end, colo);
+ return super.getEntitySLAMissPendingAlerts(entityName, entityType, start, end, colo);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e);
}