You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by bz...@apache.org on 2014/07/01 22:04:03 UTC

git commit: OOZIE-1532 Purging should remove completed children job for long running coordinator jobs (bzhang)

Repository: oozie
Updated Branches:
  refs/heads/master f1b71a996 -> e20901133


OOZIE-1532 Purging should remove completed children job for long running coordinator jobs (bzhang)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e2090113
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e2090113
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e2090113

Branch: refs/heads/master
Commit: e20901133e43ff2fce39512d5c78abc588acbe39
Parents: f1b71a9
Author: Bowen Zhang <bo...@yahoo.com>
Authored: Tue Jul 1 13:02:55 2014 -0700
Committer: Bowen Zhang <bo...@yahoo.com>
Committed: Tue Jul 1 13:03:45 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorActionBean.java |   2 +
 .../java/org/apache/oozie/WorkflowJobBean.java  |   2 +
 .../org/apache/oozie/command/PurgeXCommand.java |  60 +++++++-
 .../jpa/CoordActionsDeleteJPAExecutor.java      |  92 +++++++++++
 .../CoordJobGetActionsSubsetJPAExecutor.java    |  31 +++-
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  17 +-
 .../org/apache/oozie/service/PurgeService.java  |  15 +-
 core/src/main/resources/oozie-default.xml       |  10 ++
 .../apache/oozie/command/TestPurgeXCommand.java | 127 +++++++++++++++
 .../jpa/TestCoordActionsDeleteJPAExecutor.java  | 154 +++++++++++++++++++
 .../jpa/TestWorkflowJobQueryExecutor.java       |  24 +++
 release-log.txt                                 |   1 +
 12 files changed, 525 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 51eaf2d..795bf63 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -80,6 +80,8 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"),
 
+        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id = :actionId"),
+
         @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"),
 
         // Query used by XTestcase to setup tables

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index 5fbee82..36bc4ae 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -85,6 +85,8 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"),
 
+    @NamedQuery(name = "GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN", query = "select w.id, w.parentId from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId like '%C@%'"),
+
     @NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
 
     @NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index 9973719..da94d39 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -22,10 +22,14 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
