You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/08/31 09:09:42 UTC
falcon git commit: FALCON-2303 Backloginmins is not getting updated
Repository: falcon
Updated Branches:
refs/heads/master 90cb996f5 -> bfd1805ba
FALCON-2303 Backloginmins is not getting updated
Author: Praveen Adlakha <pr...@im1314-x2.corp.inmobi.com>
Reviewers: @pallavi-rao
Closes #385 from PraveenAdlakha/backlogfix and squashes the following commits:
8ad078946 [Praveen Adlakha] comments addressed
182e31c64 [Praveen Adlakha] FALCON-2303 Backloginmins is not getting updated
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bfd1805b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bfd1805b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bfd1805b
Branch: refs/heads/master
Commit: bfd1805ba13e7755f01754edd062da3dc59aae3b
Parents: 90cb996
Author: Praveen Adlakha <pr...@im1314-x2.corp.inmobi.com>
Authored: Thu Aug 31 14:39:32 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Aug 31 14:39:32 2017 +0530
----------------------------------------------------------------------
.../falcon/persistence/PendingInstanceBean.java | 5 +++--
.../falcon/persistence/PersistenceConstants.java | 1 +
.../apache/falcon/jdbc/MonitoringJdbcStateStore.java | 14 ++++++++++++++
.../falcon/service/BacklogMetricEmitterService.java | 1 +
.../apache/falcon/service/EntitySLAAlertService.java | 4 ++--
.../falcon/service/EntitySLAMonitoringService.java | 14 +++++++++++---
src/conf/runtime.properties | 3 +++
7 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 05c5ea3..d35d982 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -42,8 +42,9 @@ import java.util.Date;
@NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
@NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
- @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"),
- @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+ @NamedQuery(name = PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"),
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE, query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType and a.nominalTime >= :startTime and a.nominalTime < :endTime ")
})
@Table(name = "PENDING_INSTANCES")
//RESUME CHECKSTYLE CHECK LineLengthCheck
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 1e6a04b..90dcf50 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -82,4 +82,5 @@ public final class PersistenceConstants {
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
+ public static final String GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE = "GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 8da2389..51eac94 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -182,6 +182,20 @@ public class MonitoringJdbcStateStore {
return result;
}
+    /**
+     * Fetches the nominal times of the pending instances of an entity on a cluster
+     * whose nominal time falls in the half-open range [startTime, endTime).
+     *
+     * @param entityName  name of the entity whose pending instances are looked up
+     * @param clusterName name of the cluster the instances belong to
+     * @param entityType  type of the entity; lower-cased before being bound to the query
+     * @param startTime   inclusive lower bound on the nominal time
+     * @param endTime     exclusive upper bound on the nominal time
+     * @return the nominal times returned by the GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE query
+     */
+    public List<Date> getNominalInstancesBetweenTimeRange(String entityName, String clusterName, String entityType,
+                                                          Date startTime, Date endTime) {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE);
+            q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+            q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName);
+            q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
+            q.setParameter("startTime", startTime);
+            q.setParameter("endTime", endTime);
+            // The named query selects only a.nominalTime, so each row is a Date.
+            @SuppressWarnings("unchecked")
+            List<Date> result = q.getResultList();
+            return result;
+        } finally {
+            // Always release the EntityManager, even when building or running the query throws.
+            entityManager.close();
+        }
+    }
+
public List<PendingInstanceBean> getAllPendingInstances(){
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 2480c96..50170b9 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -117,6 +117,7 @@ public final class BacklogMetricEmitterService implements FalconService,
}
Process process = (Process) entity;
if (process.getSla() != null) {
+ LOG.debug("Removing process:{} from monitoring", process.getName());
backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
entityBacklogs.remove(entity);
process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 837a170..2f19e6b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -112,7 +112,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
return;
}
- LOG.trace("In processSLACandidates :" + pendingInstanceBeanList.size());
+ LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
try{
for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
@@ -129,7 +129,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
if (schedulableEntityInstances.isEmpty()){
store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime,
entityType);
- return;
+ continue;
}
List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 09671d9..8b3dbe4 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -55,6 +55,7 @@ import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -207,9 +208,11 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
if (feed.getSla() != null && feed.getLocations() != null) {
for (Cluster cluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
+ LOG.debug("Removing feed:{} for monitoring", feed.getName());
MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString());
MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
EntityType.FEED.toString());
+ LOG.debug("Removing feed:{} for monitoring", feed.getName());
}
}
}
@@ -219,6 +222,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
if (process.getSla() != null){
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName())) {
+ LOG.debug("Removing feed:{} for monitoring", process.getName());
MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
EntityType.PROCESS.toString());
MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
@@ -364,6 +368,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
// add Instances from last checked time to 10 minutes from now(some buffer for status check)
Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis);
addPendingEntityInstances(newCheckPointTime);
+ } else {
+ LOG.debug("No entities present for sla monitoring.");
}
} catch (Throwable e) {
LOG.error("Feed SLA monitoring failed: ", e);
@@ -450,7 +456,6 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
LOG.trace("Checking instance availability status for entity:{}, cluster:{}, "
+ "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType);
AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-
InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime,
new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
if (instancesResult.getInstances().length > 0) {
@@ -459,6 +464,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
entity.getName(), clusterName, nominalTime);
return true;
}
+ } else if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt(
+ RuntimeProperties.get().getProperty("workflow.history.expiration.period.days", "7"))) {
+ return true;
}
return false;
}
@@ -552,8 +560,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
Date start, Date end, String entityType) throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
- List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
- entityType);
+ List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstancesBetweenTimeRange(entityName,
+ clusterName, entityType, start, end);
if (missingInstances == null){
return result;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 013ac18..62d12d4 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -86,3 +86,6 @@ falcon.current.colo=local
### Timeout factor for processes ###
instance.timeout.factor=5
+
+### Workflow history expiration period (in days) for Oozie ###
+workflow.history.expiration.period.days=7
\ No newline at end of file