You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/04/23 18:45:17 UTC

git commit: OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini)

Repository: oozie
Updated Branches:
  refs/heads/master c707867b7 -> 14599a0a5


 OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/14599a0a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/14599a0a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/14599a0a

Branch: refs/heads/master
Commit: 14599a0a5c6cde5209b5f99574d65860dd9bffef
Parents: c707867
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Wed Apr 23 09:45:09 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Wed Apr 23 09:45:09 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    |  5 +-
 .../CoordMaterializeTransitionXCommand.java     | 65 +++++++++++++-
 .../executor/jpa/CoordJobQueryExecutor.java     | 16 +++-
 .../CoordJobsToBeMaterializedJPAExecutor.java   |  2 +-
 .../service/CoordMaterializeTriggerService.java | 48 ++++------
 core/src/main/resources/oozie-default.xml       | 36 +++++---
 .../TestCoordMaterializeTransitionXCommand.java | 92 +++++++++++++++++++-
 ...estCoordJobsToBeMaterializedJPAExecutor.java |  2 +
 .../TestCoordMaterializeTriggerService.java     | 83 ++++++++++++++++++
 release-log.txt                                 |  1 +
 10 files changed, 297 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 4a2ea39..7915698 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -91,7 +91,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace from CoordinatorJobBean w where w.id = :id"),
+        @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr from CoordinatorJobBean w where w.id = :id"),
 
         @NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"),
 
@@ -103,8 +103,11 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),
 
+        //TODO need to remove.
         @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"),
 
+        @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(w.id) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"),
+
         @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),
 
         @NamedQuery(name = "GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' or w.statusStr = 'KILLED') AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index b483799..57cbb34 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -45,6 +45,7 @@ import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
@@ -59,13 +60,14 @@ import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.util.db.SLADbOperations;
 import org.jdom.Element;
+import org.jdom.JDOMException;
 
 /**
  * Materialize actions for specified start and end time for coordinator job.
  */
 @SuppressWarnings("deprecation")
 public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
-    private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
+
     private JPAService jpaService = null;
     private CoordinatorJobBean coordJob = null;
     private String jobId = null;
@@ -74,6 +76,13 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
     private final int materializationWindow;
     private int lastActionNumber = 1; // over-ride by DB value
     private CoordinatorJob.Status prevStatus = null;
+
+    static final private int lookAheadWindow = Services
+            .get()
+            .getConf()
+            .getInt(CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL,
+                    CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL_DEFAULT);
+
     /**
      * Default MAX timeout in minutes, after which coordinator input check will timeout
      */
@@ -84,6 +93,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
      *
      * @param jobId coordinator job id
      * @param materializationWindow materialization window to calculate end time
+     * @param lookahead window
      */
     public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
         super("coord_mater", "coord_mater", 1);
@@ -186,6 +196,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
 
         startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
         endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
+        endMatdTime = getMaterializationTimeForCatchUp(endMatdTime);
         // if MaterializationWindow end time is greater than endTime
         // for job, then set it to endTime of job
         Date jobEndTime = coordJob.getEndTime();
@@ -197,6 +208,54 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
                 + ", window=" + materializationWindow);
     }
 
+    /**
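+     * Get the materialization end time, adjusted for catch-up jobs. For current jobs (window end in the future) or a
+     * non-integer frequency, it returns currentMatTime. For catch-up jobs, the end is at most startMatdTime +
+     * MatThrottling * frequency and does not pass the current time.
+     *
+     * @param currentMatTime end of the window computed from the materialization window size
+     * @return the materialization end time to use
+     * @throws CommandException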
+     */
+    private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException {
+        if (currentMatTime.after(new Date())) {
+            return currentMatTime;
+        }
+        int frequency = 0;
+        try {
+            frequency = Integer.parseInt(coordJob.getFrequency());
+        }
+        catch (NumberFormatException e) {
+            return currentMatTime;
+        }
+
+        TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
+        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
+        Calendar startInstance = Calendar.getInstance(appTz);
+        startInstance.setTime(startMatdTime);
+        Calendar endMatInstance = null;
+        Calendar previousInstance = startInstance;
+        for (int i = 1; i <= coordJob.getMatThrottling(); i++) {
+            endMatInstance = (Calendar) startInstance.clone();
+            endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
+            if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
+                if (previousInstance.after(currentMatTime)) {
+                    return previousInstance.getTime();
+                }
+                else {
+                    return currentMatTime;
+                }
+            }
+            previousInstance = endMatInstance;
+        }
+        if (endMatInstance == null) {
+            return currentMatTime;
+        }
+        else {
+            return endMatInstance.getTime();
+        }
+    }
+
     /* (non-Javadoc)
      * @see org.apache.oozie.command.XCommand#verifyPrecondition()
      */
@@ -223,7 +282,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         if (startTime == null) {
             startTime = coordJob.getStartTimestamp();
 
-            if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
+            if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
                 throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
                         + jobId + " job's start time is not reached yet - nothing to materialize");
             }
@@ -311,7 +370,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
 
         String frequency = coordJob.getFrequency();
-        TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
+        TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
         TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
         Calendar start = Calendar.getInstance(appTz);
         start.setTime(startMatdTime);

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 67a919d..1a6ded7 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -61,7 +61,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
         GET_COORD_JOB_MATERIALIZE,
         GET_COORD_JOB_SUSPEND_KILL,
         GET_COORD_JOB_STATUS_PARENTID,
-        GET_COORD_JOBS_CHANGED
+        GET_COORD_JOBS_CHANGED,
+        GET_COORD_JOBS_OLDER_FOR_MATERILZATION
     };
 
     private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -208,6 +209,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_JOBS_CHANGED:
                 query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
                 break;
+            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+                query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
+                int limit = (Integer) parameters[1];
+                if (limit > 0) {
+                    query.setMaxResults(limit);
+                }
+                break;
+
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -288,6 +297,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 bean.setConfBlob((StringBlob) arr[17]);
                 bean.setJobXmlBlob((StringBlob) arr[18]);
                 bean.setAppNamespace((String) arr[19]);
+                bean.setTimeUnitStr((String) arr[20]);
                 break;
             case GET_COORD_JOB_SUSPEND_KILL:
                 bean = new CoordinatorJobBean();
@@ -311,6 +321,10 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_JOBS_CHANGED:
                 bean = (CoordinatorJobBean) ret;
                 break;
+            case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+                bean = new CoordinatorJobBean();
+                bean.setId((String) ret);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
index ca11a24..40decff 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
@@ -55,7 +55,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co
     public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException {
         List<CoordinatorJobBean> cjBeans;
         try {
-            Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
+            Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION");
             q.setParameter("matTime", new Timestamp(this.dateInput.getTime()));
             if (limit > 0) {
                 q.setMaxResults(limit);

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 550eb80..ef3c3f4 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -24,10 +24,9 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
-import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.DateUtils;
@@ -43,6 +42,8 @@ public class CoordMaterializeTriggerService implements Service {
      * Time interval, in seconds, at which the Job materialization service will be scheduled to run.
      */
     public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
+
+    public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval";
     /**
      * This configuration defined the duration for which job should be materialized in future
      */
@@ -58,7 +59,8 @@ public class CoordMaterializeTriggerService implements Service {
 
     private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
     private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
-    private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
+    public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
+    private static final int CONF_SCHEDULING_INTERVAL_DEFAULT = 300;
     private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
     private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
 
@@ -116,11 +118,7 @@ public class CoordMaterializeTriggerService implements Service {
                 // get list of all jobs that have actions that should be materialized.
                 int materializationLimit = Services.get().getConf()
                         .getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
-                // account for under-utilization of limit due to jobs maxed out
-                // against mat_throttle. hence repeat
-                if (materializeCoordJobs(currDate, materializationLimit, LOG)) {
-                    materializeCoordJobs(currDate, materializationLimit, LOG);
-                }
+                materializeCoordJobs(currDate, materializationLimit, LOG);
             }
 
             catch (Exception ex) {
@@ -128,43 +126,27 @@ public class CoordMaterializeTriggerService implements Service {
             }
         }
 
-        private boolean materializeCoordJobs(Date currDate, int limit, XLog LOG) {
+        private void materializeCoordJobs(Date currDate, int limit, XLog LOG) throws JPAExecutorException {
             try {
-                JPAService jpaService = Services.get().get(JPAService.class);
-                CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate, limit);
-                List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
-                int rejected = 0;
-                LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)  + ", Num jobs to materialize = "
-                        + materializeJobs.size());
+                List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
+                        CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
+                LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
+                        + ", Num jobs to materialize = " + materializeJobs.size());
                 for (CoordinatorJobBean coordJob : materializeJobs) {
                     if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) {
                         Services.get().get(InstrumentationService.class).get()
                                 .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
-                        int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob
-                                .getId()));
-                        LOG.info("Job :" + coordJob.getId() + "  numWaitingActions : " + numWaitingActions
-                                + " MatThrottle : " + coordJob.getMatThrottling());
-                        // update lastModifiedTime so next time others get picked up in LRU fashion
+                        queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                         coordJob.setLastModifiedTime(new Date());
+                        // TODO In place of calling single query, we should call bulk update.
                         CoordJobQueryExecutor.getInstance().executeUpdate(
                                 CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
-                        if (numWaitingActions >= coordJob.getMatThrottling()) {
-                            LOG.info("info for JobID [" + coordJob.getId() + "] " + numWaitingActions
-                                    + " actions already waiting. MatThrottle is : " + coordJob.getMatThrottling());
-                            rejected++;
-                            continue;
-                        }
-                        queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                     }
                 }
-                if (materializeJobs.size() == limit && rejected > 0) {
-                    return true;
-                }
             }
             catch (JPAExecutorException jex) {
                 LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
             }
-            return false;
         }
 
         /**
@@ -200,10 +182,12 @@ public class CoordMaterializeTriggerService implements Service {
         int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
         // default is 300sec (5min)
         int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
+        // default is 300sec (5min)
+        int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, CONF_SCHEDULING_INTERVAL_DEFAULT);
 
         Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
 
-        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval,
+        services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,
                                                       SchedulerService.Unit.SEC);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0198230..47fa0e4 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -408,21 +408,29 @@
 
     <!--  CoordMaterializeTriggerService -->
 
-	<property>
-		<name>oozie.service.CoordMaterializeTriggerService.lookup.interval
-		</name>
-		<value>300</value>
-		<description> Coordinator Job Lookup trigger command is scheduled at
-			this "interval" (in seconds).</description>
-	</property>
+    <property>
+        <name>oozie.service.CoordMaterializeTriggerService.lookup.interval
+        </name>
+        <value>300</value>
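+        <description> Coordinator Job Lookup interval (in seconds). Also used by CoordMaterializeTransitionXCommand
+            as the look-ahead window when checking whether a job's start time has been reached.</description>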
+    </property>
 
-	<property>
-		<name>oozie.service.CoordMaterializeTriggerService.materialization.window
-		</name>
-		<value>3600</value>
-		<description> Coordinator Job Lookup command materialized each job for
-			this next "window" duration</description>
-	</property>
+    <property>
+        <name>oozie.service.CoordMaterializeTriggerService.scheduling.interval
+        </name>
+        <value>300</value>
+        <description> The interval (in seconds) at which the CoordMaterializeTriggerService is scheduled to run.</description>
+    </property>
+
+    <property>
+        <name>oozie.service.CoordMaterializeTriggerService.materialization.window
+        </name>
+        <value>3600</value>
+        <description> The Coordinator Job Lookup command materializes each
+            job for this next "window" duration (in seconds).
+        </description>
+    </property>
 
     <property>
         <name>oozie.service.CoordMaterializeTriggerService.callable.batch.size</name>

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 6c73585..9a8d65b 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -18,7 +18,6 @@
 package org.apache.oozie.command.coord;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -35,6 +34,8 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
 import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
@@ -473,6 +474,95 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
     }
 
+    /**
+     * Test lookup materialization for catchup jobs
+     *
+     * @throws Exception
+     */
+    public void testMaterizationLookup() throws Exception {
+        long TIME_IN_MIN = 60 * 1000;
+        long TIME_IN_HOURS = TIME_IN_MIN * 60;
+        long TIME_IN_DAY = TIME_IN_HOURS * 24;
+        JPAService jpaService = Services.get().get(JPAService.class);
+        // test with days
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false,
+                0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(3);
+        job.setFrequency("1");
+        job.setTimeUnitStr("DAY");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
+
+        // test with hours
+        startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
+        endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(10);
+        job.setFrequency("1");
+        job.setTimeUnitStr("HOUR");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime());
+
+        // test with days, time should not pass the current time.
+        startTime = new Date(new Date().getTime() - TIME_IN_DAY * 3);
+        endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(10);
+        job.setFrequency("1");
+        job.setTimeUnitStr("DAY");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY ), job.getNextMaterializedTime());
+
+        // test with days, for a job starting now.
+        startTime = new Date(new Date().getTime());
+        endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(10);
+        job.setFrequency("1");
+        job.setTimeUnitStr("DAY");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime());
+
+        // for current job in min, should not exceed hour windows
+        startTime = new Date(new Date().getTime());
+        endTime = new Date(startTime.getTime() + TIME_IN_HOURS * 24);
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setMatThrottling(20);
+        job.setFrequency("5");
+        job.setTimeUnitStr("MINUTE");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS), job.getNextMaterializedTime());
+
+        // for current job in days, only the first action falls within the one-hour window
+        startTime = new Date(new Date().getTime());
+        endTime = new Date(startTime.getTime() + TIME_IN_DAY * 24);
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setMatThrottling(20);
+        job.setFrequency("1");
+        job.setTimeUnitStr("DAY");
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime());
+
+    }
+
     protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
             Date pauseTime, String freq) throws Exception {
         return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq);

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
index 9c5217f..9e03928 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
@@ -118,6 +118,8 @@ public class TestCoordJobsToBeMaterializedJPAExecutor extends XFsTestCase {
         coordJob.setFrequency("1");
         coordJob.setExecutionOrder(Execution.FIFO);
         coordJob.setConcurrency(1);
+        coordJob.setMatThrottling(1);
+
         try {
             coordJob.setStartTime(DateUtils.parseDateOozieTZ("2009-12-15T01:00Z"));
             coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-12-17T01:00Z"));

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
index 9d4bd3b..9e85da3 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
@@ -20,6 +20,8 @@ package org.apache.oozie.service;
 import java.io.IOException;
 import java.io.Reader;
 import java.util.Date;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorJobBean;
@@ -27,9 +29,12 @@ import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable;
 import org.apache.oozie.service.UUIDService.ApplicationType;
@@ -134,6 +139,84 @@ public class TestCoordMaterializeTriggerService extends XDataTestCase {
         assertEquals(CoordinatorJob.Status.PREP, job3.getStatus());
     }
 
+    public void testMaxMatThrottleNotPicked() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "10");
+        services = new Services();
+        services.init();
+        // Throttle 3 with 2 WAITING actions: the trigger run is expected to materialize (modify) the job.
+        Date start = new Date();
+        Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+        addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        job.setMatThrottling(3);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        Date lastModifiedDate = job.getLastModifiedTime();
+        Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+        runnable.run();
+        sleep(1000);
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertFalse(job.getLastModifiedTime().equals(lastModifiedDate)); // value check; refs always differ
+        // Throttle 2 with 2 WAITING actions: the job is expected to be skipped, so lastModifiedTime is unchanged.
+        job.setMatThrottling(2);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        lastModifiedDate = job.getLastModifiedTime();
+        runnable.run();
+        sleep(1000);
+        job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+        assertEquals(lastModifiedDate, job.getLastModifiedTime());
+    }
+
+    public void testMaxMatThrottleNotPickedMultipleJobs() throws Exception {
+        Services.get().destroy();
+        setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "3");
+        services = new Services();
+        services.init();
+        // Each job has 2 WAITING actions: job1/job2 (throttle 3) should be picked, job3 (throttle 2) skipped.
+        Date start = new Date();
+        Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+        CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+        addRecordToCoordActionTable(job1.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(job1.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        job1.setMatThrottling(3);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job1);
+
+        CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+        addRecordToCoordActionTable(job2.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(job2.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        job2.setMatThrottling(3);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job2);
+
+        CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+        addRecordToCoordActionTable(job3.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(job3.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+        job3.setMatThrottling(2);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job3);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId()));
+        Date lastModifiedDate1 = job1.getLastModifiedTime();
+        job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+        Date lastModifiedDate2 = job2.getLastModifiedTime();
+        job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+        Date lastModifiedDate3 = job3.getLastModifiedTime();
+
+        Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+        runnable.run();
+        sleep(1000);
+        // Compare timestamps by value: re-fetched Date instances are never the same reference.
+        job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId()));
+        assertFalse(job1.getLastModifiedTime().equals(lastModifiedDate1));
+        job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+        assertFalse(job2.getLastModifiedTime().equals(lastModifiedDate2));
+        job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+        assertEquals(lastModifiedDate3, job3.getLastModifiedTime());
+    }
+
     @Override
     protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
         Path appPath = new Path(getFsTestCaseDir(), "coord");

http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d2b7e44..0a58977 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini)
 OOZIE-1797 Workflow rerun command should use existing workflow properties (puru via rohini)
 OOZIE-1769 An option to update coord properties/definition (puru via rohini)
 OOZIE-1796 Job status should not transition from KILLED (puru via rohini)