You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/13 04:55:35 UTC

svn commit: r1455791 - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/dependency/ core/src/main/java/org/apache/oozie/dependency/hcat/ core/src/test/java/org/apache/oozie/command/coord/ core/s...

Author: virag
Date: Wed Mar 13 03:55:35 2013
New Revision: 1455791

URL: http://svn.apache.org/r1455791
Log:
OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Wed Mar 13 03:55:35 2013
@@ -43,14 +43,14 @@ public class CoordActionUpdatePushMissin
             PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
             Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
             if (availDepList == null || availDepList.size() == 0) {
-                LOG.info("There are no available dependencies for action ID: [{0}]", actionId);
+                LOG.info("There are no available dependencies");
                 if (isTimeout()) { // Poll and check as one last try
                     queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
                 }
             }
             else {
-                LOG.debug("Updating action ID [{0}] with available uris=[{1}] where missing uris=[{2}]", actionId,
-                        availDepList.toString(), pushMissingDeps);
+                LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
+                        pushMissingDeps);
 
                 String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
                 List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
@@ -82,12 +82,10 @@ public class CoordActionUpdatePushMissin
 
     private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
         if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
-            LOG.debug("Successfully removed uris [{0}] for actionId: [{1}] from available list",
-                    availDepList.toString(), actionId);
+            LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString());
         }
         else {
-            LOG.warn("Failed to remove uris [{0}] for actionId: [{1}] from available list", availDepList.toString(),
-                    actionId);
+            LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId);
         }
     }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Wed Mar 13 03:55:35 2013
@@ -93,8 +93,11 @@ public class CoordPushDependencyCheckXCo
         }
         else {
             String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
-            LOG.info("First Push missing dependency for actionID [{0}] is [{1}] ", actionId, missingDepsArray[0]);
-            LOG.trace("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+            LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
+            LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
+            if (registerForNotification) {
+                LOG.debug("Register for notifications is true");
+            }
 
             try {
                 Configuration actionConf = null;
@@ -142,7 +145,7 @@ public class CoordPushDependencyCheckXCo
                     registerForNotification(actionDep.getMissingDependencies(), actionConf);
                 }
                 else {
-                    unregisterAvailableDependencies(actionDep);
+                    unregisterAvailableDependencies(actionDep.getAvailableDependencies());
                 }
                 if (timeout) {
                     unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
@@ -203,6 +206,12 @@ public class CoordPushDependencyCheckXCo
                 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
             }
         }
+        else if (isTimeout()) {
+            // If it is timeout and all push dependencies are available but still some unresolved
+            // missing dependencies queue CoordActionInputCheckXCommand now. Else it will have to
+            // wait till RecoveryService kicks in
+            queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
+        }
     }
 
     private String resolveCoordConfiguration() throws CommandException {
@@ -245,32 +254,29 @@ public class CoordPushDependencyCheckXCo
                 URI missingURI = new URI(missingDep);
                 URIHandler handler = uriService.getURIHandler(missingURI);
                 handler.registerForNotification(missingURI, actionConf, user, actionId);
-                    LOG.debug("Registered uri [{0}] for actionId: [{1}] for notifications",
-                            missingURI, actionId);
+                    LOG.debug("Registered uri [{0}] for notifications", missingURI);
             }
             catch (Exception e) {
-                LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+                LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
             }
         }
     }
 
-    private void unregisterAvailableDependencies(ActionDependency actionDependency) {
+    private void unregisterAvailableDependencies(List<String> availableDeps) {
         URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-        for (String availableDep : actionDependency.getAvailableDependencies()) {
+        for (String availableDep : availableDeps) {
             try {
                 URI availableURI = new URI(availableDep);
                 URIHandler handler = uriService.getURIHandler(availableURI);
                 if (handler.unregisterFromNotification(availableURI, actionId)) {
-                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications",
-                            availableURI, actionId);
+                    LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
                 }
                 else {
-                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
-                            actionId);
+                    LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
                 }
             }
             catch (Exception e) {
-                LOG.warn("Exception while unregistering uri for actionId: [{0}] for notifications", actionId, e);
+                LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
             }
         }
     }
