You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ry...@apache.org on 2014/06/16 10:29:07 UTC

git commit: OOZIE-1492 Make sure HA works with HCat (ryota)

Repository: oozie
Updated Branches:
  refs/heads/master ab3a17497 -> 198f5c2a5


OOZIE-1492 Make sure HA works with HCat (ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/198f5c2a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/198f5c2a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/198f5c2a

Branch: refs/heads/master
Commit: 198f5c2a5e7fce71d6fa638d2d26fa006fee171a
Parents: ab3a174
Author: egashira <ry...@yahoo.com>
Authored: Mon Jun 16 01:22:04 2014 -0700
Committer: egashira <ry...@yahoo.com>
Committed: Mon Jun 16 01:23:29 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/CoordinatorActionBean.java |   3 +
 .../coord/CoordPushDependencyCheckXCommand.java |   4 +
 .../hcat/EhcacheHCatDependencyCache.java        |  54 ++++
 .../dependency/hcat/HCatDependencyCache.java    |  12 +
 .../hcat/SimpleHCatDependencyCache.java         | 118 ++++++++-
 .../executor/jpa/CoordActionQueryExecutor.java  |  21 +-
 .../oozie/service/JobsConcurrencyService.java   |   8 +
 .../PartitionDependencyManagerService.java      |  93 +++++++
 .../oozie/service/ZKJobsConcurrencyService.java |  10 +
 .../TestCoordActionInputCheckXCommand.java      |   5 +-
 .../TestCoordPushDependencyCheckXCommand.java   |   1 -
 ...TestHAPartitionDependencyManagerEhCache.java |  42 +++
 ...TestHAPartitionDependencyManagerService.java | 257 +++++++++++++++++++
 .../TestPartitionDependencyManagerEhcache.java  |   1 -
 .../TestPartitionDependencyManagerService.java  |  36 +--
 .../java/org/apache/oozie/test/XTestCase.java   |  10 +-
 .../java/org/apache/oozie/test/ZKXTestCase.java |   2 +-
 release-log.txt                                 |   1 +
 18 files changed, 653 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 8cbcc4f..51eaf2d 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -136,6 +136,9 @@ import org.json.simple.JSONObject;
         // Query to retrieve status of Coordinator actions
         @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
 
+        // Query to retrieve status of Coordinator actions
+        @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
+
         @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
 
         @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index 2e5cd47..ae71924 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
@@ -43,6 +44,7 @@ import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.RecoveryService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
@@ -207,6 +209,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
 
     protected void onAllPushDependenciesAvailable() throws CommandException {
         coordAction.setPushMissingDependencies("");
+        Services.get().get(PartitionDependencyManagerService.class)
+                .removeCoordActionWithDependenciesAvailable(coordAction.getId());
         if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
             Date nominalTime = coordAction.getNominalTime();
             Date currentTime = new Date();

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 6f127c4..5743c15 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -17,13 +17,16 @@
  */
 package org.apache.oozie.dependency.hcat;
 
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -472,4 +475,55 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
         }
     }
 
