You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/13 04:55:35 UTC
svn commit: r1455791 - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/dependency/
core/src/main/java/org/apache/oozie/dependency/hcat/
core/src/test/java/org/apache/oozie/command/coord/ core/src/test/java/org/apache/oozie/service/ core/src/test/java/org/apache/oozie/test/
Author: virag
Date: Wed Mar 13 03:55:35 2013
New Revision: 1455791
URL: http://svn.apache.org/r1455791
Log:
OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Wed Mar 13 03:55:35 2013
@@ -43,14 +43,14 @@ public class CoordActionUpdatePushMissin
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
if (availDepList == null || availDepList.size() == 0) {
- LOG.info("There are no available dependencies for action ID: [{0}]", actionId);
+ LOG.info("There are no available dependencies");
if (isTimeout()) { // Poll and check as one last try
queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
}
}
else {
- LOG.debug("Updating action ID [{0}] with available uris=[{1}] where missing uris=[{2}]", actionId,
- availDepList.toString(), pushMissingDeps);
+ LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
+ pushMissingDeps);
String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
@@ -82,12 +82,10 @@ public class CoordActionUpdatePushMissin
private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
- LOG.debug("Successfully removed uris [{0}] for actionId: [{1}] from available list",
- availDepList.toString(), actionId);
+ LOG.debug("Successfully removed uris [{0}] from available list", availDepList.toString());
}
else {
- LOG.warn("Failed to remove uris [{0}] for actionId: [{1}] from available list", availDepList.toString(),
- actionId);
+ LOG.warn("Failed to remove uris [{0}] from available list", availDepList.toString(), actionId);
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Wed Mar 13 03:55:35 2013
@@ -93,8 +93,11 @@ public class CoordPushDependencyCheckXCo
}
else {
String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
- LOG.info("First Push missing dependency for actionID [{0}] is [{1}] ", actionId, missingDepsArray[0]);
- LOG.trace("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+ LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
+ LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
+ if (registerForNotification) {
+ LOG.debug("Register for notifications is true");
+ }
try {
Configuration actionConf = null;
@@ -142,7 +145,7 @@ public class CoordPushDependencyCheckXCo
registerForNotification(actionDep.getMissingDependencies(), actionConf);
}
else {
- unregisterAvailableDependencies(actionDep);
+ unregisterAvailableDependencies(actionDep.getAvailableDependencies());
}
if (timeout) {
unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
@@ -203,6 +206,12 @@ public class CoordPushDependencyCheckXCo
queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
}
}
+ else if (isTimeout()) {
+ // If the action has timed out and all push dependencies are available, but some missing
+ // dependencies are still unresolved, queue CoordActionInputCheckXCommand now. Otherwise it
+ // would have to wait until RecoveryService kicks in.
+ queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
+ }
}
private String resolveCoordConfiguration() throws CommandException {
@@ -245,32 +254,29 @@ public class CoordPushDependencyCheckXCo
URI missingURI = new URI(missingDep);
URIHandler handler = uriService.getURIHandler(missingURI);
handler.registerForNotification(missingURI, actionConf, user, actionId);
- LOG.debug("Registered uri [{0}] for actionId: [{1}] for notifications",
- missingURI, actionId);
+ LOG.debug("Registered uri [{0}] for notifications", missingURI);
}
catch (Exception e) {
- LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+ LOG.warn("Exception while registering uri [{0}] for notifications", missingDep, e);
}
}
}
- private void unregisterAvailableDependencies(ActionDependency actionDependency) {
+ private void unregisterAvailableDependencies(List<String> availableDeps) {
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- for (String availableDep : actionDependency.getAvailableDependencies()) {
+ for (String availableDep : availableDeps) {
try {
URI availableURI = new URI(availableDep);
URIHandler handler = uriService.getURIHandler(availableURI);
if (handler.unregisterFromNotification(availableURI, actionId)) {
- LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications",
- availableURI, actionId);
+ LOG.debug("Successfully unregistered uri [{0}] from notifications", availableURI);
}
else {
- LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
- actionId);
+ LOG.warn("Unable to unregister uri [{0}] from notifications", availableURI);
}
}
catch (Exception e) {
- LOG.warn("Exception while unregistering uri for actionId: [{0}] for notifications", actionId, e);
+ LOG.warn("Exception while unregistering uri [{0}] from notifications", availableDep, e);
}
}
}
@@ -283,16 +289,14 @@ public class CoordPushDependencyCheckXCo
URI missingURI = new URI(missingDep);
URIHandler handler = uriService.getURIHandler(missingURI);
if (handler.unregisterFromNotification(missingURI, actionId)) {
- LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", missingURI,
- actionId);
+ LOG.debug("Successfully unregistered uri [{0}] from notifications", missingURI);
}
else {
- LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", missingURI,
- actionId);
+ LOG.warn("Unable to unregister uri [{0}] from notifications", missingURI);
}
}
catch (Exception e) {
- LOG.warn("Exception while registering uri for actionId: [{0}] for notifications", actionId, e);
+ LOG.warn("Exception while unregistering uri [{0}] from notifications", missingDep, e);
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Wed Mar 13 03:55:35 2013
@@ -48,7 +48,6 @@ import org.apache.oozie.util.XLog;
public class HCatURIHandler implements URIHandler {
- private static XLog LOG = XLog.getLog(HCatURIHandler.class);
private Set<String> supportedSchemes;
private Map<String, DependencyType> dependencyTypes;
private List<Class<?>> classesToShip;
@@ -185,7 +184,8 @@ public class HCatURIHandler implements U
}
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
try {
- LOG.info("Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
+ XLog.getLog(HCatURIHandler.class).info(
+ "Creating HCatClient for user [{0}] login_user [{1}] and server [{2}] ", user,
UserGroupInformation.getLoginUser(), serverURI);
// HiveMetastoreClient (hive 0.9) currently does not work if UGI has doAs
@@ -252,14 +252,13 @@ public class HCatURIHandler implements U
client.close();
}
catch (Exception ignore) {
- LOG.warn("Error closing hcat client", ignore);
+ XLog.getLog(HCatURIHandler.class).warn("Error closing hcat client", ignore);
}
}
}
static class HCatContext extends Context {
- private static XLog LOG = XLog.getLog(HCatContext.class);
private HCatClient hcatClient;
/**
@@ -289,7 +288,7 @@ public class HCatURIHandler implements U
hcatClient.close();
}
catch (Exception ignore) {
- LOG.warn("Error closing hcat client", ignore);
+ XLog.getLog(HCatContext.class).warn("Error closing hcat client", ignore);
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java Wed Mar 13 03:55:35 2013
@@ -21,17 +21,18 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
+import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.event.CacheEventListener;
import org.apache.hadoop.conf.Configuration;
@@ -50,15 +51,22 @@ public class EhcacheHCatDependencyCache
public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
private CacheManager cacheManager;
+
/**
- * Cache with key server#db#table#pk1=val;pk2=val2 and value as WaitingActions (list of
+ * Map of server to EhCache which has key as db#table#pk1;pk2#val1;val2 and value as WaitingActions (list of
* WaitingAction) which is Serializable (for overflowToDisk)
*/
- private Ehcache missingCache;
+ private ConcurrentMap<String, Cache> missingDepsByServer;
+
+ private CacheConfiguration cacheConfig;
/**
- * Map of server#db#table - sorted part key pattern - count of elements in the cache
+ * Map of server#db#table - sorted part key pattern - count of different partition values (count
+ * of elements in the cache) still missing for a partition key pattern. This count is used to
+ * quickly determine if there are any more missing dependencies for a table. When the count
+ * becomes 0, we unregister from notifications as there are no more missing dependencies for
+ * that table.
*/
- private ConcurrentMap<String, Map<String, SettableInteger>> partKeyPatterns;
+ private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
/**
* Map of actionIDs and collection of available URIs
*/
@@ -79,22 +87,41 @@ public class EhcacheHCatDependencyCache
throw new IllegalStateException("ehcache.xml is not found in classpath");
}
cacheManager = CacheManager.newInstance(cacheConfigURL);
- missingCache = cacheManager.getCache(cacheName);
- if (missingCache == null) {
+ final Cache specifiedCache = cacheManager.getCache(cacheName);
+ if (specifiedCache == null) {
throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
+ " is not found");
}
- missingCache.getCacheEventNotificationService().registerListener(this);
- partKeyPatterns = new ConcurrentHashMap<String, Map<String, SettableInteger>>();
+ cacheConfig = specifiedCache.getCacheConfiguration();
+ missingDepsByServer = new ConcurrentHashMap<String, Cache>();
+ partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
}
@Override
public void addMissingDependency(HCatURI hcatURI, String actionID) {
+
+ // Create cache for the server if we don't have one
+ Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ if (missingCache == null) {
+ CacheConfiguration clonedConfig = cacheConfig.clone();
+ clonedConfig.setName(hcatURI.getServer());
+ missingCache = new Cache(clonedConfig);
+ Cache exists = missingDepsByServer.putIfAbsent(hcatURI.getServer(), missingCache);
+ if (exists == null) {
+ cacheManager.addCache(missingCache);
+ missingCache.getCacheEventNotificationService().registerListener(this);
+ }
+ else {
+ missingCache.dispose(); //discard
+ }
+ }
+
+ // Add hcat uri into the missingCache
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKeys = sortedPKV.getPartKeys();
- String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
- + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+ String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
+ + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
boolean newlyAdded = true;
synchronized (missingCache) {
Element element = missingCache.get(missingKey);
@@ -114,13 +141,15 @@ public class EhcacheHCatDependencyCache
waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
}
}
+
+ // Increment count for the partition key pattern
if (newlyAdded) {
String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ hcatURI.getTable();
synchronized (partKeyPatterns) {
- Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+ ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
if (patternCounts == null) {
- patternCounts = new HashMap<String, SettableInteger>();
+ patternCounts = new ConcurrentHashMap<String, SettableInteger>();
partKeyPatterns.put(tableKey, patternCounts);
}
SettableInteger count = patternCounts.get(partKeys);
@@ -136,29 +165,30 @@ public class EhcacheHCatDependencyCache
@Override
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
+
+ Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ if (missingCache == null) {
+ LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
+ return false;
+ }
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKeys = sortedPKV.getPartKeys();
- String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
- + hcatURI.getTable() + TABLE_DELIMITER + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
+ String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER +
+ partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
boolean decrement = false;
boolean removed = false;
synchronized (missingCache) {
Element element = missingCache.get(missingKey);
if (element == null) {
- LOG.debug("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
- WaitingAction wAction = null;
- for (WaitingAction action : waitingActions) {
- if (action.getActionID().equals(actionID)) {
- wAction = action;
- }
- }
- removed = waitingActions.remove(wAction);
+ removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
if (!removed) {
- LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
}
if (waitingActions.isEmpty()) {
@@ -166,6 +196,7 @@ public class EhcacheHCatDependencyCache
decrement = true;
}
}
+ // Decrement partition key pattern count if the cache entry is removed
if (decrement) {
String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ hcatURI.getTable();
@@ -176,20 +207,22 @@ public class EhcacheHCatDependencyCache
@Override
public Collection<String> getWaitingActions(HCatURI hcatURI) {
- SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
- String missingKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
- + hcatURI.getTable() + TABLE_DELIMITER + sortedPKV.getPartKeys() + TABLE_DELIMITER
- + sortedPKV.getPartVals();
- Element element = missingCache.get(missingKey);
- if (element == null) {
- return null;
- }
- WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
- Collection<String> actionIDs = new ArrayList<String>();
- String uriString = hcatURI.getURI().toString();
- for (WaitingAction action : waitingActions.getWaitingActions()) {
- if (action.getDependencyURI().equals(uriString)) {
- actionIDs.add(action.getActionID());
+ Collection<String> actionIDs = null;
+ Cache missingCache = missingDepsByServer.get(hcatURI.getServer());
+ if (missingCache != null) {
+ SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+ String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
+ + sortedPKV.getPartKeys() + TABLE_DELIMITER + sortedPKV.getPartVals();
+ Element element = missingCache.get(missingKey);
+ if (element != null) {
+ WaitingActions waitingActions = (WaitingActions) element.getObjectValue();
+ actionIDs = new ArrayList<String>();
+ String uriString = hcatURI.getURI().toString();
+ for (WaitingAction action : waitingActions.getWaitingActions()) {
+ if (action.getDependencyURI().equals(uriString)) {
+ actionIDs.add(action.getActionID());
+ }
+ }
}
}
return actionIDs;
@@ -199,63 +232,80 @@ public class EhcacheHCatDependencyCache
public Collection<String> markDependencyAvailable(String server, String db, String table,
Map<String, String> partitions) {
String tableKey = server + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
- Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
- if (patternCounts == null) {
- LOG.warn("Got partition available notification for " + tableKey
- + ". Unexpected and should not be listening to topic. Unregistering topic");
- HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
- hcatService.unregisterFromNotification(server, db, table);
- return null;
- }
- Collection<String> actionsWithAvailDep = new HashSet<String>();
- StringBuilder partValSB = new StringBuilder();
- // If partition patterns are date, date;country and date;country;state,
- // construct the partition values for each pattern and for the matching value in the
- // missingCache, get the waiting actions and mark it as available.
- for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
- String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
- partValSB.setLength(0);
- for (String key : partKeys) {
- partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
- }
- partValSB.setLength(partValSB.length() - 1);
- String missingKey = tableKey + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER + partValSB.toString();
- boolean removed = false;
- Element element = null;
- synchronized (missingCache) {
- element = missingCache.get(missingKey);
- if (element != null) {
- missingCache.remove(missingKey);
- removed = true;
- }
- }
- if (removed) {
- decrementPartKeyPatternCount(tableKey, entry.getKey(), missingKey);
- Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue()).getWaitingActions();
- for (WaitingAction wAction : wActions) {
- String actionID = wAction.getActionID();
- actionsWithAvailDep.add(actionID);
- Collection<String> depURIs = availableDeps.get(actionID);
- if (depURIs == null) {
- depURIs = new ArrayList<String>();
- Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
- if (existing != null) {
- depURIs = existing;
- }
+ synchronized (partKeyPatterns) {
+ Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
+ if (patternCounts == null) {
+ LOG.warn("Got partition available notification for " + tableKey
+ + ". Unexpected as no matching partition keys. Unregistering topic");
+ unregisterFromNotifications(server, db, table);
+ return null;
+ }
+ Cache missingCache = missingDepsByServer.get(server);
+ if (missingCache == null) {
+ LOG.warn("Got partition available notification for " + tableKey
+ + ". Unexpected. Missing server entry in cache. Unregistering topic");
+ partKeyPatterns.remove(tableKey);
+ unregisterFromNotifications(server, db, table);
+ return null;
+ }
+ Collection<String> actionsWithAvailDep = new HashSet<String>();
+ StringBuilder partValSB = new StringBuilder();
+ // If partition patterns are date, date;country and date;country;state,
+ // construct the partition values for each pattern and for the matching value in the
+ // missingCache, get the waiting actions and mark it as available.
+ for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
+ String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
+ partValSB.setLength(0);
+ for (String key : partKeys) {
+ partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
+ }
+ partValSB.setLength(partValSB.length() - 1);
+ String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
+ + partValSB.toString();
+ boolean removed = false;
+ Element element = null;
+ synchronized (missingCache) {
+ element = missingCache.get(missingKey);
+ if (element != null) {
+ missingCache.remove(missingKey);
+ removed = true;
}
- synchronized (depURIs) {
- depURIs.add(wAction.getDependencyURI());
- availableDeps.put(actionID, depURIs);
+ }
+ if (removed) {
+ decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
+ // Add the removed entry to available dependencies
+ Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
+ .getWaitingActions();
+ for (WaitingAction wAction : wActions) {
+ String actionID = wAction.getActionID();
+ actionsWithAvailDep.add(actionID);
+ Collection<String> depURIs = availableDeps.get(actionID);
+ if (depURIs == null) {
+ depURIs = new ArrayList<String>();
+ Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
+ if (existing != null) {
+ depURIs = existing;
+ }
+ }
+ synchronized (depURIs) {
+ depURIs.add(wAction.getDependencyURI());
+ availableDeps.put(actionID, depURIs);
+ }
}
}
}
+ return actionsWithAvailDep;
}
- return actionsWithAvailDep;
}
@Override
public Collection<String> getAvailableDependencyURIs(String actionID) {
- return availableDeps.get(actionID);
+ Collection<String> available = availableDeps.get(actionID);
+ if (available != null) {
+ // Return a copy
+ available = new ArrayList<String>(available);
+ }
+ return available;
}
@Override
@@ -296,7 +346,7 @@ public class EhcacheHCatDependencyCache
public void notifyElementExpired(Ehcache cache, Element element) {
// Invoked when timeToIdleSeconds or timeToLiveSeconds is met
String missingDepKey = (String) element.getObjectKey();
- LOG.info("Cache entry [{0}] expired", missingDepKey);
+ LOG.info("Cache entry [{0}] of cache [{1}] expired", missingDepKey, cache.getName());
onExpiryOrEviction(cache, element, missingDepKey);
}
@@ -321,29 +371,37 @@ public class EhcacheHCatDependencyCache
public void notifyElementEvicted(Ehcache cache, Element element) {
// Invoked when maxElementsInMemory is met
String missingDepKey = (String) element.getObjectKey();
- LOG.info("Cache entry [{0}] evicted", missingDepKey);
+ LOG.info("Cache entry [{0}] of cache [{1}] evicted", missingDepKey, cache.getName());
onExpiryOrEviction(cache, element, missingDepKey);
}
private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
int partValIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER);
int partKeyIndex = missingDepKey.lastIndexOf(TABLE_DELIMITER, partValIndex - 1);
- String tableKey = missingDepKey.substring(0, partKeyIndex);
+ // server#db#table. Name of the cache is that of the server.
+ String tableKey = cache.getName() + TABLE_DELIMITER + missingDepKey.substring(0, partKeyIndex);
String partKeys = missingDepKey.substring(partKeyIndex + 1, partValIndex);
decrementPartKeyPatternCount(tableKey, partKeys, missingDepKey);
}
+ /**
+ * Decrement partition key pattern count, once a hcat URI is removed from the cache
+ *
+ * @param tableKey key identifying the table - server#db#table
+ * @param partKeys partition key pattern
+ * @param hcatURI URI with the partition key pattern
+ */
private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
synchronized (partKeyPatterns) {
Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
if (patternCounts == null) {
- LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+ LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
+ "But no corresponding pattern key table entry", hcatURI);
}
else {
SettableInteger count = patternCounts.get(partKeys);
if (count == null) {
- LOG.debug("Removed dependency - Missing cache entry - uri={0}. "
+ LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
+ "But no corresponding pattern key entry", hcatURI);
}
else {
@@ -353,16 +411,20 @@ public class EhcacheHCatDependencyCache
}
if (patternCounts.isEmpty()) {
partKeyPatterns.remove(tableKey);
- // Close JMS session. Stop listening on topic
- HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
String[] tableDetails = tableKey.split(TABLE_DELIMITER);
- hcatService.unregisterFromNotification(tableDetails[0], tableDetails[1], tableDetails[2]);
+ unregisterFromNotifications(tableDetails[0], tableDetails[1], tableDetails[2]);
}
}
}
}
}
+ private void unregisterFromNotifications(String server, String db, String table) {
+ // Close JMS session. Stop listening on topic
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ hcatService.unregisterFromNotification(server, db, table);
+ }
+
private static class SortedPKV {
private StringBuilder partKeys;
private StringBuilder partVals;
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java Wed Mar 13 03:55:35 2013
@@ -86,7 +86,7 @@ public class SimpleHCatDependencyCache i
}
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions == null) {
- waitingActions = new ArrayList<WaitingAction>();
+ waitingActions = new HashSet<WaitingAction>();
partValues.put(partVal, waitingActions);
}
waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
@@ -101,32 +101,26 @@ public class SimpleHCatDependencyCache i
String partVal = sortedPKV.getPartVals();
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
- LOG.debug("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
synchronized(partKeyPatterns) {
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
if (partValues == null) {
- LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions == null) {
- LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
- WaitingAction wAction = null;
- for (WaitingAction action : waitingActions) {
- if (action.getActionID().equals(actionID)) {
- wAction = action;
- }
- }
- boolean removed = waitingActions.remove(wAction);
+ boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
if (!removed) {
- LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+ LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
}
if (waitingActions.isEmpty()) {
@@ -182,7 +176,7 @@ public class SimpleHCatDependencyCache i
LOG.warn("Got partition available notification for " + tableKey
+ ". Unexpected and should not be listening to topic. Unregistering topic");
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
- hcatService.unregisterFromNotification(server,db, table);
+ hcatService.unregisterFromNotification(server, db, table);
return null;
}
Collection<String> actionsWithAvailDep = new HashSet<String>();
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java Wed Mar 13 03:55:35 2013
@@ -46,4 +46,26 @@ public class WaitingAction implements Se
return dependencyURI;
}
+    /**
+     * Computes a hash code from the action ID and the dependency URI.
+     * A null field contributes 0 to the hash.
+     *
+     * @return hash code derived from both fields
+     */
+    @Override
+    public int hashCode() {
+        int actionHash = (actionID != null) ? actionID.hashCode() : 0;
+        int uriHash = (dependencyURI != null) ? dependencyURI.hashCode() : 0;
+        // Same 31-based combination as before: 31 * (31 + actionHash) + uriHash
+        return 31 * (31 + actionHash) + uriHash;
+    }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ WaitingAction other = (WaitingAction) obj;
+ return actionID.equals(other.actionID) && dependencyURI.equals(other.dependencyURI);
+ }
+
+ @Override
+ public String toString() {
+ return "WaitingAction [actionID=" + actionID + ", dependencyURI=" + dependencyURI + "]";
+ }
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java Wed Mar 13 03:55:35 2013
@@ -18,16 +18,16 @@
package org.apache.oozie.dependency.hcat;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
public class WaitingActions implements Serializable {
private static final long serialVersionUID = 1L;
- private ArrayList<WaitingAction> waitingActions;
+ private Collection<WaitingAction> waitingActions;
public WaitingActions() {
- waitingActions = new ArrayList<WaitingAction>();
+ waitingActions = new HashSet<WaitingAction>();
}
/**
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Wed Mar 13 03:55:35 2013
@@ -236,6 +236,43 @@ public class TestCoordPushDependencyChec
}
+ /**
+ * Exercises CoordPushDependencyCheckXCommand for an action whose creation time has been moved
+ * 12 minutes into the past (the timeout is stated below to be 10 minutes) while the action still
+ * has entries in its missing-dependency list.
+ * <p>
+ * Expected end state, as asserted at the bottom of the test: the missing dependencies are reduced
+ * to the third URI, the push missing dependencies are empty, no action is waiting on the first
+ * URI any more, and the notification registration for it is gone.
+ */
@Test
+ public void testTimeOutWithUnresolvedMissingDependencies() throws Exception {
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
+ String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=uk";
+ // The action starts with two push dependencies (brazil and usa) joined by the instance separator
+ String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ // NOTE(review): populateTable is defined outside this hunk; presumably it creates the table and
+ // the partitions that make the usa dependency available - confirm
+ populateTable(db, table);
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+ // NOTE(review): the meaning of the boolean constructor argument is not visible in this hunk
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ // After the first check only the brazil URI is expected, with the action still WAITING
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ // The action must now be tracked as waiting on the brazil URI, with a notification registration
+ assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
+ assertTrue(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+
+ // Timeout is 10 mins. Move the action's created time 12 minutes into the past to make the
+ // action time out.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ // Set some missing dependency. Instead of latest or future just setting a current one for testing as
+ // we are only interested in ensuring CoordActionInputCheckXCommand is run
+ setMissingDependencies(actionId, newHCatDependency + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency3);
+ // Make the brazil partition available before the next push dependency check
+ addPartition(db, table, "dt=20120430;country=brazil");
+ checkDependencies(actionId, newHCatDependency + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency3,
+ newHCatDependency1);
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ // NOTE(review): presumably waits for asynchronously queued follow-up commands to finish - confirm
+ Thread.sleep(300);
+
+ // Only the uk URI remains missing; the brazil URI is no longer tracked or registered
+ checkDependencies(actionId, newHCatDependency3, "");
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+ }
+
+ @Test
public void testTimeOutWithException1() throws Exception {
// Test timeout when missing dependencies are from a non existing table
String newHCatDependency1 = "hcat://" + server + "/nodb/notable/dt=20120430;country=brazil";
@@ -360,4 +397,18 @@ public class TestCoordPushDependencyChec
}
}
+    /**
+     * Loads the coordinator action through the JPA service and asserts its dependency lists.
+     *
+     * @param actionId ID of the coordinator action to load
+     * @param expDeps expected value of the action's missing dependencies
+     * @param expPushDeps expected value of the action's push missing dependencies
+     * @return the loaded action bean, for further checks by the caller
+     * @throws Exception if the action cannot be read through the JPA service; the original
+     *         JPAExecutorException is attached as the cause
+     */
+    private CoordinatorActionBean checkDependencies(String actionId, String expDeps, String expPushDeps)
+            throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            assertEquals(expDeps, action.getMissingDependencies());
+            assertEquals(expPushDeps, action.getPushMissingDependencies());
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            // Keep the underlying failure as the cause so the real reason shows up in the test report
+            throw new Exception("Action ID " + actionId + " was not stored properly in db", se);
+        }
+    }
+
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java Wed Mar 13 03:55:35 2013
@@ -112,18 +112,18 @@ public class TestPartitionDependencyMana
HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + i);
pdms.addMissingDependency(dep, "" + i);
}
- // First 500 should have been evicted. But it is LRU and the next 100 removed is between 350 and 650.
- for (int i = 0; i < 350; i++) {
+ // First 500 should have been evicted. But it is LRU, and the last 200 removed are between 300 and 700.
+ for (int i = 0; i < 300; i++) {
assertNull(pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)));
}
int evicted = 0;
- for (int i = 350; i < 650; i++) {
+ for (int i = 300; i < 700; i++) {
if (pdms.getWaitingActions(new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + "" + i)) == null) {
evicted++;
}
}
- assertEquals(150, evicted);
- for (int i = 650; i < 1000; i++) {
+ assertEquals(200, evicted);
+ for (int i = 700; i < 1000; i++) {
String actionID = "" + i;
HCatURI dep = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/id=" + actionID);
Collection<String> waitingActions = pdms.getWaitingActions(dep);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Wed Mar 13 03:55:35 2013
@@ -22,6 +22,8 @@ import java.lang.management.MemoryMXBean
import java.net.URISyntaxException;
import java.util.Collection;
+import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.HCatURI;
@@ -59,27 +61,44 @@ public class TestPartitionDependencyMana
String actionId3 = "1234465453";
String actionId4 = "1234465454";
- String server = "hcat.server.com:5080";
+ String server1 = "hcat-server1.domain.com:5080";
+ String server2 = "hcat-server2.domain.com:5080";
String db = "mydb";
- String table = "mytbl";
+ String table1 = "mytbl1";
+ String table2 = "mytbl2";
// add partition as missing
- HCatURI dep1 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120101;country=us");
- HCatURI dep2 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/country=us;dt=20120101");
- HCatURI dep3 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120102;country=us");
- HCatURI dep4 = new HCatURI("hcat://hcat.server.com:5080/mydb/mytbl/dt=20120102;country=us;state=CA");
+ HCatURI dep1 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/dt=20120101;country=us");
+ HCatURI dep2 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/country=us;dt=20120101");
+ HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
+ HCatURI dep4 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us;state=CA");
+
+ addMissingDependencyAndRegister(dep1, actionId1);
+ addMissingDependencyAndRegister(dep2, actionId1);
+ addMissingDependencyAndRegister(dep2, actionId2);
+ addMissingDependencyAndRegister(dep2, actionId3);
+ addMissingDependencyAndRegister(dep3, actionId3);
+ addMissingDependencyAndRegister(dep4, actionId4);
+ // Add duplicates. RecoveryService will add duplicates
+ addMissingDependencyAndRegister(dep4, actionId4);
+ addMissingDependencyAndRegister(dep4, actionId4);
+
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(dep1.getURI());
+ assertTrue(hcatService.isRegisteredForNotification(dep1)); //server1,db,table1
+ assertTrue(hcatService.isRegisteredForNotification(dep3)); //server2,db,table2
+ assertTrue(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
+ assertTrue(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
+
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
- pdms.addMissingDependency(dep1, actionId1);
- pdms.addMissingDependency(dep2, actionId1);
- pdms.addMissingDependency(dep2, actionId2);
- pdms.addMissingDependency(dep2, actionId3);
- pdms.addMissingDependency(dep3, actionId3);
- pdms.addMissingDependency(dep4, actionId4);
assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
assertTrue(pdms.getWaitingActions(dep3).contains(actionId3));
assertTrue(pdms.getWaitingActions(dep4).contains(actionId4));
+ // Should not contain duplicates
+ assertEquals(1, pdms.getWaitingActions(dep4).size());
pdms.removeMissingDependency(dep2, actionId1);
assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
@@ -87,15 +106,15 @@ public class TestPartitionDependencyMana
assertTrue(!pdms.getWaitingActions(dep2).contains(actionId1));
assertNull(pdms.getAvailableDependencyURIs(actionId1));
- pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120102;country=us;state=NY"));
+ pdms.partitionAvailable(server2, db, table2, getPartitionMap("dt=20120102;country=us;state=NY"));
assertNull(pdms.getWaitingActions(dep3));
assertTrue(pdms.getAvailableDependencyURIs(actionId3).contains(dep3.getURI().toString()));
- pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120102;country=us;state=CA"));
+ pdms.partitionAvailable(server2, db, table2, getPartitionMap("dt=20120102;country=us;state=CA"));
assertNull(pdms.getWaitingActions(dep4));
assertTrue(pdms.getAvailableDependencyURIs(actionId4).contains(dep4.getURI().toString()));
- pdms.partitionAvailable(server, db, table, getPartitionMap("dt=20120101;country=us"));
+ pdms.partitionAvailable(server1, db, table1, getPartitionMap("dt=20120101;country=us"));
assertNull(pdms.getWaitingActions(dep1));
assertNull(pdms.getWaitingActions(dep2));
assertTrue(pdms.getAvailableDependencyURIs(actionId2).contains(dep2.getURI().toString()));
@@ -104,6 +123,21 @@ public class TestPartitionDependencyMana
assertTrue(pdms.removeAvailableDependencyURIs(actionId3, pdms.getAvailableDependencyURIs(actionId3)));
assertNull(pdms.getAvailableDependencyURIs(actionId3));
+
+ assertFalse(hcatService.isRegisteredForNotification(dep1)); //server1,db,table1
+ assertFalse(hcatService.isRegisteredForNotification(dep3)); //server2,db,table2
+ assertFalse(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
+ assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
+ }
+
+    /**
+     * Records the dependency as missing for the given action and, if no notification registration
+     * exists yet for the URI, registers one on the "db.table" topic.
+     *
+     * @param hcatURI the HCat dependency URI
+     * @param actionId ID of the action waiting on the dependency
+     */
+    private void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId) {
+        Services.get().get(PartitionDependencyManagerService.class).addMissingDependency(hcatURI, actionId);
+        HCatAccessorService accessor = Services.get().get(HCatAccessorService.class);
+        if (accessor.isRegisteredForNotification(hcatURI)) {
+            // Already registered for this URI, nothing more to do
+            return;
+        }
+        String topic = hcatURI.getDb() + "." + hcatURI.getTable();
+        accessor.registerForNotification(hcatURI, topic, new HCatMessageHandler(hcatURI.getServer()));
     }
@Test
@@ -158,4 +192,4 @@ public class TestPartitionDependencyMana
}
}
-}
+}
\ No newline at end of file
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Mar 13 03:55:35 2013
@@ -52,13 +52,14 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.executor.jpa.BundleActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
-import org.apache.oozie.service.CoordinatorStoreService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -66,7 +67,6 @@ import org.apache.oozie.service.UUIDServ
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.UUIDService.ApplicationType;
-import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
@@ -1358,11 +1358,16 @@ public abstract class XDataTestCase exte
}
protected void setCoordActionCreationTime(String actionId, long actionCreationTime) throws Exception {
- CoordinatorStore store = Services.get().get(CoordinatorStoreService.class).create();
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
action.setCreatedTime(new Date(actionCreationTime));
- store.beginTrx();
- store.updateCoordinatorAction(action);
- store.commitTrx();
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ }
+
+    /**
+     * Overwrites the missing dependencies of a stored coordinator action.
+     *
+     * @param actionId ID of the coordinator action to update
+     * @param missingDependencies the new missing-dependencies value to store
+     * @throws Exception if reading or updating the action through the JPA service fails
+     */
+    protected void setMissingDependencies(String actionId, String missingDependencies) throws Exception {
+        JPAService jpa = Services.get().get(JPAService.class);
+        // Read the current bean, change the single field, and write the bean back
+        CoordinatorActionBean bean = jpa.execute(new CoordActionGetJPAExecutor(actionId));
+        bean.setMissingDependencies(missingDependencies);
+        jpa.execute(new CoordActionUpdateJPAExecutor(bean));
     }
}
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1455791&r1=1455790&r2=1455791&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Mar 13 03:55:35 2013
@@ -5,6 +5,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
-- Oozie 4.0.0 (unreleased)
+OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
OOZIE-1261 Registered push dependencies are not removed on Coord Kill command (virag)
OOZIE-1191 add examples of coordinator with SLA tag inserted (ryota via mona)
OOZIE-1204 Illustrate correct use of parameters inside SLA tags (jun aoki via mona)