You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/10/23 05:52:52 UTC

[18/26] falcon git commit: FALCON-2303 Backloginmins is not getting updated

FALCON-2303 Backloginmins is not getting updated

Author: Praveen Adlakha <pr...@im1314-x2.corp.inmobi.com>

Reviewers: @pallavi-rao

Closes #385 from PraveenAdlakha/backlogfix and squashes the following commits:

8ad078946 [Praveen Adlakha] comments addressed
182e31c64 [Praveen Adlakha] FALCON-2303 Backloginmins is not getting updated

(cherry picked from commit bfd1805ba13e7755f01754edd062da3dc59aae3b)
Signed-off-by: Pallavi Rao <pa...@inmobi.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/84186f09
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/84186f09
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/84186f09

Branch: refs/heads/master
Commit: 84186f09e17aba7497be4a2bdec6f8b119405b0b
Parents: a2ef339
Author: Praveen Adlakha <pr...@im1314-x2.corp.inmobi.com>
Authored: Thu Aug 31 14:39:32 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Aug 31 14:53:55 2017 +0530

----------------------------------------------------------------------
 .../falcon/persistence/PendingInstanceBean.java       |  5 +++--
 .../falcon/persistence/PersistenceConstants.java      |  1 +
 .../apache/falcon/jdbc/MonitoringJdbcStateStore.java  | 14 ++++++++++++++
 .../falcon/service/BacklogMetricEmitterService.java   |  1 +
 .../apache/falcon/service/EntitySLAAlertService.java  |  4 ++--
 .../falcon/service/EntitySLAMonitoringService.java    | 14 +++++++++++---
 src/conf/runtime.properties                           |  3 +++
 7 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 05c5ea3..d35d982 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -42,8 +42,9 @@ import java.util.Date;
     @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
     @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
     @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
-    @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a  order by a.nominalTime asc"),
-    @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+    @NamedQuery(name = PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a  order by a.nominalTime asc"),
+    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE, query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName  and a.clusterName = :clusterName and a.entityType = :entityType and a.nominalTime >= :startTime  and a.nominalTime < :endTime ")
 })
 @Table(name = "PENDING_INSTANCES")
 //RESUME CHECKSTYLE CHECK  LineLengthCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 1e6a04b..90dcf50 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -82,4 +82,5 @@ public final class PersistenceConstants {
     public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
     public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
     public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
+    public static final String GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE = "GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 8da2389..51eac94 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -182,6 +182,20 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
+    public List<Date> getNominalInstancesBetweenTimeRange(String entityName, String clusterName, String entityType,
+                                                          Date startTime, Date endTime) {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE);
+        q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
+        q.setParameter("startTime", startTime);
+        q.setParameter("endTime", endTime);
+        List result = q.getResultList();
+        entityManager.close();
+        return result;
+    }
+
     public List<PendingInstanceBean> getAllPendingInstances(){
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index 2480c96..50170b9 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -117,6 +117,7 @@ public final class BacklogMetricEmitterService implements FalconService,
         }
         Process process = (Process) entity;
         if (process.getSla() != null) {
+            LOG.debug("Removing process:{} from monitoring", process.getName());
             backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name());
             entityBacklogs.remove(entity);
             process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 837a170..2f19e6b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -112,7 +112,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
         if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
             return;
         }
-        LOG.trace("In processSLACandidates :" + pendingInstanceBeanList.size());
+        LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
         try{
             for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
 
@@ -129,7 +129,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                 if (schedulableEntityInstances.isEmpty()){
                     store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime,
                             entityType);
-                    return;
+                    continue;
                 }
                 List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
                 SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 09671d9..8b3dbe4 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -55,6 +55,7 @@ import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -207,9 +208,11 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
             if (feed.getSla() != null && feed.getLocations() != null) {
                 for (Cluster cluster : feed.getClusters().getClusters()) {
                     if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
+                        LOG.debug("Removing feed:{} for monitoring", feed.getName());
                         MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString());
                         MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
                                 EntityType.FEED.toString());
+                        LOG.debug("Removing feed:{} for monitoring", feed.getName());
                     }
                 }
             }
@@ -219,6 +222,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
             if (process.getSla() != null){
                 for (org.apache.falcon.entity.v0.process.Cluster  cluster : process.getClusters().getClusters()) {
                     if (currentClusters.contains(cluster.getName())) {
+                        LOG.debug("Removing feed:{} for monitoring", process.getName());
                         MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
                                 EntityType.PROCESS.toString());
                         MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
@@ -364,6 +368,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
                     // add Instances from last checked time to 10 minutes from now(some buffer for status check)
                     Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis);
                     addPendingEntityInstances(newCheckPointTime);
+                } else {
+                    LOG.debug("No entities present for sla monitoring.");
                 }
             } catch (Throwable e) {
                 LOG.error("Feed SLA monitoring failed: ", e);
@@ -450,7 +456,6 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
                 LOG.trace("Checking instance availability status for entity:{}, cluster:{}, "
                         + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType);
                 AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-
                 InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime,
                         new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false);
                 if (instancesResult.getInstances().length > 0) {
@@ -459,6 +464,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
                                 entity.getName(), clusterName, nominalTime);
                         return true;
                     }
+                } else if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt(
+                        RuntimeProperties.get().getProperty("workflow.history.expiration.period.days", "7"))) {
+                    return true;
                 }
                 return false;
             }
@@ -552,8 +560,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
                                           Date start, Date end, String entityType) throws FalconException {
         Set<SchedulableEntityInstance> result = new HashSet<>();
-        List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
-                entityType);
+        List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstancesBetweenTimeRange(entityName,
+                clusterName, entityType, start, end);
         if (missingInstances == null){
             return result;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/84186f09/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 013ac18..62d12d4 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -86,3 +86,6 @@ falcon.current.colo=local
 
 ### Timeout factor for processes ###
 instance.timeout.factor=5
+
+### Workflow expiration period for oozie ###
+workflow.history.expiration.period.days=7
\ No newline at end of file