+    @Override
+    public void removeNonWaitingCoordActions(Set<String> staleActions) {
+        Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
+        while (serverItr.hasNext()) {
+            String server = serverItr.next();
+            Cache missingCache = missingDepsByServer.get(server);
+            if (missingCache == null) {
+                continue;
+            }
+            synchronized (missingCache) {
+                for (Object key : missingCache.getKeys()) {
+                    Element element = missingCache.get(key);
+                    if (element == null) {
+                        continue;
+                    }
+                    Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
+                            .getWaitingActions();
+                    Iterator<WaitingAction> wactionItr = waitingActions.iterator();
+                    HCatURI hcatURI = null;
+                    while(wactionItr.hasNext()) {
+                        WaitingAction waction = wactionItr.next();
+                        if(staleActions.contains(waction.getActionID())) {
+                            try {
+                                hcatURI = new HCatURI(waction.getDependencyURI());
+                                wactionItr.remove();
+                            }
+                            catch (URISyntaxException e) {
+                                continue;
+                            }
+                        }
+                    }
+                    if (waitingActions.isEmpty() && hcatURI != null) {
+                        missingCache.remove(key);
+                        // Decrement partition key pattern count if the cache entry is removed
+                        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+                        String partKeys = sortedPKV.getPartKeys();
+                        String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+                                + hcatURI.getTable();
+                        String hcatURIStr = hcatURI.toURIString();
+                        decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
+        // to be implemented when reverse-lookup data structure for purging is added
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
index df3afd3..e1e770f 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
@@ -19,6 +19,7 @@ package org.apache.oozie.dependency.hcat;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.util.HCatURI;
@@ -89,4 +90,15 @@ public interface HCatDependencyCache {
      * Destroy the cache
      */
     public void destroy();
+
+    /**
+     * Purge stale actions
+     */
+    public void removeNonWaitingCoordActions(Set<String> coordActions);
+
+    /**
+     * Remove coordAction when all dependencies met
+     */
+    public void removeCoordActionWithDependenciesAvailable(String coordAction);
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
index e8e3ebc..08aa8f9 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
@@ -17,14 +17,17 @@
  */
 package org.apache.oozie.dependency.hcat;
 
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -51,13 +54,16 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
      */
     private ConcurrentMap<String, Collection<String>> availableDeps;
 
-    // TODO:
-    // Gather and print stats on cache hits and misses.
+    /**
+     * Map of actionIDs and partitions for reverse-lookup in purging
+     */
+    private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap;
 
     @Override
     public void init(Configuration conf) {
         missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
         availableDeps = new ConcurrentHashMap<String, Collection<String>>();
+        actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
     }
 
     @Override
@@ -77,6 +83,23 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
                 partKeyPatterns = existingMap;
             }
         }
+        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
+        if (partitionMap == null) {
+            partitionMap = new ConcurrentHashMap<String, Collection<String>>();
+            ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID,
+                    partitionMap);
+            if (existingPartMap != null) {
+                partitionMap = existingPartMap;
+            }
+        }
+        synchronized (partitionMap) {
+            Collection<String> partKeys = partitionMap.get(tableKey);
+            if (partKeys == null) {
+                partKeys = new ArrayList<String>();
+            }
+            partKeys.add(partKey);
+            partitionMap.put(tableKey, partKeys);
+        }
         synchronized (partKeyPatterns) {
             missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
             Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
@@ -105,6 +128,22 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
                     hcatURI.toURIString(), actionID);
             return false;
         }
+        ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
+        if (partitionMap != null) {
+            synchronized (partitionMap) {
+                Collection<String> partKeys = partitionMap.get(tableKey);
+                if (partKeys != null) {
+                    partKeys.remove(partKey);
+                }
+                if (partKeys.size() == 0) {
+                    partitionMap.remove(tableKey);
+                }
+                if (partitionMap.size() == 0) {
+                    actionPartitionMap.remove(actionID);
+                }
+            }
+        }
+
         synchronized(partKeyPatterns) {
             Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
             if (partValues == null) {
@@ -292,7 +331,82 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
         public String getPartVals() {
             return partVals.toString();
         }
+    }
+
+    private HCatURI removePartitions(String coordActionId, Collection<String> partKeys,
+            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) {
+        HCatURI hcatUri = null;
+        for (String partKey : partKeys) {
+            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+            Iterator<String> partValItr = partValues.keySet().iterator();
+            while (partValItr.hasNext()) {
+                String partVal = partValItr.next();
+                Collection<WaitingAction> waitingActions = partValues.get(partVal);
+                if (waitingActions != null) {
+                    Iterator<WaitingAction> waitItr = waitingActions.iterator();
+                    while (waitItr.hasNext()) {
+                        WaitingAction waction = waitItr.next();
+                        if (coordActionId.contains(waction.getActionID())) {
+                            waitItr.remove();
+                            if (hcatUri == null) {
+                                try {
+                                    hcatUri = new HCatURI(waction.getDependencyURI());
+                                }
+                                catch (URISyntaxException e) {
+                                    continue;
+                                }
+                            }
+                        }
+                    }
+                }
+                // delete partition value with no waiting actions
+                if (waitingActions.size() == 0) {
+                    partValItr.remove();
+                }
+            }
+            if (partValues.size() == 0) {
+                partKeyPatterns.remove(partKey);
+            }
+        }
+        return hcatUri;
+    }
 
+    @Override
+    public void removeNonWaitingCoordActions(Set<String> coordActions) {
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        for (String coordActionId : coordActions) {
+            synchronized (actionPartitionMap) {
+                Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId);
+                if (partitionMap != null) {
+                    Iterator<String> tableItr = partitionMap.keySet().iterator();
+                    while (tableItr.hasNext()) {
+                        String tableKey = tableItr.next();
+                        HCatURI hcatUri = null;
+                        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
+                        if (partKeyPatterns != null) {
+                            synchronized (partKeyPatterns) {
+                                Collection<String> partKeys = partitionMap.get(tableKey);
+                                if (partKeys != null) {
+                                    hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns);
+                                }
+                            }
+                            if (partKeyPatterns.size() == 0) {
+                                tableItr.remove();
+                                if (hcatUri != null) {
+                                    // Close JMS session. Stop listening on topic
+                                    hcatService.unregisterFromNotification(hcatUri);
+                                }
+                            }
+                        }
+                    }
+                }
+                actionPartitionMap.remove(coordActionId);
+            }
+        }
     }
 
+    @Override
+    public void removeCoordActionWithDependenciesAvailable(String coordAction) {
+        actionPartitionMap.remove(coordAction);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index f5304ca..d56af7b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -25,10 +25,16 @@ import java.util.List;
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
 
+import org.apache.oozie.BinaryBlob;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.util.DateUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -48,6 +54,7 @@ public class CoordActionQueryExecutor extends
         UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
         UPDATE_COORD_ACTION_RERUN,
         GET_COORD_ACTION,
+        GET_COORD_ACTION_STATUS,
         GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
         GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
     };
@@ -174,6 +181,7 @@ public class CoordActionQueryExecutor extends
         CoordActionQuery caQuery = (CoordActionQuery) namedQuery;
         switch (caQuery) {
             case GET_COORD_ACTION:
+            case GET_COORD_ACTION_STATUS:
                 query.setParameter("id", parameters[0]);
                 break;
             case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -198,10 +206,11 @@ public class CoordActionQueryExecutor extends
     public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
         EntityManager em = jpaService.getEntityManager();
         Query query = getSelectQuery(namedQuery, em, parameters);
-        CoordinatorActionBean bean = (CoordinatorActionBean) jpaService.executeGet(namedQuery.name(), query, em);
-        if (bean == null) {
+        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+        if (ret == null) {
             throw new JPAExecutorException(ErrorCode.E0605, query.toString());
         }
+        CoordinatorActionBean bean = constructBean(namedQuery, ret);
         return bean;
     }
 
@@ -222,11 +231,19 @@ public class CoordActionQueryExecutor extends
 
     private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
         CoordinatorActionBean bean;
+        Object[] arr;
         switch (namedQuery) {
+            case GET_COORD_ACTION:
+                bean = (CoordinatorActionBean) ret;
+                break;
             case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
                 bean = new CoordinatorActionBean();
                 bean.setJobId((String) ret);
                 break;
+            case GET_COORD_ACTION_STATUS:
+                bean = new CoordinatorActionBean();
+                bean.setStatusStr((String)ret);
+                break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
                         + namedQuery.name());

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 27c97e6..36adbd6 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -134,4 +134,12 @@ public class JobsConcurrencyService implements Service, Instrumentable {
     public boolean isAllServerRequest(Map<String, String[]> params) {
         return false;
     }
+
+    /**
+     * Check if it is running in HA mode
+     * @return false
+     */
+    public boolean isHighlyAvailableMode(){
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
index 985dcab..41d1ba2 100644
--- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
+++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
@@ -18,16 +18,28 @@
 package org.apache.oozie.service;
 
 import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
 import org.apache.oozie.dependency.hcat.HCatDependencyCache;
 import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XLog;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Module that functions like a caching service to maintain partition dependency mappings
  */
@@ -35,11 +47,20 @@ public class PartitionDependencyManagerService implements Service {
 
     public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
     public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
+    public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval";
+    public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl";
 
     private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
 
     private HCatDependencyCache dependencyCache;
 
+    /**
+     * Keep timestamp when missing dependencies of a coord action are registered
+     */
+    private ConcurrentMap<String, Long> registeredCoordActionMap;
+
+    private boolean purgeEnabled = false;
+
     @Override
     public void init(Services services) throws ServiceException {
         init(services.getConf());
@@ -52,6 +73,57 @@ public class PartitionDependencyManagerService implements Service {
         dependencyCache.init(conf);
         LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
                 .getName());
+        purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
+        if (purgeEnabled) {
+            Runnable purgeThread = new CachePurgeWorker(dependencyCache);
+            // schedule runnable by default every 10 min
+            Services.get()
+                    .get(SchedulerService.class)
+                    .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600),
+                            SchedulerService.Unit.SEC);
+            registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
+        }
+    }
+
+    private class CachePurgeWorker implements Runnable {
+        HCatDependencyCache cache;
+        public CachePurgeWorker(HCatDependencyCache cache) {
+            this.cache = cache;
+        }
+
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
+            try {
+                purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800));
+            }
+            catch (Throwable error) {
+                XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error);
+            }
+        }
+
+        private void purgeMissingDependency(int timeToLive) {
+            long currentTime = new Date().getTime();
+            Set<String> staleActions = new HashSet<String>();
+            for(String actionId : registeredCoordActionMap.keySet()) {
+                Long regTime = registeredCoordActionMap.get(actionId);
+                if(regTime < (currentTime - timeToLive * 1000)){
+                    CoordinatorActionBean caBean = null;
+                    try {
+                        caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
+                    }
+                    catch (JPAExecutorException e) {
+                        LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+                    }
+                    if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
+                        staleActions.add(actionId);
+                    }
+                }
+            }
+            dependencyCache.removeNonWaitingCoordActions(staleActions);
+        }
     }
 
     @Override