@@ -50,21 +54,30 @@ public class PurgeXCommand extends XCommand<Void> {
     private int wfOlderThan;
     private int coordOlderThan;
     private int bundleOlderThan;
+    private boolean purgeOldCoordAction = false;
     private final int limit;
     private List<String> wfList;
+    private List<String> coordActionList;
     private List<String> coordList;
     private List<String> bundleList;
     private int wfDel;
     private int coordDel;
+    private int coordActionDel;
     private int bundleDel;
 
     public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
+        this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
+    }
+
+    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) {
         super("purge", "purge", 0);
         this.wfOlderThan = wfOlderThan;
         this.coordOlderThan = coordOlderThan;
         this.bundleOlderThan = bundleOlderThan;
+        this.purgeOldCoordAction = purgeOldCoordAction;
         this.limit = limit;
         wfList = new ArrayList<String>();
+        coordActionList = new ArrayList<String>();
         coordList = new ArrayList<String>();
         bundleList = new ArrayList<String>();
         wfDel = 0;
@@ -87,6 +100,20 @@ public class PurgeXCommand extends XCommand<Void> {
                     size = wfList.size();
                     wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
                 } while(size != wfList.size());
+                if (purgeOldCoordAction) {
+                    LOG.debug("Purging workflows of long running coordinators is turned on");
+                    do {
+                        size = coordActionList.size();
+                        long olderThan = wfOlderThan;
+                        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
+                                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan,
+                                coordActionList.size(), limit);
+                        for (WorkflowJobBean bean : jobBeans) {
+                            coordActionList.add(bean.getParentId());
+                            wfList.add(bean.getId());
+                        }
+                    } while(size != coordActionList.size());
+                }
                 do {
                     size = coordList.size();
                     coordList.addAll(jpaService.execute(
@@ -112,7 +139,7 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     @Override
     protected Void execute() throws CommandException {
-        LOG.debug("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
+        LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
                 + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
 
         // Process parentless workflows to purge them and their children
@@ -125,6 +152,15 @@ public class PurgeXCommand extends XCommand<Void> {
             }
         }
 
+        // Process coordinator actions of long running coordinators and purge them
+        if (!coordActionList.isEmpty()) {
+            try {
+                purgeCoordActions(coordActionList);
+            }
+            catch (JPAExecutorException je) {
+                throw new CommandException(je);
+            }
+        }
         // Processs parentless coordinators to purge them and their children
         if (!coordList.isEmpty()) {
             try {
@@ -145,7 +181,8 @@ public class PurgeXCommand extends XCommand<Void> {
             }
         }
 
-        LOG.debug("ENDED Purge deleted [{0}] workflows, [{1}] coordinators, [{2}] bundles", wfDel, coordDel, bundleDel);
+        LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles",
+                wfDel, coordActionDel, coordDel, bundleDel);
         return null;
     }
 
@@ -158,6 +195,9 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     private void processWorkflows(List<String> wfs) throws JPAExecutorException {
         List<String> wfsToPurge = processWorkflowsHelper(wfs);
+        for (String id: wfsToPurge) {
+            LOG.debug("Purging workflow " + id);
+        }
         purgeWorkflows(wfsToPurge);
     }
 
@@ -212,6 +252,7 @@ public class PurgeXCommand extends XCommand<Void> {
                     new WorkflowJobsCountNotForPurgeFromCoordParentIdJPAExecutor(wfOlderThan, coordId));
             if (numChildrenNotReady == 0) {
                 coordsToPurge.add(coordId);
+                LOG.debug("Purging coordinator " + coordId);
                 // Get all of the direct children for this coord
                 List<String> children = new ArrayList<String>();
                 int size;
@@ -245,6 +286,7 @@ public class PurgeXCommand extends XCommand<Void> {
                     new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
             if (numChildrenNotReady == 0) {
                 bundlesToPurge.add(bundleId);
+                LOG.debug("Purging bundle " + bundleId);
                 // Get all of the direct children for this bundle
                 List<String> children = new ArrayList<String>();
                 int size;
@@ -279,6 +321,20 @@ public class PurgeXCommand extends XCommand<Void> {
     }
 
     /**
+     * Purge the coordActions of long running coordinators in batches of size 'limit'
+     *
+     * @param coordActions List of coordActions to purge
+     * @throws JPAExecutorException If a JPA executor has a problem
+     */
+    private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException {
+        coordActionDel = coordActions.size();
+        for (int startIndex = 0; startIndex < coordActions.size(); ) {
+            int endIndex = (startIndex + limit < coordActions.size()) ? (startIndex + limit) : coordActions.size();
+            jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActions.subList(startIndex, endIndex)));
+            startIndex = endIndex;
+        }
+    }
+    /**
      * Purge the coordinators in SOME order in batches of size 'limit' (its in reverse order only for convenience)
      *
      * @param coords List of coordinators to purge

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
new file mode 100644
index 0000000..0e00718
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsDeleteJPAExecutor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Collection;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.util.ParamChecker;
+/**
+ * Delete Coord actions of long running coordinators, return the number of action ids for which a delete was issued.
+ */
+public class CoordActionsDeleteJPAExecutor implements JPAExecutor<Integer> {
+    private Collection<String> deleteList;
+
+    /**
+     * Initialize the JPAExecutor using the delete list of CoordinatorActionBeans
+     * @param deleteList ids of the CoordinatorActionBeans to delete
+     */
+    public CoordActionsDeleteJPAExecutor(Collection<String> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    public CoordActionsDeleteJPAExecutor() {
+    }
+
+    /**
+     * Sets the delete list for CoordinatorActionBeans
+     *
+     * @param deleteList ids of the CoordinatorActionBeans to delete
+     */
+    public void setDeleteList(Collection<String> deleteList) {
+        this.deleteList = deleteList;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "CoordActionsDeleteJPAExecutor";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Integer execute(EntityManager em) throws JPAExecutorException {
+        int actionsDeleted = 0;
+        try {
+            // Only used by test cases to check for rollback of transaction
+            FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
+            if (deleteList != null) {
+                for (String id : deleteList) {
+                    ParamChecker.notNull(id, "Coordinator Action Id");
+
+                    // Delete coordAction
+                    Query g = em.createNamedQuery("DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR");
+                    g.setParameter("actionId", id);
+                    g.executeUpdate();
+                    actionsDeleted++;
+                }
+            }
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+        return actionsDeleted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
index 873f081..420a466 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
@@ -101,6 +101,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo
             StringBuilder statusClause = new StringBuilder();
             getStatusClause(statusClause, filterMap.get(CoordinatorEngine.POSITIVE_FILTER), true);
             getStatusClause(statusClause, filterMap.get(CoordinatorEngine.NEGATIVE_FILTER), false);
+            getIdClause(statusClause);
             // Insert 'where' before 'order by'
             sbTotal.insert(offset, statusClause);
             q = em.createQuery(sbTotal.toString());
@@ -108,8 +109,7 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo
         if (desc) {
             q = em.createQuery(q.toString().concat(" desc"));
         }
-        q.setParameter("jobId", coordJobId);
-        q.setFirstResult(start - 1);
+        q.setParameter("jobId", coordJobId);;
         q.setMaxResults(len);
         return q;
     }
@@ -124,15 +124,15 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo
             for (String statusVal : filterList) {
                 if (!isStatus) {
                     if (positive) {
-                        sb.append(" and a.statusStr IN (\'" + statusVal + "\'");
+                        sb.append(" and a.statusStr IN (\'").append(statusVal).append("\'");
                     }
                     else {
-                        sb.append(" and a.statusStr NOT IN (\'" + statusVal + "\'");
+                        sb.append(" and a.statusStr NOT IN (\'").append(statusVal).append("\'");
                     }
                     isStatus = true;
                 }
                 else {
-                    sb.append(",\'" + statusVal + "\'");
+                    sb.append(",\'").append(statusVal).append("\'");
                 }
             }
             sb.append(") ");
@@ -140,6 +140,27 @@ public class CoordJobGetActionsSubsetJPAExecutor implements JPAExecutor<List<Coo
         return sb;
     }
 
+    // Form the where clause for coord action ids
+    private StringBuilder getIdClause(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+        sb.append("and a.id IN (");
+        boolean isFirst = true;
+        for (int i = start; i < start + len; i++) {
+            if (isFirst) {
+                sb.append("\'").append(coordJobId).append("@").append(i).append("\'");
+                isFirst = false;
+            }
+            else {
+                sb.append(", \'").append(coordJobId).append("@").append(i).append("\'");
+            }
+        }
+        sb.append(") ");
+
+        return sb;
+    }
+
     private CoordinatorActionBean getBeanForRunningCoordAction(Object arr[]) {
         CoordinatorActionBean bean = new CoordinatorActionBean();
         if (arr[0] != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index e2a9438..733fd64 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -59,7 +59,8 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
         GET_WORKFLOW_KILL,
         GET_WORKFLOW_RESUME,
         GET_WORKFLOW_STATUS,
-        GET_WORKFLOWS_PARENT_COORD_RERUN
+        GET_WORKFLOWS_PARENT_COORD_RERUN,
+        GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
     };
 
     private static WorkflowJobQueryExecutor instance = new WorkflowJobQueryExecutor();
@@ -185,6 +186,14 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
             case GET_WORKFLOWS_PARENT_COORD_RERUN:
                 query.setParameter("parentId", parameters[0]);
                 break;
+            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN:
+                long dayInMs = 24 * 60 * 60 * 1000;
+                long olderThanDays = (Long) parameters[0];
+                Timestamp maxEndtime = new Timestamp(System.currentTimeMillis() - (olderThanDays * dayInMs));
+                query.setParameter("endTime", maxEndtime);
+                query.setFirstResult((Integer) parameters[1]);
+                query.setMaxResults((Integer) parameters[2]);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                         + namedQuery.name());
@@ -322,6 +331,12 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
                 bean.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
                 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
                 break;
+            case GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN:
+                bean = new WorkflowJobBean();
+                arr = (Object[]) ret;
+                bean.setId((String) arr[0]);
+                bean.setParentId((String) arr[1]);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/java/org/apache/oozie/service/PurgeService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PurgeService.java b/core/src/main/java/org/apache/oozie/service/PurgeService.java
index 9cc3ebe..9eeee30 100644
--- a/core/src/main/java/org/apache/oozie/service/PurgeService.java
+++ b/core/src/main/java/org/apache/oozie/service/PurgeService.java
@@ -32,6 +32,7 @@ public class PurgeService implements Service {
     public static final String CONF_OLDER_THAN = CONF_PREFIX + "older.than";
     public static final String COORD_CONF_OLDER_THAN = CONF_PREFIX + "coord.older.than";
     public static final String BUNDLE_CONF_OLDER_THAN = CONF_PREFIX + "bundle.older.than";
+    public static final String PURGE_OLD_COORD_ACTION = CONF_PREFIX + "purge.old.coord.action";
     /**
      * Time interval, in seconds, at which the purge jobs service will be scheduled to run.
      */
@@ -47,6 +48,7 @@ public class PurgeService implements Service {
         private int coordOlderThan;
         private int bundleOlderThan;
         private int limit;
+        private boolean purgeOldCoordAction = false;
 
         public PurgeRunnable(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
             this.wfOlderThan = wfOlderThan;
@@ -55,11 +57,20 @@ public class PurgeService implements Service {
             this.limit = limit;
         }
 
+        public PurgeRunnable(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit,
+                             boolean purgeOldCoordAction) {
+            this.wfOlderThan = wfOlderThan;
+            this.coordOlderThan = coordOlderThan;
+            this.bundleOlderThan = bundleOlderThan;
+            this.limit = limit;
+            this.purgeOldCoordAction = purgeOldCoordAction;
+        }
+
         public void run() {
             // Only queue the purge command if this is the first server
             if (Services.get().get(JobsConcurrencyService.class).isFirstServer()) {
                 Services.get().get(CallableQueueService.class).queue(
-                        new PurgeXCommand(wfOlderThan, coordOlderThan, bundleOlderThan, limit));
+                        new PurgeXCommand(wfOlderThan, coordOlderThan, bundleOlderThan, limit, purgeOldCoordAction));
             }
         }
 
@@ -75,7 +86,7 @@ public class PurgeService implements Service {
         Configuration conf = services.getConf();
         Runnable purgeJobsRunnable = new PurgeRunnable(conf.getInt(
                 CONF_OLDER_THAN, 30), conf.getInt(COORD_CONF_OLDER_THAN, 7), conf.getInt(BUNDLE_CONF_OLDER_THAN, 7),
-                                      conf.getInt(PURGE_LIMIT, 100));
+                                      conf.getInt(PURGE_LIMIT, 100), conf.getBoolean(PURGE_OLD_COORD_ACTION, false));
         services.get(SchedulerService.class).schedule(purgeJobsRunnable, 10, conf.getInt(CONF_PURGE_INTERVAL, 3600),
                                                       SchedulerService.Unit.SEC);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 61ac388..982c82f 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -287,6 +287,16 @@
             Completed bundle jobs older than this value, in days, will be purged by the PurgeService.
         </description>
     </property>
+
+    <property>
+        <name>oozie.service.PurgeService.purge.old.coord.action</name>
+        <value>false</value>
+        <description>
+            Whether to purge completed workflows and their corresponding coordinator actions
+            of long running coordinator jobs if the completed workflow jobs are older than the value
+            specified in oozie.service.PurgeService.older.than.
+        </description>
+    </property>
     
     <property>
 		<name>oozie.service.PurgeService.purge.limit</name>

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
index 666271e..979cbbc 100644
--- a/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/TestPurgeXCommand.java
@@ -678,6 +678,133 @@ public class TestPurgeXCommand extends XDataTestCase {
     }
 
     /**
+     * Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
+     * the workflow and its corresponding coord action will get purged when purge.old.coord.action is turned on.
+     * The coordinator itself will not be purged.
+     *
+     * @throws Exception
+     */
+    public void testPurgeLongRunningCoordWithWFChild() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+
+        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
+        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
+        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
+
+        wfJob = jpaService.execute(wfJobGetCmd);
+        wfAction = jpaService.execute(wfActionGetCmd);
+        coordJob = jpaService.execute(coordJobGetCmd);
+        coordAction = jpaService.execute(coordActionGetCmd);
+        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
+        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
+        assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
+
+        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10, true).call();
+
+        try {
+            jpaService.execute(coordJobGetCmd);
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job should not have been purged");
+        }
+
+        try {
+            jpaService.execute(coordActionGetCmd);
+            fail("Coordinator Action should have been purged");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(wfJobGetCmd);
+            fail("Workflow Job should have been purged");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0604, je.getErrorCode());
+        }
+
+        try {
+            jpaService.execute(wfActionGetCmd);
+            fail("Workflow Action should have been purged");
+        }
+        catch (JPAExecutorException je) {
+            assertEquals(ErrorCode.E0605, je.getErrorCode());
+        }
+    }
+
+    /**
+     * Test : The workflow should get purged, but the coordinator parent shouldn't get purged -->
+     * the workflow and its corresponding coord action will NOT get purged when purge.old.coord.action is turned off.
+     * None of the coordinator, coord action, workflow or workflow action will be purged.
+     *
+     * @throws Exception
+     */
+    public void testPurgeLongRunningCoordWithWFChildNegative() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+        WorkflowActionBean wfAction = addRecordToWfActionTable(wfJob.getId(), "1", WorkflowAction.Status.OK);
+        CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED,
+                "coord-action-get.xml", wfJob.getId(), "SUCCEEDED", 0);
+
+        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(wfJob.getId());
+        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(wfAction.getId());
+        CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId());
+        CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId());
+
+        wfJob = jpaService.execute(wfJobGetCmd);
+        wfAction = jpaService.execute(wfActionGetCmd);
+        coordJob = jpaService.execute(coordJobGetCmd);
+        coordAction = jpaService.execute(coordActionGetCmd);
+        assertEquals(WorkflowJob.Status.SUCCEEDED, wfJob.getStatus());
+        assertEquals(WorkflowAction.Status.OK, wfAction.getStatus());
+        assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus());
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordAction.getStatus());
+
+        new PurgeXCommand(7, getNumDaysToNotBePurged(coordJob.getLastModifiedTime()), 1, 10, false).call();
+
+        try {
+            jpaService.execute(coordJobGetCmd);
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Job should not have been purged");
+        }
+
+        try {
+            jpaService.execute(coordActionGetCmd);
+        }
+        catch (JPAExecutorException je) {
+            fail("Coordinator Action should not have been purged");
+        }
+
+        try {
+            jpaService.execute(wfJobGetCmd);
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Job should not have been purged");
+        }
+
+        try {
+            jpaService.execute(wfActionGetCmd);
+        }
+        catch (JPAExecutorException je) {
+            fail("Workflow Action should not have been purged");
+        }
+    }
+
+    /**
      * Test : The workflow should not get purged, but the coordinator parent should get purged --> neither will get purged
      *
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteJPAExecutor.java
new file mode 100644
index 0000000..02392dc
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionsDeleteJPAExecutor.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.FaultInjection;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.command.SkipCommitFaultInjection;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+/**
+ * Tests for {@link CoordActionsDeleteJPAExecutor}: bulk deletion of coordinator actions by id, and
+ * rollback of that deletion when the commit is skipped through fault injection.
+ */
+public class TestCoordActionsDeleteJPAExecutor extends XDataTestCase {
+    private Services services;
+    // Services listed here are excluded from the Services instance built in setUp().
+    private String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService", "org.apache.oozie.service.PurgeService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Inserts a RUNNING coordinator job with three SUCCEEDED actions (action numbers 1 to 3).
+     *
+     * @return the ids of the inserted actions, ordered by action number
+     * @throws Exception if any record cannot be inserted
+     */
+    private List<String> addJobWithThreeSucceededActions() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        List<String> actionIds = new ArrayList<String>();
+        for (int actionNum = 1; actionNum <= 3; actionNum++) {
+            CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), actionNum,
+                    CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+            actionIds.add(action.getId());
+        }
+        return actionIds;
+    }
+
+    /**
+     * Deletes three coordinator actions in one call and verifies that looking up each of them
+     * afterwards fails with {@link ErrorCode#E0605}.
+     *
+     * @throws Exception if the fixture cannot be created or the delete fails
+     */
+    public void testDeleteCoordActions() throws Exception {
+        List<String> deleteList = addJobWithThreeSucceededActions();
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+
+        for (int i = 0; i < deleteList.size(); i++) {
+            try {
+                jpaService.execute(new CoordActionGetJPAExecutor(deleteList.get(i)));
+                // Name the action actually being checked (action1..action3), not always action1.
+                fail("CoordinatorAction action" + (i + 1) + " should have been deleted");
+            }
+            catch (JPAExecutorException je) {
+                assertEquals(ErrorCode.E0605, je.getErrorCode());
+            }
+        }
+    }
+
+    /**
+     * Runs the bulk delete with the skip-commit fault injection active, expects the executor call to
+     * fail with the fault's RuntimeException, and verifies that all three actions can still be loaded.
+     *
+     * @throws Exception if the fixture cannot be created
+     */
+    public void testDeleteCoordActionsRollback() throws Exception {
+        List<String> deleteList = addJobWithThreeSucceededActions();
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+
+        try {
+            // set fault injection to true, so the transaction is rolled back
+            setSystemProperty(FaultInjection.FAULT_INJECTION, "true");
+            setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "true");
+
+            try {
+                jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+                fail("Should have skipped commit for failover testing");
+            }
+            catch (RuntimeException re) {
+                assertEquals("Skipping Commit for Failover Testing", re.getMessage());
+            }
+        }
+        finally {
+            // Remove fault injection
+            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
+        }
+
+        for (int i = 0; i < deleteList.size(); i++) {
+            try {
+                jpaService.execute(new CoordActionGetJPAExecutor(deleteList.get(i)));
+            }
+            catch (JPAExecutorException je) {
+                fail("Coordinator Action" + (i + 1) + " should not have been deleted");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
index 7a10685..02fabd2 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
@@ -18,8 +18,10 @@
 package org.apache.oozie.executor.jpa;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.HashSet;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
@@ -35,6 +37,7 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.workflow.WorkflowInstance;
 
 public class TestWorkflowJobQueryExecutor extends XDataTestCase {
@@ -339,5 +342,26 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase {
         assertEquals(2, wfsForRerun.size());
         assertEquals(wfJob1.getId(), wfsForRerun.get(0).getId());
         assertEquals(wfJob2.getId(), wfsForRerun.get(1).getId());
+
+        // GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN
+        coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, null, null, false,
+                false, 1);
+        wfJob1 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId() + "@1");
+        wfJob1.setEndTime(DateUtils.parseDateOozieTZ("2009-12-18T03:00Z"));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, wfJob1);
+        wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED,
+                coordJob.getId() + "@2");
+        wfJob2.setEndTime(DateUtils.parseDateOozieTZ("2009-12-18T03:00Z"));
+        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, wfJob2);
+        long olderthan = 30;
+        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
+                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderthan,
+                0, 10);
+
+        HashSet<String> jobIds = new HashSet<String>(Arrays.asList(wfJob1.getId(), wfJob2.getId()));
+        assertEquals(2, jobBeans.size());
+        assertTrue(jobIds.contains(jobBeans.get(0).getId()));
+        assertTrue(jobIds.contains(jobBeans.get(1).getId()));
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/e2090113/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index bbfc50e..755c9f0 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1532 Purging should remove completed children job for long running coordinator jobs (bzhang)
 OOZIE-1909 log prefix information missing in JavaActionExecutor.check (ryota)
 OOZIE-1907 DB upgrade from 3.3.0 to trunk fails on derby (rkanter)
 OOZIE-1877 Setting to fail oozie server startup in case of sharelib misconfiguration (puru via rohini)