You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/12/31 01:34:41 UTC

git commit: OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)

Updated Branches:
  refs/heads/master 8ca266fac -> 6c69089fd


OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6c69089f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6c69089f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6c69089f

Branch: refs/heads/master
Commit: 6c69089fde5aeaffc090b0d42ff5345a1d568651
Parents: 8ca266f
Author: Mona Chitnis <ch...@yahoo-inc.com>
Authored: Mon Dec 30 16:34:11 2013 -0800
Committer: Mona Chitnis <ch...@yahoo-inc.com>
Committed: Mon Dec 30 16:34:11 2013 -0800

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorJobBean.java    |   2 +
 .../command/coord/CoordChangeXCommand.java      |  36 +++--
 .../executor/jpa/CoordActionQueryExecutor.java  |  34 ++++-
 ...ActionsGetByLastModifiedTimeJPAExecutor.java |  68 ----------
 .../executor/jpa/CoordJobQueryExecutor.java     |  11 +-
 .../oozie/service/StatusTransitService.java     |  21 +--
 .../command/coord/TestCoordChangeXCommand.java  | 132 ++++++++++++++++++-
 ...dActionGetByLastModifiedTimeJPAExecutor.java |  80 -----------
 .../executor/jpa/TestCoordJobQueryExecutor.java |  24 +++-
 release-log.txt                                 |   1 +
 10 files changed, 233 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index f189b69..16209ce 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -97,6 +97,8 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"),
 
+        @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"),
+
         @NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"),
 
         @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 65685db..4957330 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -325,19 +325,30 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
 
         try {
             if (newEndTime != null) {
-                coordJob.setEndTime(newEndTime);
-                if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED){
-                    coordJob.setStatus(CoordinatorJob.Status.RUNNING);
-                }
-                if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
-                        || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
-                    // Check for backward compatibility for Oozie versions (3.2 and before)
-                    // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
-                    // PAUSEDWITHERROR is not supported
-                    coordJob.setStatus(StatusUtils.getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+                // During coord materialization, nextMaterializedTime is set to
+                // startTime + n(actions materialized) * frequency, and this can be AFTER endTime
+                // while doneMaterialization is true. Hence the following check
+                // for newEndTime falling between endTime and nextMaterializedTime:
+                // in that case the job is already done materializing, so no change is needed.
+                boolean dontChange = coordJob.getEndTime().before(newEndTime)
+                        && coordJob.getNextMaterializedTime() != null
+                        && coordJob.getNextMaterializedTime().after(newEndTime);
+                if (!dontChange) {
+                    coordJob.setEndTime(newEndTime);
+                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
+                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
+                    }
+                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
+                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
+                        // Check for backward compatibility for Oozie versions (3.2 and before)
+                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
+                        // PAUSEDWITHERROR is not supported
+                        coordJob.setStatus(StatusUtils
+                                .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
+                    }
+                    coordJob.setPending();
+                    coordJob.resetDoneMaterialization();
                 }
-                coordJob.setPending();
-                coordJob.resetDoneMaterialization();
             }
 
             if (newConcurrency != null) {
@@ -376,6 +387,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
                 coordJob.setDoneMaterialization();
             }
 
+            coordJob.setLastModifiedTime(new Date());
             updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
             BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index fc65cf3..f5304ca 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -17,10 +17,14 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
+
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.service.JPAService;
@@ -44,7 +48,8 @@ public class CoordActionQueryExecutor extends
         UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
         UPDATE_COORD_ACTION_RERUN,
         GET_COORD_ACTION,
-        GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID
+        GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
+        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
     };
 
     private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
@@ -171,6 +176,9 @@ public class CoordActionQueryExecutor extends
             case GET_COORD_ACTION:
                 query.setParameter("id", parameters[0]);
                 break;
