You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/04/23 18:45:17 UTC
git commit: OOZIE-1527 Fix scalability issues with coordinator
materialization (puru via rohini)
Repository: oozie
Updated Branches:
refs/heads/master c707867b7 -> 14599a0a5
OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/14599a0a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/14599a0a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/14599a0a
Branch: refs/heads/master
Commit: 14599a0a5c6cde5209b5f99574d65860dd9bffef
Parents: c707867
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Wed Apr 23 09:45:09 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Wed Apr 23 09:45:09 2014 -0700
----------------------------------------------------------------------
.../org/apache/oozie/CoordinatorJobBean.java | 5 +-
.../CoordMaterializeTransitionXCommand.java | 65 +++++++++++++-
.../executor/jpa/CoordJobQueryExecutor.java | 16 +++-
.../CoordJobsToBeMaterializedJPAExecutor.java | 2 +-
.../service/CoordMaterializeTriggerService.java | 48 ++++------
core/src/main/resources/oozie-default.xml | 36 +++++---
.../TestCoordMaterializeTransitionXCommand.java | 92 +++++++++++++++++++-
...estCoordJobsToBeMaterializedJPAExecutor.java | 2 +
.../TestCoordMaterializeTriggerService.java | 83 ++++++++++++++++++
release-log.txt | 1 +
10 files changed, 297 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index 4a2ea39..7915698 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -91,7 +91,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"),
- @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace from CoordinatorJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr from CoordinatorJobBean w where w.id = :id"),
@NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"),
@@ -103,8 +103,11 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),
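+ //TODO remove GET_COORD_JOBS_OLDER_THAN once unused; materialization lookup now uses GET_COORD_JOBS_OLDER_FOR_MATERILZATION.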
@NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"),
+ // Subquery counts the job's WAITING action rows (count(a.id)); count(w.id) would reference only the outer entity, which SQL may bind to the outer query (rejected by some databases).
+ @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.id) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"),
@NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),
@NamedQuery(name = "GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' or w.statusStr = 'KILLED') AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index b483799..57cbb34 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -45,6 +45,7 @@ import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
@@ -59,13 +60,14 @@ import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbOperations;
import org.jdom.Element;
+import org.jdom.JDOMException;
/**
* Materialize actions for specified start and end time for coordinator job.
*/
@SuppressWarnings("deprecation")
public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
- private static final int LOOKAHEAD_WINDOW = 300; // We look ahead 5 minutes for materialization;
+
private JPAService jpaService = null;
private CoordinatorJobBean coordJob = null;
private String jobId = null;
@@ -74,6 +76,13 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
private final int materializationWindow;
private int lastActionNumber = 1; // over-ride by DB value
private CoordinatorJob.Status prevStatus = null;
+
+ // Look-ahead (seconds) allowed for a job's start time before materialization is refused. Instance field so the
+ // value is read when the command is created (Services initialized) rather than once at class-load time.
+ private final int lookAheadWindow = Services.get().getConf().getInt(
+ CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL,
+ CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL_DEFAULT);
+
/**
* Default MAX timeout in minutes, after which coordinator input check will timeout
*/
@@ -84,6 +93,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
*
* @param jobId coordinator job id
* @param materializationWindow materialization window to calculate end time
+ * (the start-time look-ahead window is read from CoordMaterializeTriggerService.CONF_LOOKUP_INTERVAL)
*/
public CoordMaterializeTransitionXCommand(String jobId, int materializationWindow) {
super("coord_mater", "coord_mater", 1);
@@ -186,6 +196,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
startMatdTime = DateUtils.toDate(new Timestamp(startTimeMilli));
endMatdTime = DateUtils.toDate(new Timestamp(endTimeMilli));
+ endMatdTime = getMaterializationTimeForCatchUp(endMatdTime);
// if MaterializationWindow end time is greater than endTime
// for job, then set it to endTime of job
Date jobEndTime = coordJob.getEndTime();
@@ -197,6 +208,54 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
+ ", window=" + materializationWindow);
}
+ /**
+ * Get the materialization end time, extended for catch-up jobs. If currentMatTime is already in the future the
+ * job is current and currentMatTime is returned unchanged. Otherwise the end time is startMatdTime + n * frequency,
+ * where n is matThrottling, or smaller if an earlier instance already reaches the current time; in that case
+ * the later of the previous instance and currentMatTime is used. Non-numeric frequency or matThrottling < 1 keep currentMatTime.
+ *
+ * @param currentMatTime end time computed from the regular materialization window
+ * @return the materialization end time to use
+ * @throws CommandException declared for callers; not thrown by this implementation
+ */
+ private Date getMaterializationTimeForCatchUp(Date currentMatTime) throws CommandException {
+ // Read the clock once so every comparison below uses the same "now".
+ Date now = new Date();
+ if (currentMatTime.after(now)) {
+ return currentMatTime;
+ }
+ int frequency;
+ try {
+ frequency = Integer.parseInt(coordJob.getFrequency());
+ }
+ catch (NumberFormatException e) {
+ // Frequency is not a plain number: keep the regular window.
+ return currentMatTime;
+ }
+
+ TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
+ // NOTE(review): assumes timeUnitStr is loaded (GET_COORD_JOB_MATERIALIZE selects it); valueOf(null) would NPE.
+ TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
+ Calendar startInstance = Calendar.getInstance(appTz);
+ startInstance.setTime(startMatdTime);
+ Calendar endMatInstance = null;
+ Calendar previousInstance = startInstance;
+ // Step forward one frequency at a time, for at most matThrottling instances.
+ for (int i = 1; i <= coordJob.getMatThrottling(); i++) {
+ endMatInstance = (Calendar) startInstance.clone();
+ endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
+ if (!endMatInstance.getTime().before(now)) {
+ // Compare Date with Date: Calendar.after(Object) is always false for a non-Calendar argument.
+ Date previousTime = previousInstance.getTime();
+ return previousTime.after(currentMatTime) ? previousTime : currentMatTime;
+ }
+ previousInstance = endMatInstance;
+ }
+
+ // Every stepped instance is still in the past (or matThrottling < 1).
+ return endMatInstance == null ? currentMatTime : endMatInstance.getTime();
+ }
+
/* (non-Javadoc)
* @see org.apache.oozie.command.XCommand#verifyPrecondition()
*/
@@ -223,7 +282,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
if (startTime == null) {
startTime = coordJob.getStartTimestamp();
- if (startTime.after(new Timestamp(System.currentTimeMillis() + LOOKAHEAD_WINDOW * 1000))) {
+ if (startTime.after(new Timestamp(System.currentTimeMillis() + lookAheadWindow * 1000))) {
throw new PreconditionException(ErrorCode.E1100, "CoordMaterializeTransitionXCommand for jobId="
+ jobId + " job's start time is not reached yet - nothing to materialize");
}
@@ -311,7 +370,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
String frequency = coordJob.getFrequency();
- TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
+ TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
Calendar start = Calendar.getInstance(appTz);
start.setTime(startMatdTime);
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 67a919d..1a6ded7 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -61,7 +61,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
GET_COORD_JOB_MATERIALIZE,
GET_COORD_JOB_SUSPEND_KILL,
GET_COORD_JOB_STATUS_PARENTID,
- GET_COORD_JOBS_CHANGED
+ GET_COORD_JOBS_CHANGED,
+ GET_COORD_JOBS_OLDER_FOR_MATERILZATION
};
private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -208,6 +209,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_JOBS_CHANGED:
query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
break;
+ case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+ query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime()));
+ int limit = (Integer) parameters[1];
+ if (limit > 0) {
+ query.setMaxResults(limit);
+ }
+ break;
+
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -288,6 +297,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
bean.setConfBlob((StringBlob) arr[17]);
bean.setJobXmlBlob((StringBlob) arr[18]);
bean.setAppNamespace((String) arr[19]);
+ bean.setTimeUnitStr((String) arr[20]);
break;
case GET_COORD_JOB_SUSPEND_KILL:
bean = new CoordinatorJobBean();
@@ -311,6 +321,10 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_JOBS_CHANGED:
bean = (CoordinatorJobBean) ret;
break;
+ case GET_COORD_JOBS_OLDER_FOR_MATERILZATION:
+ bean = new CoordinatorJobBean();
+ bean.setId((String) ret);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
index ca11a24..40decff 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java
@@ -55,7 +55,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co
public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException {
List<CoordinatorJobBean> cjBeans;
try {
- Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_THAN");
+ Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION");
q.setParameter("matTime", new Timestamp(this.dateInput.getTime()));
if (limit > 0) {
q.setMaxResults(limit);
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 550eb80..ef3c3f4 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -24,10 +24,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
-import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobsToBeMaterializedJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.DateUtils;
@@ -43,6 +42,8 @@ public class CoordMaterializeTriggerService implements Service {
* Time interval, in seconds, at which the Job materialization service will be scheduled to run.
*/
public static final String CONF_LOOKUP_INTERVAL = CONF_PREFIX + "lookup.interval";
+
+ public static final String CONF_SCHEDULING_INTERVAL = CONF_PREFIX + "scheduling.interval";
/**
* This configuration defined the duration for which job should be materialized in future
*/
@@ -58,7 +59,8 @@ public class CoordMaterializeTriggerService implements Service {
private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
- private static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
+ public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
+ private static final int CONF_SCHEDULING_INTERVAL_DEFAULT = 300;
private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
@@ -116,11 +118,7 @@ public class CoordMaterializeTriggerService implements Service {
// get list of all jobs that have actions that should be materialized.
int materializationLimit = Services.get().getConf()
.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT, CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT);
- // account for under-utilization of limit due to jobs maxed out
- // against mat_throttle. hence repeat
- if (materializeCoordJobs(currDate, materializationLimit, LOG)) {
- materializeCoordJobs(currDate, materializationLimit, LOG);
- }
+ materializeCoordJobs(currDate, materializationLimit, LOG);
}
catch (Exception ex) {
@@ -128,43 +126,27 @@ public class CoordMaterializeTriggerService implements Service {
}
}
- private boolean materializeCoordJobs(Date currDate, int limit, XLog LOG) {
+ private void materializeCoordJobs(Date currDate, int limit, XLog LOG) throws JPAExecutorException {
try {
- JPAService jpaService = Services.get().get(JPAService.class);
- CoordJobsToBeMaterializedJPAExecutor cmatcmd = new CoordJobsToBeMaterializedJPAExecutor(currDate, limit);
- List<CoordinatorJobBean> materializeJobs = jpaService.execute(cmatcmd);
- int rejected = 0;
- LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) + ", Num jobs to materialize = "
- + materializeJobs.size());
+ List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
+ CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit);
+ LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
+ + ", Num jobs to materialize = " + materializeJobs.size());
for (CoordinatorJobBean coordJob : materializeJobs) {
if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) {
Services.get().get(InstrumentationService.class).get()
.incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
- int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob
- .getId()));
- LOG.info("Job :" + coordJob.getId() + " numWaitingActions : " + numWaitingActions
- + " MatThrottle : " + coordJob.getMatThrottling());
- // update lastModifiedTime so next time others get picked up in LRU fashion
+ queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
coordJob.setLastModifiedTime(new Date());
+ // TODO In place of calling single query, we should call bulk update.
CoordJobQueryExecutor.getInstance().executeUpdate(
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob);
- if (numWaitingActions >= coordJob.getMatThrottling()) {
- LOG.info("info for JobID [" + coordJob.getId() + "] " + numWaitingActions
- + " actions already waiting. MatThrottle is : " + coordJob.getMatThrottling());
- rejected++;
- continue;
- }
- queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
}
}
- if (materializeJobs.size() == limit && rejected > 0) {
- return true;
- }
}
catch (JPAExecutorException jex) {
LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
}
- return false;
}
/**
@@ -200,10 +182,12 @@ public class CoordMaterializeTriggerService implements Service {
int materializationWindow = conf.getInt(CONF_MATERIALIZATION_WINDOW, CONF_MATERIALIZATION_WINDOW_DEFAULT);
// default is 300sec (5min)
int lookupInterval = Services.get().getConf().getInt(CONF_LOOKUP_INTERVAL, CONF_LOOKUP_INTERVAL_DEFAULT);
+ // default is 300sec (5min)
+ int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, CONF_SCHEDULING_INTERVAL_DEFAULT);
Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
- services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, lookupInterval,
+ services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,
SchedulerService.Unit.SEC);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0198230..47fa0e4 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -408,21 +408,29 @@
<!-- CoordMaterializeTriggerService -->
- <property>
- <name>oozie.service.CoordMaterializeTriggerService.lookup.interval
- </name>
- <value>300</value>
- <description> Coordinator Job Lookup trigger command is scheduled at
- this "interval" (in seconds).</description>
- </property>
+ <property>
+ <name>oozie.service.CoordMaterializeTriggerService.lookup.interval
+ </name>
+ <value>300</value>
+ <description> Coordinator Job Lookup interval (in seconds); also the look-ahead allowed for a job's start time.
+ </description>
+ </property>
- <property>
- <name>oozie.service.CoordMaterializeTriggerService.materialization.window
- </name>
- <value>3600</value>
- <description> Coordinator Job Lookup command materialized each job for
- this next "window" duration</description>
- </property>
+ <property>
+ <name>oozie.service.CoordMaterializeTriggerService.scheduling.interval
+ </name>
+ <value>300</value>
+ <description> The frequency (in seconds) at which the CoordMaterializeTriggerService will run.</description>
+ </property>
+
+ <property>
+ <name>oozie.service.CoordMaterializeTriggerService.materialization.window
+ </name>
+ <value>3600</value>
+ <description> Coordinator Job Lookup command materializes each
+ job for the next "window" duration (in seconds).
+ </description>
+ </property>
<property>
<name>oozie.service.CoordMaterializeTriggerService.callable.batch.size</name>
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 6c73585..9a8d65b 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -18,7 +18,6 @@
package org.apache.oozie.command.coord;
import java.sql.Timestamp;
-import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -35,6 +34,8 @@ import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
@@ -473,6 +474,95 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
}
+ /**
+ * Test materialization lookup: catch-up jobs (start in the past) materialize up to matThrottling instances
+ * without passing the current time; current jobs only materialize instances inside the 3600-second window.
+ * @throws Exception
+ */
+ public void testMaterizationLookup() throws Exception {
+ long TIME_IN_MIN = 60 * 1000;
+ long TIME_IN_HOURS = TIME_IN_MIN * 60;
+ long TIME_IN_DAY = TIME_IN_HOURS * 24;
+ JPAService jpaService = Services.get().get(JPAService.class);
+ // catch-up job, DAY frequency: matThrottling (3) instances are materialized
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false,
+ 0);
+ job.setNextMaterializedTime(startTime);
+ job.setMatThrottling(3);
+ job.setFrequency("1");
+ job.setTimeUnitStr("DAY");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
+
+ // catch-up job, HOUR frequency: matThrottling (10) instances are materialized
+ startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
+ endTime = DateUtils.parseDateOozieTZ("2009-05-03T23:59Z");
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+ job.setNextMaterializedTime(startTime);
+ job.setMatThrottling(10);
+ job.setFrequency("1");
+ job.setTimeUnitStr("HOUR");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime());
+
+ // catch-up job, DAY frequency, start 3 days ago: materialization must not pass the current time
+ startTime = new Date(new Date().getTime() - TIME_IN_DAY * 3);
+ endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+ job.setNextMaterializedTime(startTime);
+ job.setMatThrottling(10);
+ job.setFrequency("1");
+ job.setTimeUnitStr("DAY");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue("catch-up must not pass the current time", job.getNextMaterializedTime().after(startTime) && !job.getNextMaterializedTime().after(new Date()));
+
+ // current job, DAY frequency: only the one-hour window is used, so a single instance is materialized
+ startTime = new Date(new Date().getTime());
+ endTime = new Date(startTime.getTime() + TIME_IN_DAY * 3);
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+ job.setNextMaterializedTime(startTime);
+ job.setMatThrottling(10);
+ job.setFrequency("1");
+ job.setTimeUnitStr("DAY");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime());
+
+ // current job, 5 MINUTE frequency: limited to the one-hour window
+ startTime = new Date(new Date().getTime());
+ endTime = new Date(startTime.getTime() + TIME_IN_HOURS * 24);
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+ job.setMatThrottling(20);
+ job.setFrequency("5");
+ job.setTimeUnitStr("MINUTE");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS), job.getNextMaterializedTime());
+
+ // current job, DAY frequency (long end time): a single instance within the one-hour window
+ startTime = new Date(new Date().getTime());
+ endTime = new Date(startTime.getTime() + TIME_IN_DAY * 24);
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+ job.setMatThrottling(20);
+ job.setFrequency("1");
+ job.setTimeUnitStr("DAY");
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(new Date(startTime.getTime() + TIME_IN_DAY), job.getNextMaterializedTime());
+
+ }
+
protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
Date pauseTime, String freq) throws Exception {
return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq);
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
index 9c5217f..9e03928 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobsToBeMaterializedJPAExecutor.java
@@ -118,6 +118,8 @@ public class TestCoordJobsToBeMaterializedJPAExecutor extends XFsTestCase {
coordJob.setFrequency("1");
coordJob.setExecutionOrder(Execution.FIFO);
coordJob.setConcurrency(1);
+ coordJob.setMatThrottling(1);
+
try {
coordJob.setStartTime(DateUtils.parseDateOozieTZ("2009-12-15T01:00Z"));
coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-12-17T01:00Z"));
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
index 9d4bd3b..9e85da3 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCoordMaterializeTriggerService.java
@@ -20,6 +20,8 @@ package org.apache.oozie.service;
import java.io.IOException;
import java.io.Reader;
import java.util.Date;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.CoordinatorJobBean;
@@ -27,9 +29,12 @@ import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.service.CoordMaterializeTriggerService.CoordMaterializeTriggerRunnable;
import org.apache.oozie.service.UUIDService.ApplicationType;
@@ -134,6 +139,84 @@ public class TestCoordMaterializeTriggerService extends XDataTestCase {
assertEquals(CoordinatorJob.Status.PREP, job3.getStatus());
}
+ public void testMaxMatThrottleNotPicked() throws Exception {
+ Services.get().destroy(); // re-init services with the materialization system limit overridden to 10
+ setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "10");
+ services = new Services();
+ services.init();
+ // RUNNING job with 2 WAITING actions and mat throttle 3 (above the WAITING count): expected to be picked
+ Date start = new Date();
+ Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ job.setMatThrottling(3);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ Date lastModifiedDate = job.getLastModifiedTime();
+ Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+ runnable.run();
+ sleep(1000);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertFalse(lastModifiedDate.equals(job.getLastModifiedTime())); // value check; assertNotSame only compares references
+ // Lower the throttle to the WAITING-action count (2): the job is now expected to be skipped and left unmodified
+ job.setMatThrottling(2);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ lastModifiedDate = job.getLastModifiedTime();
+ runnable.run();
+ sleep(1000);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertEquals(lastModifiedDate, job.getLastModifiedTime());
+ }
+
+ public void testMaxMatThrottleNotPickedMultipleJobs() throws Exception {
+ Services.get().destroy(); // re-init services with the materialization system limit overridden to 3
+ setSystemProperty(CoordMaterializeTriggerService.CONF_MATERIALIZATION_SYSTEM_LIMIT, "3");
+ services = new Services();
+ services.init();
+ Date start = new Date();
+ Date end = new Date(start.getTime() + 3600 * 5 * 1000);
+ CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1); // job1: 2 WAITING, throttle 3 -> expected picked
+ addRecordToCoordActionTable(job1.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ addRecordToCoordActionTable(job1.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ job1.setMatThrottling(3);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job1);
+ // job2: same setup as job1 (2 WAITING actions, throttle 3), expected to be picked
+ CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+ addRecordToCoordActionTable(job2.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ addRecordToCoordActionTable(job2.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ job2.setMatThrottling(3);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job2);
+ // job3: 2 WAITING actions but throttle 2 (throttle reached), expected to be skipped
+ CoordinatorJobBean job3 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, 1);
+ addRecordToCoordActionTable(job3.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ addRecordToCoordActionTable(job3.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
+ job3.setMatThrottling(2);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job3);
+ // Snapshot each job's last-modified time before the trigger runs
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId()));
+ Date lastModifiedDate1 = job1.getLastModifiedTime();
+ job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+ Date lastModifiedDate2 = job2.getLastModifiedTime();
+ job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+ Date lastModifiedDate3 = job3.getLastModifiedTime();
+
+ // One trigger pass; sleep(1000) presumably lets asynchronously queued materialization finish (TODO confirm)
+ Runnable runnable = new CoordMaterializeTriggerRunnable(3600, 300);
+ runnable.run();
+ sleep(1000);
+ // job1 and job2 must have been updated (value comparison, not reference identity); job3 must be unchanged
+ job1 = jpaService.execute(new CoordJobGetJPAExecutor(job1.getId()));
+ assertFalse(lastModifiedDate1.equals(job1.getLastModifiedTime()));
+ job2 = jpaService.execute(new CoordJobGetJPAExecutor(job2.getId()));
+ assertFalse(lastModifiedDate2.equals(job2.getLastModifiedTime()));
+ job3 = jpaService.execute(new CoordJobGetJPAExecutor(job3.getId()));
+ assertEquals(lastModifiedDate3, job3.getLastModifiedTime());
+ }
+
@Override
protected CoordinatorJobBean createCoordJob(CoordinatorJob.Status status, Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
Path appPath = new Path(getFsTestCaseDir(), "coord");
http://git-wip-us.apache.org/repos/asf/oozie/blob/14599a0a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d2b7e44..0a58977 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1527 Fix scalability issues with coordinator materialization (puru via rohini)
OOZIE-1797 Workflow rerun command should use existing workflow properties (puru via rohini)
OOZIE-1769 An option to update coord properties/definition (puru via rohini)
OOZIE-1796 Job status should not transition from KILLED (puru via rohini)