You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/12/31 01:34:41 UTC
git commit: OOZIE-1632 Coordinators that undergo change endtime but
are doneMaterialization, not getting picked for StatusTransit (mona)
Updated Branches:
refs/heads/master 8ca266fac -> 6c69089fd
OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6c69089f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6c69089f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6c69089f
Branch: refs/heads/master
Commit: 6c69089fde5aeaffc090b0d42ff5345a1d568651
Parents: 8ca266f
Author: Mona Chitnis <ch...@yahoo-inc.com>
Authored: Mon Dec 30 16:34:11 2013 -0800
Committer: Mona Chitnis <ch...@yahoo-inc.com>
Committed: Mon Dec 30 16:34:11 2013 -0800
----------------------------------------------------------------------
.../org/apache/oozie/CoordinatorJobBean.java | 2 +
.../command/coord/CoordChangeXCommand.java | 36 +++--
.../executor/jpa/CoordActionQueryExecutor.java | 34 ++++-
...ActionsGetByLastModifiedTimeJPAExecutor.java | 68 ----------
.../executor/jpa/CoordJobQueryExecutor.java | 11 +-
.../oozie/service/StatusTransitService.java | 21 +--
.../command/coord/TestCoordChangeXCommand.java | 132 ++++++++++++++++++-
...dActionGetByLastModifiedTimeJPAExecutor.java | 80 -----------
.../executor/jpa/TestCoordJobQueryExecutor.java | 24 +++-
release-log.txt | 1 +
10 files changed, 233 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index f189b69..16209ce 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -97,6 +97,8 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"),
+ @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"),
+
@NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"),
@NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 65685db..4957330 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -325,19 +325,30 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
try {
if (newEndTime != null) {
- coordJob.setEndTime(newEndTime);
- if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
- coordJob.setStatus(CoordinatorJob.Status.RUNNING);
- }
- if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
- || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
- // Check for backward compatibility for Oozie versions (3.2 and before)
- // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
- // PAUSEDWITHERROR is not supported
- coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+ // during coord materialization, nextMaterializedTime is set to
+ // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
+ // while doneMaterialization is true. Hence the following checks
+ // for newEndTime being in the middle of endTime and nextMatdTime.
+ // Since job is already done materialization so no need to change
+ boolean dontChange = coordJob.getEndTime().before(newEndTime)
+ && coordJob.getNextMaterializedTime() != null
+ && coordJob.getNextMaterializedTime().after(newEndTime);
+ if (!dontChange) {
+ coordJob.setEndTime(newEndTime);
+ if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
+ coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+ }
+ if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
+ || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
+ // Check for backward compatibility for Oozie versions (3.2 and before)
+ // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
+ // PAUSEDWITHERROR is not supported
+ coordJob.setStatus(StatusUtils
+ .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+ }
+ coordJob.setPending();
+ coordJob.resetDoneMaterialization();
}
- coordJob.setPending();
- coordJob.resetDoneMaterialization();
}
if (newConcurrency != null) {
@@ -376,6 +387,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
coordJob.setDoneMaterialization();
}
+ coordJob.setLastModifiedTime(new Date());
updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index fc65cf3..f5304ca 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -17,10 +17,14 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
+
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.JPAService;
@@ -44,7 +48,8 @@ public class CoordActionQueryExecutor extends
UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
UPDATE_COORD_ACTION_RERUN,
GET_COORD_ACTION,
- GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID
+ GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
+ GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
};
private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -171,6 +176,9 @@ public class CoordActionQueryExecutor extends
case GET_COORD_ACTION:
query.setParameter("id", parameters[0]);
break;
+ case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
+ query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ caQuery.name());
@@ -202,14 +210,30 @@ public class CoordActionQueryExecutor extends
throws JPAExecutorException {
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
- List<CoordinatorActionBean> beanList = (List<CoordinatorActionBean>) jpaService.executeGetList(
- namedQuery.name(), query, em);
- if (beanList == null || beanList.size() == 0) {
- throw new JPAExecutorException(ErrorCode.E0605, query.toString());
+ List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
+ List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>();
+ if (retList != null) {
+ for (Object ret : retList) {
+ beanList.add(constructBean(namedQuery, ret));
+ }
}
return beanList;
}
+ private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
+ CoordinatorActionBean bean;
+ switch (namedQuery) {
+ case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
+ bean = new CoordinatorActionBean();
+ bean.setJobId((String) ret);
+ break;
+ default:
+ throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
+ + namedQuery.name());
+ }
+ return bean;
+ }
+
@VisibleForTesting
public static void destroy() {
if (instance != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
deleted file mode 100644
index 4f1e30b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-
-/**
- * Load the list of CoordinatorAction ordered by lastModifiedTime
- */
-public class CoordActionsGetByLastModifiedTimeJPAExecutor implements JPAExecutor<List<String>> {
- private Date d = null;
-
- public CoordActionsGetByLastModifiedTimeJPAExecutor(Date d) {
- this.d = d;
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
- */
- @Override
- public String getName() {
- return "CoordActionsGetByLastModifiedTimeJPAExecutor";
- }
-
- /* (non-Javadoc)
- * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
- */
- @Override
- @SuppressWarnings("unchecked")
- public List<String> execute(EntityManager em) throws JPAExecutorException {
- try {
- Query q = em.createNamedQuery("GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME");
- q.setParameter("lastModifiedTime", new Timestamp(d.getTime()));
- List<String> coordJobIds = q.getResultList();
- return coordJobIds;
- }
- catch (Exception e) {
- throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
- }
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 8e97436..240b352 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -19,7 +19,9 @@ package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -58,7 +60,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
GET_COORD_JOB_ACTION_KILL,
GET_COORD_JOB_MATERIALIZE,
GET_COORD_JOB_SUSPEND_KILL,
- GET_COORD_JOB_STATUS_PARENTID
+ GET_COORD_JOB_STATUS_PARENTID,
+ GET_COORD_JOBS_CHANGED
};
private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -200,6 +203,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
case GET_COORD_JOB_STATUS_PARENTID:
query.setParameter("id", parameters[0]);
break;
+ case GET_COORD_JOBS_CHANGED:
+ query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
@@ -300,6 +306,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
bean.setStatusStr((String) arr[0]);
bean.setBundleId((String) arr[1]);
break;
+ case GET_COORD_JOBS_CHANGED:
+ bean = (CoordinatorJobBean) ret;
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index 4e05a05..e093c7d 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
@@ -41,9 +42,9 @@ import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
@@ -707,18 +708,20 @@ public class StatusTransitService implements Service {
LOG.info("Running coordinator status service from last instance time = "
+ DateUtils.formatDateOozieTZ(lastInstanceStartTime));
// this is not the first instance, we should only check jobs
- // that have actions been
+ // that have actions or jobs been
// updated >= start time of last service run;
- List<String> coordJobIdList = jpaService
- .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
+ List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList(
+ CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
Set<String> coordIds = new HashSet<String>();
- coordIds.addAll(coordJobIdList);
+ for (CoordinatorActionBean action : actionsList) {
+ coordIds.add(action.getJobId());
+ }
pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
CoordinatorJobBean coordJob;
- try{
- coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
+ try {
+ coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
}
catch (JPAExecutorException jpaee) {
if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
@@ -737,6 +740,8 @@ public class StatusTransitService implements Service {
pendingJobCheckList.add(coordJob);
}
}
+ pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
+ CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
}
aggregateCoordJobsStatus(pendingJobCheckList);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 906882a..b9bbf16 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -35,6 +35,8 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
@@ -280,6 +282,133 @@ public class TestCoordChangeXCommand extends XDataTestCase {
}
/**
+ * Testcase when changing end-time == nextMaterializedTime
+ * reflects correct job status via StatusTransit
+ *
+ * @throws Exception
+ */
+ public void testCoordChangeEndTime1() throws Exception {
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000));
+ CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+ coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000)));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+ addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+ Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+ runnable.run(); // dummy run so we get to the interval check following coord job change
+ sleep(1000);
+
+ assertEquals(endTime.getTime(), coordJob.getEndTime().getTime()); // checking before change
+
+ String newEndTime = convertDateToString(startTime.getTime() + 30 * 60 * 1000);
+
+ new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+ try {
+ checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false);
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Invalid date" + ex);
+ }
+
+ CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+ coordJob = jpaService.execute(coordGetCmd);
+ assertEquals(Job.Status.RUNNING, coordJob.getStatus());
+ assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime())); // checking after change
+ assertTrue(coordJob.isPending());
+ assertTrue(coordJob.isDoneMaterialization());
+
+ runnable.run();
+ sleep(1000);
+ coordJob = jpaService.execute(coordGetCmd);
+ assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+ assertFalse(coordJob.isPending());
+ assertTrue(coordJob.isDoneMaterialization());
+ }
+
+ /**
+ * Testcase when changing end-time > nextMaterializedTime, but < original end
+ * reflects correct job state and values
+ *
+ * @throws Exception
+ */
+ public void testCoordChangeEndTime2() throws Exception {
+
+ JPAService jpaService = Services.get().get(JPAService.class);
+
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000));
+ CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+ coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000)));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+ addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+ assertTrue(coordJob.isDoneMaterialization()); // checking initial condition before change
+
+ Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+ runnable.run(); // dummy run so we get to the interval check following coord job change
+ sleep(1000);
+
+ String newEndTime = convertDateToString(startTime.getTime() + 40 * 60 * 1000);
+
+ new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+ try {
+ checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false);
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ fail("Invalid date" + ex);
+ }
+
+ CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+ coordJob = jpaService.execute(coordGetCmd);
+ assertEquals(Job.Status.RUNNING, coordJob.getStatus());
+ assertTrue(coordJob.isPending());
+ assertFalse(coordJob.isDoneMaterialization()); // <-- changed
+ assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime()));
+
+ }
+
+ /**
+ * Testcase when changing end-time to after original end-time
+ * but before nextMaterializedTime should not cause unnecessary changes
+ *
+ * @throws Exception
+ */
+ public void testCoordChangeEndTime3() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + (10 * 60 * 1000));
+ CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+ coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (40 * 60 * 1000)));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+ addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+ Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+ runnable.run();
+
+ CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+ coordJob = jpaService.execute(coordGetCmd);
+ assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+ assertFalse(coordJob.isPending());
+ assertTrue(coordJob.isDoneMaterialization());
+
+ String newEndTime = convertDateToString(startTime.getTime() + 20 * 60 * 1000);
+
+ new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+
+ coordJob = jpaService.execute(coordGetCmd);
+ assertFalse(Job.Status.RUNNING == coordJob.getStatus());
+ assertFalse(coordJob.isPending());
+ assertTrue(coordJob.isDoneMaterialization());
+
+ }
+
+ /**
* Change the pause time and end time of a failed coordinator job. Check whether the status changes
* to RUNNINGWITHERROR
* @throws Exception
@@ -407,8 +536,9 @@ public class TestCoordChangeXCommand extends XDataTestCase {
new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
fail("Should not reach here.");
} catch(CommandException e) {
- if(e.getErrorCode() != ErrorCode.E1022)
+ if(e.getErrorCode() != ErrorCode.E1022) {
fail("Error code should be E1022");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
deleted file mode 100644
index e799e2d..0000000
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestCoordActionGetByLastModifiedTimeJPAExecutor extends XDataTestCase {
- Services services;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- services.destroy();
- super.tearDown();
- }
-
- public void testCoordActionGet() throws Exception {
- // Get date before action is created
- Date dateBeforeAction = new Date();
- int actionNum = 1;
- // Add two jobs with lastmodifiedtime > dateBeforeAction
- CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- String jobId1 = job1.getId();
- CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- String jobId2 = job2.getId();
- addRecordToCoordActionTable(jobId1, actionNum++,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- addRecordToCoordActionTable(jobId2, actionNum++,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- _testCoordActionGetByLastModifiedTime(jobId1, jobId2, dateBeforeAction);
- }
-
-
- private void _testCoordActionGetByLastModifiedTime(String jobId1, String jobId2, Date dateBeforeAction)
- throws Exception {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- // Call JPAExecutor to get actions which are modified after the given
- // date
- List<String> coordJobIds = jpaService
- .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(dateBeforeAction));
- assertEquals(2, coordJobIds.size());
- assertEquals(jobId1, coordJobIds.get(0));
- assertEquals(jobId2, coordJobIds.get(1));
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
index 9939bbd..1e75bbd 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
@@ -17,6 +17,10 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -257,7 +261,25 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
}
public void testGetList() throws Exception {
- // TODO
+ CoordinatorJobBean bean1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+ CoordinatorJobBean bean2 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, true, true);
+
+ // time to check last modified time against
+ Date queryTime = new Date();
+ bean1.setLastModifiedTime(new Date(queryTime.getTime() + 1000));
+ bean2.setLastModifiedTime(new Date(queryTime.getTime() + 2000));
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean1);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean2);
+
+ // GET_COORD_JOBS_CHANGED
+ List<CoordinatorJobBean> retBeans = CoordJobQueryExecutor.getInstance().getList(
+ CoordJobQuery.GET_COORD_JOBS_CHANGED, new Timestamp(queryTime.getTime()));
+ assertEquals(2, retBeans.size());
+ assertEquals(bean1.getId(), retBeans.get(0).getId());
+ assertEquals(bean1.getStatus(), retBeans.get(0).getStatus());
+
+ assertEquals(bean2.getId(), retBeans.get(1).getId());
+ assertEquals(bean2.getStatus(), retBeans.get(1).getStatus());
}
public void testInsert() throws Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3aa3186..6ee2aa2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)
OOZIE-1548 OozieDBCLI changes to convert clob to blob and remove the discriminator column (virag)
OOZIE-1504 Allow specifying a fixed instance as the start instance of a data-in (puru via rohini)
OOZIE-1576 Add documentation for Oozie Sqoop CLI (bowenzhangusa via rkanter)