@@ -283,16 +289,14 @@ public class CoordPushDependencyCheckXCo
                 URI missingURI = new URI(missingDep);
                 URIHandler handler = uriService.getURIHandler(missingURI);
                 if (handler.unregisterFromNotification(missingURI, actionId)) {
-                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", missingURI,
-                            actionId);
+                    LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
                 }
                 else {
-                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", missingURI,
-                            actionId);
+                    LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
                 }
             }
             catch (Exception e) {
-                LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+                LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
             }
         }
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Wed Mar 13 03:55:35 2013
@@ -48,7 +48,6 @@ import org.apache.oozie.util.XLog;
 
 public class HCatURIHandler implements URIHandler {
 
-    private static XLog LOG = XLog.getLog(HCatURIHandler.class);
     private Set<String> supportedSchemes;
     private Map<String, DependencyType> dependencyTypes;
     private List<Class<?>> classesToShip;
@@ -185,7 +184,8 @@ public class HCatURIHandler implements U
         }
         hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
         try {
-            LOG.info("Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
+            XLog.getLog(HCatURIHandler.class).info(
+                    "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
                     UserGroupInformation.getLoginUser(), serverURI);
 
             // HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
@@ -252,14 +252,13 @@ public class HCatURIHandler implements U
                 client.close();
             }
             catch (Exception ignore) {
-                LOG.warn("Error closing hcat client", ignore);
+                XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
             }
         }
     }
 
     static class HCatContext extends Context {
 
-        private static XLog LOG = XLog.getLog(HCatContext.class);
         private HCatClient hcatClient;
 
         /**
@@ -289,7 +288,7 @@ public class HCatURIHandler implements U
                 hcatClient.close();
             }
             catch (Exception ignore) {
-                LOG.warn("Error closing hcat client", ignore);
+                XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
             }
         }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java Wed Mar 13 03:55:35 2013
@@ -21,17 +21,18 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import net.sf.ehcache.Cache;
 import net.sf.ehcache.CacheException;
 import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Ehcache;
 import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
 import net.sf.ehcache.event.CacheEventListener;
 
 import org.apache.hadoop.conf.Configuration;
@@ -50,15 +51,22 @@ public class EhcacheHCatDependencyCache 
     public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
 
     private CacheManager cacheManager;
+
     /**
-     * Cache with key server#db#table#pk1=val;pk2=val2 and value as WaitingActions (list of
+     * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
      * WaitingAction) which is Serializable (for overflowToDisk)
      */
-    private Ehcache missingCache;
+    private ConcurrentMap<String, Cache> missingDepsByServer;
+
+    private CacheConfiguration cacheConfig;
     /**
-     * Map of server#db#table - sorted part key pattern - count of elements in the cache
+     * Map of server#db#table - sorted part key pattern - count of different partition values (count
+     * of elements in the cache) still missing for a partition key pattern. This count is used to
+     * quickly determine if there are any more missing dependencies for a table. When the count
+     * becomes 0, we unregister from notifications as there are no more missing dependencies for
+     * that table.
      */
-    private ConcurrentMap<String, Map<String, SettableInteger>> partKeyPatterns;
+    private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
     /**
      * Map of actionIDs and collection of available URIs
      */
@@ -79,22 +87,41 @@ public class EhcacheHCatDependencyCache 
             throw new IllegalStateException("ehcache.xml is not found in classpath");
         }
         cacheManager = CacheManager.newInstance(cacheConfigURL);
-        missingCache = cacheManager.getCache(cacheName);
-        if (missingCache == null) {
+        final Cache specifiedCache = cacheManager.getCache(cacheName);
+        if (specifiedCache == null) {
             throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
                     + " is not found");
         }