@@ -71,6 +143,9 @@ public class PartitionDependencyManagerService implements Service {
      * @param actionID ID of action which is waiting for the dependency
      */
     public void addMissingDependency(HCatURI hcatURI, String actionID) {
+        if (purgeEnabled) {
+            registeredCoordActionMap.put(actionID, new Date().getTime());
+        }
         dependencyCache.addMissingDependency(hcatURI, actionID);
     }
 
@@ -142,4 +217,22 @@ public class PartitionDependencyManagerService implements Service {
         return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
     }
 
+    /**
+     * Remove a coord action from dependency cache when all push missing dependencies available
+     *
+     * @param actionID action id
+     * @param dependencyURIs set of dependency URIs
+     * @return true if successful, else false
+     */
+    public void removeCoordActionWithDependenciesAvailable(String actionID) {
+        if (purgeEnabled) {
+            registeredCoordActionMap.remove(actionID);
+        }
+        dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
+    }
+
+    @VisibleForTesting
+    public void runCachePurgeWorker() {
+        new CachePurgeWorker(dependencyCache).run();
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 611b74c..1d5f4a4 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -207,4 +207,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
         return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty()
                 || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false");
     }
+
+    /**
+     * Return if it is running in HA mode
+     *
+     * @return
+     */
+    @Override
+    public boolean isHighlyAvailableMode() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index 1ffadbd..0eafacf 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -268,8 +268,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
     }
 
     public void testActionInputCheckLatestActionCreationTimeWithPushDependency() throws Exception {
+        setupServicesForHCatalog(services);
         Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false);