+            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
+                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + caQuery.name());
@@ -202,14 +210,30 @@ public class CoordActionQueryExecutor extends
             throws JPAExecutorException {
         EntityManager em = jpaService.getEntityManager();
         Query query = getSelectQuery(namedQuery, em, parameters);
-        List<CoordinatorActionBean> beanList = (List<CoordinatorActionBean>) jpaService.executeGetList(
-                namedQuery.name(), query, em);
-        if (beanList == null || beanList.size() == 0) {
-            throw new JPAExecutorException(ErrorCode.E0605, query.toString());
+        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
+        List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>();
+        if (retList != null) {
+            for (Object ret : retList) {
+                beanList.add(constructBean(namedQuery, ret));
+            }
         }
         return beanList;
     }
 
+    private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
+        CoordinatorActionBean bean;
+        switch (namedQuery) {
+            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
+                bean = new CoordinatorActionBean();
+                bean.setJobId((String) ret);
+                break;
+            default:
+                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
+                        + namedQuery.name());
+        }
+        return bean;
+    }
+
     @VisibleForTesting
     public static void destroy() {
         if (instance != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
deleted file mode 100644
index 4f1e30b..0000000
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetByLastModifiedTimeJPAExecutor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.ErrorCode;
-
-/**
- * Load the list of CoordinatorAction ordered by lastModifiedTime
- */
-public class CoordActionsGetByLastModifiedTimeJPAExecutor implements JPAExecutor<List<String>> {
-    private Date d = null;
-
-    public CoordActionsGetByLastModifiedTimeJPAExecutor(Date d) {
-        this.d = d;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
-     */
-    @Override
-    public String getName() {
-        return "CoordActionsGetByLastModifiedTimeJPAExecutor";
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public List<String> execute(EntityManager em) throws JPAExecutorException {
-        try {
-            Query q = em.createNamedQuery("GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME");
-            q.setParameter("lastModifiedTime", new Timestamp(d.getTime()));
-            List<String> coordJobIds = q.getResultList();
-            return coordJobIds;
-        }
-        catch (Exception e) {
-            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
-        }
-
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 8e97436..240b352 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -19,7 +19,9 @@ package org.apache.oozie.executor.jpa;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
@@ -58,7 +60,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
         GET_COORD_JOB_ACTION_KILL,
         GET_COORD_JOB_MATERIALIZE,
         GET_COORD_JOB_SUSPEND_KILL,
-        GET_COORD_JOB_STATUS_PARENTID
+        GET_COORD_JOB_STATUS_PARENTID,
+        GET_COORD_JOBS_CHANGED
     };
 
     private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor();
@@ -200,6 +203,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
             case GET_COORD_JOB_STATUS_PARENTID:
                 query.setParameter("id", parameters[0]);
                 break;
+            case GET_COORD_JOBS_CHANGED:
+                query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime()));
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -300,6 +306,9 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 bean.setStatusStr((String) arr[0]);
                 bean.setBundleId((String) arr[1]);
                 break;
+            case GET_COORD_JOBS_CHANGED:
+                bean = (CoordinatorJobBean) ret;
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index 4e05a05..e093c7d 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
@@ -41,9 +42,9 @@ import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetRunningOrPendingJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsStatusJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
@@ -707,18 +708,20 @@ public class StatusTransitService implements Service {
                 LOG.info("Running coordinator status service from last instance time =  "
                         + DateUtils.formatDateOozieTZ(lastInstanceStartTime));
                 // this is not the first instance, we should only check jobs
-                // that have actions been
+                // whose actions, or the jobs themselves, have been
                 // updated >= start time of last service run;
-                List<String> coordJobIdList = jpaService
-                        .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
+                List<CoordinatorActionBean> actionsList = CoordActionQueryExecutor.getInstance().getList(
+                        CoordActionQuery.GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, lastInstanceStartTime);
                 Set<String> coordIds = new HashSet<String>();
-                coordIds.addAll(coordJobIdList);
+                for (CoordinatorActionBean action : actionsList) {
+                    coordIds.add(action.getJobId());
+                }
 
                 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                 for (String coordId : coordIds.toArray(new String[coordIds.size()])) {
                     CoordinatorJobBean coordJob;
-                    try{
-                        coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordId));
+                    try {
+                        coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coordId);
                     }
                     catch (JPAExecutorException jpaee) {
                         if (jpaee.getErrorCode().equals(ErrorCode.E0604)) {
@@ -737,6 +740,8 @@ public class StatusTransitService implements Service {
                         pendingJobCheckList.add(coordJob);
                     }
                 }
+                pendingJobCheckList.addAll(CoordJobQueryExecutor.getInstance().getList(
+                        CoordJobQuery.GET_COORD_JOBS_CHANGED, lastInstanceStartTime));
             }
             aggregateCoordJobsStatus(pendingJobCheckList);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 906882a..b9bbf16 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -35,6 +35,8 @@ import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor;
 import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
@@ -280,6 +282,133 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     }
 
     /**
+     * Test case: changing end-time to equal nextMaterializedTime
+     * is reflected in the correct job status via StatusTransit
+     *
+     * @throws Exception
+     */
+    public void testCoordChangeEndTime1() throws Exception {
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+        coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000)));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+        Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+        runnable.run(); // dummy run so we get to the interval check following coord job change
+        sleep(1000);
+
+        assertEquals(endTime.getTime(), coordJob.getEndTime().getTime()); // checking before change
+
+        String newEndTime = convertDateToString(startTime.getTime() + 30 * 60 * 1000);
+
+        new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+        try {
+            checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false);
+        }
+        catch (Exception ex) {
+            ex.printStackTrace();
+            fail("Invalid date" + ex);
+        }
+
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.RUNNING, coordJob.getStatus());
+        assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime())); // checking after change
+        assertTrue(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+        runnable.run();
+        sleep(1000);
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+    }
+
+    /**
+     * Test case: changing end-time to after nextMaterializedTime but before the
+     * original end-time results in the correct job state and values
+     *
+     * @throws Exception
+     */
+    public void testCoordChangeEndTime2() throws Exception {
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (50 * 60 * 1000));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+        coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (30 * 60 * 1000)));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+        assertTrue(coordJob.isDoneMaterialization()); // checking initial condition before change
+
+        Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+        runnable.run(); // dummy run so we get to the interval check following coord job change
+        sleep(1000);
+
+        String newEndTime = convertDateToString(startTime.getTime() + 40 * 60 * 1000);
+
+        new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+        try {
+            checkCoordJobs(coordJob.getId(), DateUtils.parseDateOozieTZ(newEndTime), null, null, false);
+        }
+        catch (Exception ex) {
+            ex.printStackTrace();
+            fail("Invalid date" + ex);
+        }
+
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.RUNNING, coordJob.getStatus());
+        assertTrue(coordJob.isPending());
+        assertFalse(coordJob.isDoneMaterialization()); // <-- changed
+        assertEquals(newEndTime, convertDateToString(coordJob.getEndTime().getTime()));
+
+    }
+
+    /**
+     * Test case: changing end-time to after the original end-time
+     * but before nextMaterializedTime should not cause unnecessary changes
+     *
+     * @throws Exception
+     */
+    public void testCoordChangeEndTime3() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + (10 * 60 * 1000));
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, true, true, 1);
+        coordJob.setNextMaterializedTime(new Date(startTime.getTime() + (40 * 60 * 1000)));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob);
+        addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+
+        Runnable runnable = new StatusTransitService.StatusTransitRunnable();
+        runnable.run();
+
+        CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        coordJob = jpaService.execute(coordGetCmd);
+        assertEquals(Job.Status.SUCCEEDED, coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+        String newEndTime = convertDateToString(startTime.getTime() + 20 * 60 * 1000);
+
+        new CoordChangeXCommand(coordJob.getId(), "endtime=" + newEndTime).call();
+
+        coordJob = jpaService.execute(coordGetCmd);
+        assertFalse(Job.Status.RUNNING == coordJob.getStatus());
+        assertFalse(coordJob.isPending());
+        assertTrue(coordJob.isDoneMaterialization());
+
+    }
+
+    /**
      * Change the pause time and end time of a failed coordinator job. Check whether the status changes
      * to RUNNINGWITHERROR
      * @throws Exception
@@ -407,8 +536,9 @@ public class TestCoordChangeXCommand extends XDataTestCase {
             new CoordChangeXCommand(job.getId(), pauseTimeChangeStr).call();
             fail("Should not reach here.");
         } catch(CommandException e) {
-            if(e.getErrorCode() != ErrorCode.E1022)
+            if(e.getErrorCode() != ErrorCode.E1022) {
                 fail("Error code should be E1022");
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
deleted file mode 100644
index e799e2d..0000000
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionGetByLastModifiedTimeJPAExecutor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.executor.jpa;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XDataTestCase;
-
-public class TestCoordActionGetByLastModifiedTimeJPAExecutor extends XDataTestCase {
-    Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testCoordActionGet() throws Exception {
-        // Get date before action is created
-        Date dateBeforeAction = new Date();
-        int actionNum = 1;
-        // Add two jobs with lastmodifiedtime > dateBeforeAction
-        CoordinatorJobBean job1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        String jobId1 = job1.getId();
-        CoordinatorJobBean job2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        String jobId2 = job2.getId();
-        addRecordToCoordActionTable(jobId1, actionNum++,
-                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-        addRecordToCoordActionTable(jobId2, actionNum++,
-                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
-        _testCoordActionGetByLastModifiedTime(jobId1, jobId2, dateBeforeAction);
-    }
-
-
-    private void _testCoordActionGetByLastModifiedTime(String jobId1, String jobId2, Date dateBeforeAction)
-            throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        // Call JPAExecutor to get actions which are modified after the given
-        // date
-        List<String> coordJobIds = jpaService
-                .execute(new CoordActionsGetByLastModifiedTimeJPAExecutor(dateBeforeAction));
-        assertEquals(2, coordJobIds.size());
-        assertEquals(jobId1, coordJobIds.get(0));
-        assertEquals(jobId2, coordJobIds.get(1));
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
index 9939bbd..1e75bbd 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
@@ -17,6 +17,10 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
@@ -257,7 +261,25 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
     }
 
     public void testGetList() throws Exception {
-        // TODO
+        CoordinatorJobBean bean1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, true, true);
+        CoordinatorJobBean bean2 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, true, true);
+
+        // time to check last modified time against
+        Date queryTime = new Date();
+        bean1.setLastModifiedTime(new Date(queryTime.getTime() + 1000));
+        bean2.setLastModifiedTime(new Date(queryTime.getTime() + 2000));
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean1);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, bean2);
+
+        // GET_COORD_JOBS_CHANGED
+        List<CoordinatorJobBean> retBeans = CoordJobQueryExecutor.getInstance().getList(
+                CoordJobQuery.GET_COORD_JOBS_CHANGED, new Timestamp(queryTime.getTime()));
+        assertEquals(2, retBeans.size());
+        assertEquals(bean1.getId(), retBeans.get(0).getId());
+        assertEquals(bean1.getStatus(), retBeans.get(0).getStatus());
+
+        assertEquals(bean2.getId(), retBeans.get(1).getId());
+        assertEquals(bean2.getStatus(), retBeans.get(1).getStatus());
     }
 
     public void testInsert() throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/6c69089f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3aa3186..6ee2aa2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1632 Coordinators that undergo change endtime but are doneMaterialization, not getting picked for StatusTransit (mona)
 OOZIE-1548 OozieDBCLI changes to convert clob to blob and remove the discriminator column (virag)
 OOZIE-1504 Allow specifying a fixed instance as the start instance of a data-in (puru via rohini)
 OOZIE-1576 Add documentation for Oozie Sqoop CLI (bowenzhangusa via rkanter)