-        missingCache.getCacheEventNotificationService().registerListener(this);
-        partKeyPatterns = new ConcurrentHashMap<String, Map<String, SettableInteger>>();
+        cacheConfig = specifiedCache.getCacheConfiguration();
+        missingDepsByServer = new ConcurrentHashMap<String, Cache>();
+        partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
         availableDeps = new ConcurrentHashMap<String, Collection<String>>();
     }
 
     @Override
     public void addMissingDependency(HCatURI hcatURI, String actionID) {
+
+        // Create cache for the server if we don't have one
+        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        if (missingCache == null) {
+            CacheConfiguration clonedConfig = cacheConfig.clone();
+            clonedConfig.setName(hcatURI.getServer());
+            missingCache = new Cache(clonedConfig);
+            Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
+            if (exists == null) {
+                cacheManager.addCache(missingCache);
+                missingCache.getCacheEventNotificationService().registerListener(this);
+            }
+            else {
+                missingCache.dispose(); //discard
+            }
+        }
+
+        // Add hcat uri into the missingCache
         SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
         String partKeys = sortedPKV.getPartKeys();
-        String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
-                + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
+                + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
         boolean newlyAdded = true;
         synchronized (missingCache) {
             Element element = missingCache.get(missingKey);
@@ -114,13 +141,15 @@ public class EhcacheHCatDependencyCache 
                 waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
             }
         }
+
+        // Increment count for the partition key pattern
         if (newlyAdded) {
             String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
                     + hcatURI.getTable();
             synchronized (partKeyPatterns) {
-                Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+                ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
                 if (patternCounts == null) {
-                    patternCounts = new HashMap<String, SettableInteger>();
+                    patternCounts = new ConcurrentHashMap<String, SettableInteger>();
                     partKeyPatterns.put(tableKey, patternCounts);
                 }
                 SettableInteger count = patternCounts.get(partKeys);
@@ -136,29 +165,30 @@ public class EhcacheHCatDependencyCache 
 
     @Override
     public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
+
+        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        if (missingCache == null) {
+            LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
+                    hcatURI.toURIString(), actionID);
+            return false;
+        }
         SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
         String partKeys = sortedPKV.getPartKeys();
-        String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
-                + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+        String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
+                partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
         boolean decrement = false;
         boolean removed = false;
         synchronized (missingCache) {
             Element element = missingCache.get(missingKey);
             if (element == null) {
-                LOG.debug("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
+                LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
                         hcatURI.toURIString(), actionID);
                 return false;
             }
             Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
-            WaitingAction wAction = null;
-            for (WaitingAction action : waitingActions) {
-                if (action.getActionID().equals(actionID)) {
-                    wAction = action;
-                }
-            }
-            removed = waitingActions.remove(wAction);
+            removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
             if (!removed) {
-                LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
                         hcatURI.toURIString(), actionID);
             }
             if (waitingActions.isEmpty()) {
@@ -166,6 +196,7 @@ public class EhcacheHCatDependencyCache 
                 decrement = true;
             }
         }