-
+        services.init();
         String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
         Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
@@ -400,7 +401,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
     }
 
     public void testActionInputCheckLatestCurrentTimeWithPushDependency() throws Exception {
+        setupServicesForHCatalog(services);
         Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
+        services.init();
 
         String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
index da09727..3c8b082 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
@@ -464,5 +464,4 @@ public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
             throw new Exception("Action ID " + actionId + " was not stored properly in db");
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
new file mode 100644
index 0000000..9d3165d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache;
+
+public class TestHAPartitionDependencyManagerEhCache extends TestHAPartitionDependencyManagerService {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        services.getConf().set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+                EhcacheHCatDependencyCache.class.getName());
+        services.setService(ZKJobsConcurrencyService.class);
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        pdms.init(services);
+    }
+
+    @Override
+    public void testDependencyCacheWithHA(){
+    }
+
+    @Override
+    public void testPurgeMissingDependencies() throws Exception {
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        testPurgeMissingDependenciesForCache(pdms);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
new file mode 100644
index 0000000..da383b3
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.HCatURI;
+
+public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
+
+    protected Services services;
+    protected String server;
+    protected String db;
+    protected String table1;
+    protected String table2;
+    protected String part1;
+    protected String part2;
+    protected String part3;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = super.setupServicesForHCatalog(Services.get());
+        // disable recovery service
+        services.getConf().setInt(RecoveryService.CONF_SERVICE_INTERVAL, 1000000);
+        // disable regular cache purge
+        services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000);
+        server = super.getHCatalogServer().getMetastoreAuthority();
+        services.init();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    private void populateTable() throws Exception {
+        dropTable(db, table1, true);
+        dropTable(db, table2, true);
+        dropDatabase(db, true);
+        createDatabase(db);
+        createTable(db, table1, "dt,country");
+        createTable(db, table2, "dt,country");
+    }
+
+    protected String getSanitizedTestCaseDir() {
+        // On Windows, the working directory will have a colon from to the drive letter. Because colons
+        // are not allowed in DFS paths, we remove it. Also, prepend a backslash to simulate an absolute path.
+        if(Shell.WINDOWS) {
+            return "\\" + getTestCaseDir().replaceAll(":", "");
+        }
+        else {
+            return getTestCaseDir();
+        }
+    }
+
+    public void testDependencyCacheWithHA() throws Exception {
+
+        db = "default";
+        table1 = "mytbl";
+        table2 = "mytb2";
+        part1 = "dt=20120101;country=us";
+        part2 = "dt=20120102;country=us";
+        part3 = "dt=20120103;country=us";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1;
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2;
+        String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3;
+        HCatURI dep1 = new HCatURI(newHCatDependency1);
+        HCatURI dep2 = new HCatURI(newHCatDependency2);
+        HCatURI dep3 = new HCatURI(newHCatDependency3);
+        // create db, table and partitions
+        populateTable();
+
+        String actionId1 = addInitRecords(newHCatDependency1);
+        String actionId2 = addInitRecords(newHCatDependency2);
+        String actionId3 = addInitRecords(newHCatDependency3);
+
+        // Assume dependency cache on dummy server with missing push dependencies registered
+        PartitionDependencyManagerService dummyPdms = new PartitionDependencyManagerService();
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        dummyPdms.init(Services.get());
+        dummyPdms.addMissingDependency(dep1, actionId1);
+        dummyPdms.addMissingDependency(dep2, actionId2);
+        dummyPdms.addMissingDependency(dep3, actionId3);
+
+        Collection<String> waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep1);
+        assertEquals(1, waitingActions.size());
+        waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep2);
+        assertEquals(1, waitingActions.size());
+        waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep3);
+        assertEquals(1, waitingActions.size());
+
+        //Dependency cache on living server doesn't have these partitions registered at this point
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep1);
+        assertNull(waitingActions);
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep2);
+        assertNull(waitingActions);
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep3);
+        assertNull(waitingActions);
+
+        //Assume dummy server is down, and recovery service on living server pick up these jobs
+        dummyPdms.destroy();
+        Runnable recoveryRunnable = new RecoveryRunnable(60, 0, 60);
+        recoveryRunnable.run();
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                Collection<String> waitingActions;
+                PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+                HCatURI dep1 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part1);
+                HCatURI dep2 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part2);
+                HCatURI dep3 = new HCatURI("hcat://"+ server + "/" + db + "/" + table2 + "/" + part3);
+                waitingActions = pdms.getWaitingActions(dep1);
+                if(waitingActions == null) {
+                    return false;
+                }
+                waitingActions = pdms.getWaitingActions(dep2);
+                if(waitingActions == null) {
+                    return false;
+                }
+                waitingActions = pdms.getWaitingActions(dep3);
+                if(waitingActions == null) {
+                    return false;
+                }
+                return true;
+            }
+        });
+        //Dependency cache on living server has missing partitions added
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep1);
+        assertEquals(1, waitingActions.size());
+        assertTrue(waitingActions.contains(actionId1));
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep2);
+        assertEquals(1, waitingActions.size());
+        assertTrue(waitingActions.contains(actionId2));
+        waitingActions = (Collection<String>)pdms.getWaitingActions(dep3);
+        assertEquals(1, waitingActions.size());
+        assertTrue(waitingActions.contains(actionId3));
+
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        // mytbl and mytb2 registered to topic map to receive notification
+        assertTrue(hcatService.isRegisteredForNotification(dep1));
+        assertTrue(hcatService.isRegisteredForNotification(dep2));
+        assertTrue(hcatService.isRegisteredForNotification(dep3));
+    }
+
+    protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) {
+        pdms.addMissingDependency(hcatURI, actionId);
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        if (!hcatService.isRegisteredForNotification(hcatURI)) {
+            hcatService.registerForNotification(hcatURI, hcatURI.getDb() + "." + hcatURI.getTable(),
+                    new HCatMessageHandler(hcatURI.getServer()));
+        }
+    }
+
+    public void testPurgeMissingDependencies() throws Exception{
+        services.setService(ZKJobsConcurrencyService.class);
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        pdms.init(services);
+        testPurgeMissingDependenciesForCache(pdms);
+    }
+
+    protected void testPurgeMissingDependenciesForCache(PartitionDependencyManagerService pdms) throws Exception{
+
+        String actionId1 = "1234465451";
+        String actionId2 = "1234465452";
+        String actionId3 = "1234465453";
+
+        // add partitions as missing
+        HCatURI dep1 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/dt=20120101;country=us");
+        HCatURI dep2 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/country=us;dt=20120101");
+        HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
+
+        // actionId1-->(dep1,2), actionId2-->(dep2), actionId3-->(dep2,3)
+        addMissingDependencyAndRegister(dep1, actionId1, pdms);
+        addMissingDependencyAndRegister(dep2, actionId1, pdms);
+        addMissingDependencyAndRegister(dep2, actionId2, pdms);
+        addMissingDependencyAndRegister(dep2, actionId3, pdms);
+        addMissingDependencyAndRegister(dep3, actionId3, pdms);
+
+        List<String> waitingDep1 = (ArrayList<String>) pdms.getWaitingActions(dep1);
+        assertEquals(waitingDep1.size(), 1);
+        assertEquals(waitingDep1.get(0), actionId1);
+
+        List<String> waitingDep2 = (ArrayList<String>) pdms.getWaitingActions(dep2);
+        assertEquals(waitingDep2.size(), 3);
+        for (String id : waitingDep2) {
+            assertTrue(id.equals(actionId1) || id.equals(actionId2) || id.equals(actionId3));
+        }
+        List<String> waitingDep3 = (ArrayList<String>) pdms.getWaitingActions(dep3);
+        assertEquals(waitingDep3.size(), 1);
+        assertTrue(waitingDep3.get(0).equals(actionId3));
+
+        // make only coordAction 1 to WAITING, the rest to RUNNING (only WAITING
+        // remain dependency cache)
+        ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+        CoordinatorActionBean coordAction1 = new CoordinatorActionBean();
+        coordAction1.setId(actionId1);
+        coordAction1.setStatus(Status.WAITING);
+        insertList.add(coordAction1);
+        CoordinatorActionBean coordAction2 = new CoordinatorActionBean();
+        coordAction2.setId(actionId2);
+        coordAction2.setStatus(Status.RUNNING);
+        insertList.add(coordAction2);
+        CoordinatorActionBean coordAction3 = new CoordinatorActionBean();
+        coordAction3.setId(actionId3);
+        coordAction3.setStatus(Status.RUNNING);
+        insertList.add(coordAction3);
+        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+
+        // run cache purge
+        Services.get().getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0);
+        pdms.runCachePurgeWorker();
+
+        // only coord Action 1 still in dependency cache
+        waitingDep1 = (ArrayList<String>) pdms.getWaitingActions(dep1);
+        assertEquals(waitingDep1.size(), 1);
+        assertTrue(waitingDep1.get(0).equals(actionId1));
+
+        // only coord Action 1 still in dependency cache
+        waitingDep2 = (ArrayList<String>) pdms.getWaitingActions(dep2);
+        assertEquals(waitingDep2.size(), 1);
+        assertTrue(waitingDep2.get(0).equals(actionId1));
+
+        waitingDep3 = (ArrayList<String>) pdms.getWaitingActions(dep3);
+        assertNull(waitingDep3);
+
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        // mytbl1 should be still in topic map
+        assertTrue(hcatService.isRegisteredForNotification(dep1));
+        // mytbl1 should be still in topic map
+        assertTrue(hcatService.isRegisteredForNotification(dep2));
+        // mytbl2 should NOT be in topic map
+        assertFalse(hcatService.isRegisteredForNotification(dep3));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
index cfdfbd1..7b88a19 100644
--- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
+++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
@@ -131,5 +131,4 @@ public class TestPartitionDependencyManagerEhcache extends TestPartitionDependen
             assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