+        // Decrement partition key pattern count if the cache entry is removed
         if (decrement) {
             String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
                     + hcatURI.getTable();
@@ -176,20 +207,22 @@ public class EhcacheHCatDependencyCache 
 
     @Override
     public Collection<String> getWaitingActions(HCatURI hcatURI) {
-        SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
-        String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
-                + hcatURI.getTable() + TABLE_DELIMITER + sortedPKV.getPartKeys() + TABLE_DELIMITER
-                + sortedPKV.getPartVals();
-        Element element = missingCache.get(missingKey);
-        if (element == null) {
-            return null;
-        }
-        WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
-        Collection<String> actionIDs = new ArrayList<String>();
-        String uriString = hcatURI.getURI().toString();
-        for (WaitingAction action : waitingActions.getWaitingActions()) {
-            if (action.getDependencyURI().equals(uriString)) {
-                actionIDs.add(action.getActionID());
+        Collection<String> actionIDs = null;
+        Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+        if (missingCache != null) {
+            SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+            String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
+                    + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
+            Element element = missingCache.get(missingKey);
+            if (element != null) {
+                WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
+                actionIDs = new ArrayList<String>();
+                String uriString = hcatURI.getURI().toString();
+                for (WaitingAction action : waitingActions.getWaitingActions()) {
+                    if (action.getDependencyURI().equals(uriString)) {
+                        actionIDs.add(action.getActionID());
+                    }
+                }
             }
         }
         return actionIDs;
@@ -199,63 +232,80 @@ public class EhcacheHCatDependencyCache 
     public Collection<String> markDependencyAvailable(String server, String db, String table,
             Map<String, String> partitions) {
         String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
-        Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
-        if (patternCounts == null) {
-            LOG.warn("Got partition available notification for " + tableKey
-                    + ". Unexpected and should not be listening to topic. Unregistering topic");
-            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
-            hcatService.unregisterFromNotification(server, db, table);
-            return null;
-        }
-        Collection<String> actionsWithAvailDep = new HashSet<String>();
-        StringBuilder partValSB = new StringBuilder();
-        // If partition patterns are date, date;country and date;country;state,
-        // construct the partition values for each pattern and for the matching value in the
-        // missingCache, get the waiting actions and mark it as available.
-        for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
-            String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
-            partValSB.setLength(0);
-            for (String key : partKeys) {
-                partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
-            }
-            partValSB.setLength(partValSB.length() - 1);
-            String missingKey = tableKey + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER + partValSB.toString();
-            boolean removed = false;
-            Element element = null;
-            synchronized (missingCache) {
-                element = missingCache.get(missingKey);
-                if (element != null) {
-                    missingCache.remove(missingKey);
-                    removed = true;
-                }
-            }
-            if (removed) {
-                decrementPartKeyPatternCount(tableKey, entry.getKey(), missingKey);
-                Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
-                for (WaitingAction wAction : wActions) {
-                    String actionID = wAction.getActionID();
-                    actionsWithAvailDep.add(actionID);
-                    Collection<String> depURIs = availableDeps.get(actionID);
-                    if (depURIs == null) {
-                        depURIs = new ArrayList<String>();
-                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
-                        if (existing != null) {
-                            depURIs = existing;
-                        }
+        synchronized (partKeyPatterns) {
+            Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+            if (patternCounts == null) {
+                LOG.warn("Got partition available notification for " + tableKey
+                        + ". Unexpected as no matching partition keys. Unregistering topic");
+                unregisterFromNotifications(server, db, table);
+                return null;
+            }
+            Cache missingCache = missingDepsByServer.get(server);
+            if (missingCache == null) {
+                LOG.warn("Got partition available notification for " + tableKey
+                        + ". Unexpected. Missing server entry in cache. Unregistering topic");
+                partKeyPatterns.remove(tableKey);
+                unregisterFromNotifications(server, db, table);
+                return null;
+            }
+            Collection<String> actionsWithAvailDep = new HashSet<String>();
+            StringBuilder partValSB = new StringBuilder();
+            // If partition patterns are date, date;country and date;country;state,
+            // construct the partition values for each pattern and for the matching value in the
+            // missingCache, get the waiting actions and mark it as available.
+            for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
+                String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
+                partValSB.setLength(0);
+                for (String key : partKeys) {
+                    partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
+                }
+                partValSB.setLength(partValSB.length() - 1);
+                String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
+                        + partValSB.toString();
+                boolean removed = false;
+                Element element = null;
+                synchronized (missingCache) {
+                    element = missingCache.get(missingKey);
+                    if (element != null) {
+                        missingCache.remove(missingKey);
+                        removed = true;
                     }
-                    synchronized (depURIs) {
-                        depURIs.add(wAction.getDependencyURI());
-                        availableDeps.put(actionID, depURIs);
+                }
+                if (removed) {
+                    decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
+                    // Add the removed entry to available dependencies
+                    Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
+                            .getWaitingActions();
+                    for (WaitingAction wAction : wActions) {
+                        String actionID = wAction.getActionID();
+                        actionsWithAvailDep.add(actionID);
+                        Collection<String> depURIs = availableDeps.get(actionID);
+                        if (depURIs == null) {
+                            depURIs = new ArrayList<String>();
+                            Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
+                            if (existing != null) {
+                                depURIs = existing;
+                            }
+                        }
+                        synchronized (depURIs) {
+                            depURIs.add(wAction.getDependencyURI());
+                            availableDeps.put(actionID, depURIs);
+                        }
                     }
                 }
             }
+            return actionsWithAvailDep;
         }
-        return actionsWithAvailDep;
     }
 
     @Override
     public Collection<String> getAvailableDependencyURIs(String actionID) {
-        return availableDeps.get(actionID);
+        Collection<String> available = availableDeps.get(actionID);
+        if (available !=  null) {
+            // Return a copy
+            available = new ArrayList<String>(available);
+        }
+        return available;
     }
 
     @Override
@@ -296,7 +346,7 @@ public class EhcacheHCatDependencyCache 
     public void notifyElementExpired(Ehcache cache, Element element) {
         // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
         String missingDepKey = (String) element.getObjectKey();
-        LOG.info("Cache entry [{0}] expired", missingDepKey);
+        LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
         onExpiryOrEviction(cache, element, missingDepKey);
     }
 
@@ -321,29 +371,37 @@ public class EhcacheHCatDependencyCache 
     public void notifyElementEvicted(Ehcache cache, Element element) {
         // Invoked when maxElementsInMemory is met
         String missingDepKey = (String) element.getObjectKey();
-        LOG.info("Cache entry [{0}] evicted", missingDepKey);
+        LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
         onExpiryOrEviction(cache, element, missingDepKey);
     }
 
     private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
         int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
         int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
-        String tableKey = missingDepKey.substring(0, partKeyIndex);
+        // server#db#table. Name of the cache is that of the server.
+        String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
         String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
         decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
     }
 
+    /**
+     * Decrement partition key pattern count, once a hcat URI is removed from the cache
+     *
+     * @param tableKey key identifying the table - server#db#table
+     * @param partKeys partition key pattern
+     * @param hcatURI URI with the partition key pattern
+     */
     private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
         synchronized (partKeyPatterns) {
             Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
             if (patternCounts == null) {
-                LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+                LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                         + "But no corresponding pattern key table entry", hcatURI);
             }
             else {
                 SettableInteger count = patternCounts.get(partKeys);
                 if (count == null) {
-                    LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+                    LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                             + "But no corresponding pattern key entry", hcatURI);
                 }
                 else {
@@ -353,16 +411,20 @@ public class EhcacheHCatDependencyCache 
                     }
                     if (patternCounts.isEmpty()) {
                         partKeyPatterns.remove(tableKey);
-                        // Close JMS session. Stop listening on topic
-                        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
                         String[] tableDetails = tableKey.split(TABLE_DELIMITER);
-                        hcatService.unregisterFromNotification(tableDetails[0], tableDetails[1], tableDetails[2]);
+                        unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
                     }
                 }
             }
         }
     }
 
+    private void unregisterFromNotifications(String server, String db, String table) {
+        // Close JMS session. Stop listening on topic
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        hcatService.unregisterFromNotification(server, db, table);
+    }
+
     private static class SortedPKV {
         private StringBuilder partKeys;
         private StringBuilder partVals;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java Wed Mar 13 03:55:35 2013
@@ -86,7 +86,7 @@ public class SimpleHCatDependencyCache i
             }
             Collection<WaitingAction> waitingActions = partValues.get(partVal);
             if (waitingActions == null) {
-                waitingActions = new ArrayList<WaitingAction>();
+                waitingActions = new HashSet<WaitingAction>();
                 partValues.put(partVal, waitingActions);
             }
             waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
@@ -101,32 +101,26 @@ public class SimpleHCatDependencyCache i
         String partVal = sortedPKV.getPartVals();
         Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
         if (partKeyPatterns == null) {
-            LOG.debug("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
+            LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
                     hcatURI.toURIString(), actionID);
             return false;
         }
         synchronized(partKeyPatterns) {
             Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
             if (partValues == null) {
-                LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
+                LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
                         hcatURI.toURIString(), actionID);
                 return false;
             }
             Collection<WaitingAction> waitingActions = partValues.get(partVal);
             if (waitingActions == null) {
-                LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
+                LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
                         hcatURI.toURIString(), actionID);
                 return false;
             }
-            WaitingAction wAction = null;
-            for (WaitingAction action : waitingActions) {
-                if (action.getActionID().equals(actionID)) {
-                    wAction = action;
-                }
-            }
-            boolean removed = waitingActions.remove(wAction);
+            boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
             if (!removed) {
-                LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+                LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
                         hcatURI.toURIString(), actionID);
             }
             if (waitingActions.isEmpty()) {
@@ -182,7 +176,7 @@ public class SimpleHCatDependencyCache i
             LOG.warn("Got partition available notification for " + tableKey
                     + ". Unexpected and should not be listening to topic. Unregistering topic");
             HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
-            hcatService.unregisterFromNotification(server,db, table);
+            hcatService.unregisterFromNotification(server, db, table);
             return null;
         }
         Collection<String> actionsWithAvailDep = new HashSet<String>();

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java Wed Mar 13 03:55:35 2013
@@ -46,4 +46,26 @@ public class WaitingAction implements Se
         return dependencyURI;
     }
 
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = prime + ((actionID == null) ? 0 : actionID.hashCode());
+        result = prime * result + ((dependencyURI == null) ? 0 : dependencyURI.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        WaitingAction other = (WaitingAction) obj;
+        return actionID.equals(other.actionID) && dependencyURI.equals(other.dependencyURI);
+    }
+
+    @Override
+    public String toString() {
+        return "WaitingAction [actionID=" + actionID + ", dependencyURI=" + dependencyURI + "]";
+    }
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java Wed Mar 13 03:55:35 2013
@@ -18,16 +18,16 @@
 package org.apache.oozie.dependency.hcat;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 
 public class WaitingActions implements Serializable {
 
     private static final long serialVersionUID = 1L;
-    private ArrayList<WaitingAction> waitingActions;
+    private Collection<WaitingAction> waitingActions;
 
     public WaitingActions() {
-        waitingActions = new ArrayList<WaitingAction>();
+        waitingActions = new HashSet<WaitingAction>();
     }
 
     /**

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Wed Mar 13 03:55:35 2013
@@ -236,6 +236,43 @@ public class TestCoordPushDependencyChec
     }
 
     @Test
+    public void testTimeOutWithUnresolvedMissingDependencies() throws Exception {
+        String db = "default";
+        String table = "tablename";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
+        String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=uk";
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+        populateTable(db, table);
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+        new CoordPushDependencyCheckXCommand(actionId, true).call();
+        checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING);
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
+        assertTrue(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+
+        // Timeout is 10 mins. Change action created time to before 12 min to make the action
+        // timeout.
+        long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+        setCoordActionCreationTime(actionId, timeOutCreationTime);
+        // Set some missing dependency. Instead of latest or future just setting a current one for testing as
+        // we are only interested in ensuring CoordActionInputCheckXCommand is run
+        setMissingDependencies(actionId, newHCatDependency + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency3);
+        addPartition(db, table, "dt=20120430;country=brazil");
+        checkDependencies(actionId, newHCatDependency + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency3,
+                newHCatDependency1);
+        new CoordPushDependencyCheckXCommand(actionId).call();
+        Thread.sleep(300);
+
+        checkDependencies(actionId, newHCatDependency3, "");
+        assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+        assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+    }
+
+    @Test
     public void testTimeOutWithException1() throws Exception {
         // Test timeout when missing dependencies are from a non existing table
         String newHCatDependency1 = "hcat://" + server + "/nodb/notable/dt=20120430;country=brazil";
@@ -360,4 +397,18 @@ public class TestCoordPushDependencyChec
         }
     }
 
+    private CoordinatorActionBean checkDependencies(String actionId, String expDeps, String expPushDeps)
+            throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            assertEquals(expDeps, action.getMissingDependencies());
+            assertEquals(expPushDeps, action.getPushMissingDependencies());
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            throw new Exception("Action ID " + actionId + " was not stored properly in db");
+        }
+    }
+
 }

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java Wed Mar 13 03:55:35 2013
@@ -112,18 +112,18 @@ public class TestPartitionDependencyMana
             HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + i);
             pdms.addMissingDependency(dep, "" + i);
         }
-        // First 500 should have been evicted. But it is LRU and the next 100 removed is between 350 and 650.
-        for (int i = 0; i < 350; i++) {
+        // First 500 should have been evicted. But it is LRU and the last 200 removed is between 300 and 700.
+        for (int i = 0; i < 300; i++) {
             assertNull(pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)));
         }
         int evicted = 0;
-        for (int i = 350; i < 650; i++) {
+        for (int i = 300; i < 700; i++) {
             if (pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)) == null) {
                 evicted++;
             }
         }
-        assertEquals(150, evicted);
-        for (int i = 650; i < 1000; i++) {
+        assertEquals(200, evicted);
+        for (int i = 700; i < 1000; i++) {
             String actionID = "" + i;
             HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + actionID);
             Collection<String> waitingActions = pdms.getWaitingActions(dep);

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Wed Mar 13 03:55:35 2013
@@ -22,6 +22,8 @@ import java.lang.management.MemoryMXBean
 import java.net.URISyntaxException;
 import java.util.Collection;
 
+import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.jms.JMSConnectionInfo;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.HCatURI;
@@ -59,27 +61,44 @@ public class TestPartitionDependencyMana
         String actionId3 = "1234465453";
         String actionId4 = "1234465454";
 
-        String server = "hcat.server.com:5080";
+        String server1 = "hcat-server1.domain.com:5080";
+        String server2 = "hcat-server2.domain.com:5080";
         String db = "mydb";
-        String table = "mytbl";
+        String table1 = "mytbl1";
+        String table2 = "mytbl2";
         // add partition as missing
-        HCatURI dep1 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120101;country=us");
-        HCatURI dep2 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/country=us;dt=20120101");
-        HCatURI dep3 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120102;country=us");
-        HCatURI dep4 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120102;country=us;state=CA");
+        HCatURI dep1 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/dt=20120101;country=us");
+        HCatURI dep2 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/country=us;dt=20120101");
+        HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
+        HCatURI dep4 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us;state=CA");
+
+        addMissingDependencyAndRegister(dep1, actionId1);
+        addMissingDependencyAndRegister(dep2, actionId1);
+        addMissingDependencyAndRegister(dep2, actionId2);
+        addMissingDependencyAndRegister(dep2, actionId3);
+        addMissingDependencyAndRegister(dep3, actionId3);
+        addMissingDependencyAndRegister(dep4, actionId4);
+        // Add duplicates. RecoveryService will add duplicates
+        addMissingDependencyAndRegister(dep4, actionId4);
+        addMissingDependencyAndRegister(dep4, actionId4);
+
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(dep1.getURI());
+        assertTrue(hcatService.isRegisteredForNotification(dep1)); //server1,db,table1
+        assertTrue(hcatService.isRegisteredForNotification(dep3)); //server2,db,table2
+        assertTrue(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
+        assertTrue(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
+
         PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
-        pdms.addMissingDependency(dep1, actionId1);
-        pdms.addMissingDependency(dep2, actionId1);
-        pdms.addMissingDependency(dep2, actionId2);
-        pdms.addMissingDependency(dep2, actionId3);
-        pdms.addMissingDependency(dep3, actionId3);
-        pdms.addMissingDependency(dep4, actionId4);
         assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
         assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
         assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
         assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
         assertTrue(pdms.getWaitingActions(dep3).contains(actionId3));
         assertTrue(pdms.getWaitingActions(dep4).contains(actionId4));
+        // Should not contain duplicates
+        assertEquals(1, pdms.getWaitingActions(dep4).size());
 
         pdms.removeMissingDependency(dep2, actionId1);
         assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
@@ -87,15 +106,15 @@ public class TestPartitionDependencyMana
         assertTrue(!pdms.getWaitingActions(dep2).contains(actionId1));
         assertNull(pdms.getAvailableDependencyURIs(actionId1));
 
-        pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120102;country=us;state=NY"));
+        pdms.partitionAvailable(server2, db, table2, getPartitionMap("dt=20120102;country=us;state=NY"));
         assertNull(pdms.getWaitingActions(dep3));
         assertTrue(pdms.getAvailableDependencyURIs(actionId3).contains(dep3.getURI().toString()));
 
-        pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120102;country=us;state=CA"));
+        pdms.partitionAvailable(server2, db, table2, getPartitionMap("dt=20120102;country=us;state=CA"));
         assertNull(pdms.getWaitingActions(dep4));
         assertTrue(pdms.getAvailableDependencyURIs(actionId4).contains(dep4.getURI().toString()));
 
-        pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120101;country=us"));
+        pdms.partitionAvailable(server1, db, table1, getPartitionMap("dt=20120101;country=us"));
         assertNull(pdms.getWaitingActions(dep1));
         assertNull(pdms.getWaitingActions(dep2));
         assertTrue(pdms.getAvailableDependencyURIs(actionId2).contains(dep2.getURI().toString()));
@@ -104,6 +123,21 @@ public class TestPartitionDependencyMana
 
         assertTrue(pdms.removeAvailableDependencyURIs(actionId3, pdms.getAvailableDependencyURIs(actionId3)));
         assertNull(pdms.getAvailableDependencyURIs(actionId3));
+
+        assertFalse(hcatService.isRegisteredForNotification(dep1)); //server1,db,table1
+        assertFalse(hcatService.isRegisteredForNotification(dep3)); //server2,db,table2
+        assertFalse(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
+        assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
+    }
+
+    private void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId) {
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        pdms.addMissingDependency(hcatURI, actionId);
+        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+        if (!hcatService.isRegisteredForNotification(hcatURI)) {
+            hcatService.registerForNotification(hcatURI, hcatURI.getDb() + "." + hcatURI.getTable(),
+                    new HCatMessageHandler(hcatURI.getServer()));
+        }
     }
 
     @Test
@@ -158,4 +192,4 @@ public class TestPartitionDependencyMana
         }
     }
 
-}
+}
\ No newline at end of file

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Mar 13 03:55:35 2013
@@ -52,13 +52,14 @@ import org.apache.oozie.client.Coordinat
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.service.CoordinatorStoreService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -66,7 +67,6 @@ import org.apache.oozie.service.UUIDServ
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
 import org.apache.oozie.service.UUIDService.ApplicationType;
-import org.apache.oozie.store.CoordinatorStore;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
@@ -1358,11 +1358,16 @@ public abstract class XDataTestCase exte
     }
 
     protected void setCoordActionCreationTime(String actionId, long actionCreationTime) throws Exception {
-        CoordinatorStore store = Services.get().get(CoordinatorStoreService.class).create();
-        CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
         action.setCreatedTime(new Date(actionCreationTime));
-        store.beginTrx();
-        store.updateCoordinatorAction(action);
-        store.commitTrx();
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+    }
+
+    protected void setMissingDependencies(String actionId, String missingDependencies) throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+        action.setMissingDependencies(missingDependencies);
+        jpaService.execute(new CoordActionUpdateJPAExecutor(action));
     }
 }

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Mar 13 03:55:35 2013
@@ -5,6 +5,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
 
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
 OOZIE-1261 Registered push dependencies are not removed on Coord Kill command (virag)
 OOZIE-1191 add examples of coordinator with SLA tag inserted (ryota via mona)
 OOZIE-1204 Illustrate correct use of parameters inside SLA tags (jun aoki via mona)