index ef71fb0..67ea851 100644
--- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
@@ -20,16 +20,20 @@ package org.apache.oozie.service;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.jms.JMSConnectionInfo;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XLog;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -40,14 +44,15 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
 
     private static XLog LOG = XLog.getLog(TestPartitionDependencyManagerService.class);
     protected Services services;
-    @Before
+
     protected void setUp() throws Exception {
         super.setUp();
         services = super.setupServicesForHCatalog();
+        // disable regular cache purge
+        services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000);
         services.init();
     }
 
-    @After
     protected void tearDown() throws Exception {
         Services.get().destroy();
         super.tearDown();
@@ -55,6 +60,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
 
     @Test
     public void testPartitionDependency() throws Exception {
+
         // Test all APIs related to dependency caching
         String actionId1 = "1234465451";
         String actionId2 = "1234465452";
@@ -72,15 +78,16 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
         HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
         HCatURI dep4 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us;state=CA");
 
-        addMissingDependencyAndRegister(dep1, actionId1);
-        addMissingDependencyAndRegister(dep2, actionId1);
-        addMissingDependencyAndRegister(dep2, actionId2);
-        addMissingDependencyAndRegister(dep2, actionId3);
-        addMissingDependencyAndRegister(dep3, actionId3);
-        addMissingDependencyAndRegister(dep4, actionId4);
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        addMissingDependencyAndRegister(dep1, actionId1, pdms);
+        addMissingDependencyAndRegister(dep2, actionId1, pdms);
+        addMissingDependencyAndRegister(dep2, actionId2, pdms);
+        addMissingDependencyAndRegister(dep2, actionId3, pdms);
+        addMissingDependencyAndRegister(dep3, actionId3, pdms);
+        addMissingDependencyAndRegister(dep4, actionId4, pdms);
         // Add duplicates. RecoveryService will add duplicates
-        addMissingDependencyAndRegister(dep4, actionId4);
-        addMissingDependencyAndRegister(dep4, actionId4);
+        addMissingDependencyAndRegister(dep4, actionId4, pdms);
+        addMissingDependencyAndRegister(dep4, actionId4, pdms);
 
         HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
         JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
@@ -90,7 +97,6 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
         assertTrue(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
         assertTrue(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
 
-        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
         assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
         assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
         assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
@@ -130,8 +136,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
         assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
     }
 
-    private void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId) {
-        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+    protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) {
         pdms.addMissingDependency(hcatURI, actionId);
         HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
         if (!hcatService.isRegisteredForNotification(hcatURI)) {
@@ -191,5 +196,4 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
             assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
         }
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 1536927..6bf0a8f 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -1068,6 +1068,11 @@ public abstract class XTestCase extends TestCase {
 
     protected Services setupServicesForHCatalog() throws ServiceException {
         Services services = new Services();
+        setupServicesForHCataLogImpl(services);
+        return services;
+    }
+
+    private void setupServicesForHCataLogImpl(Services services) {
         Configuration conf = services.getConf();
         conf.set(Services.CONF_SERVICE_EXT_CLASSES,
                 JMSAccessorService.class.getName() + "," +
@@ -1081,8 +1086,11 @@ public abstract class XTestCase extends TestCase {
                 FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName());
         setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
         setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
-        return services;
     }
 
+    protected Services setupServicesForHCatalog(Services services) throws ServiceException {
+        setupServicesForHCataLogImpl(services);
+        return services;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 7bebaf0..3d37d48 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -49,7 +49,7 @@ import org.apache.oozie.util.ZKUtils;
  * <p>
  * To use security, see {@link ZKXTestCaseWithSecurity}.
  */
-public abstract class ZKXTestCase extends XTestCase {
+public abstract class ZKXTestCase extends XDataTestCase {
     private TestingServer zkServer;
     private CuratorFramework client = null;
     private ServiceDiscovery<Map> sDiscovery = null;

http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e311c7f..2f89c3e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1492 Make sure HA works with HCat (ryota)
 OOZIE-1869 Sharelib update shows vip/load balancer address as one of the hostname (puru via ryota)
 OOZIE-1861 Pig action should work with tez mode (rohini)
 OOZIE-1703 User should be able to set coord end-time before start time (puru via rohini)