You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ry...@apache.org on 2014/06/16 10:29:07 UTC
git commit: OOZIE-1492 Make sure HA works with HCat (ryota)
Repository: oozie
Updated Branches:
refs/heads/master ab3a17497 -> 198f5c2a5
OOZIE-1492 Make sure HA works with HCat (ryota)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/198f5c2a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/198f5c2a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/198f5c2a
Branch: refs/heads/master
Commit: 198f5c2a5e7fce71d6fa638d2d26fa006fee171a
Parents: ab3a174
Author: egashira <ry...@yahoo.com>
Authored: Mon Jun 16 01:22:04 2014 -0700
Committer: egashira <ry...@yahoo.com>
Committed: Mon Jun 16 01:23:29 2014 -0700
----------------------------------------------------------------------
.../org/apache/oozie/CoordinatorActionBean.java | 3 +
.../coord/CoordPushDependencyCheckXCommand.java | 4 +
.../hcat/EhcacheHCatDependencyCache.java | 54 ++++
.../dependency/hcat/HCatDependencyCache.java | 12 +
.../hcat/SimpleHCatDependencyCache.java | 118 ++++++++-
.../executor/jpa/CoordActionQueryExecutor.java | 21 +-
.../oozie/service/JobsConcurrencyService.java | 8 +
.../PartitionDependencyManagerService.java | 93 +++++++
.../oozie/service/ZKJobsConcurrencyService.java | 10 +
.../TestCoordActionInputCheckXCommand.java | 5 +-
.../TestCoordPushDependencyCheckXCommand.java | 1 -
...TestHAPartitionDependencyManagerEhCache.java | 42 +++
...TestHAPartitionDependencyManagerService.java | 257 +++++++++++++++++++
.../TestPartitionDependencyManagerEhcache.java | 1 -
.../TestPartitionDependencyManagerService.java | 36 +--
.../java/org/apache/oozie/test/XTestCase.java | 10 +-
.../java/org/apache/oozie/test/ZKXTestCase.java | 2 +-
release-log.txt | 1 +
18 files changed, 653 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 8cbcc4f..51eaf2d 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -136,6 +136,9 @@ import org.json.simple.JSONObject;
// Query to retrieve status of Coordinator actions
@NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
+ // Query to retrieve the status of a single Coordinator action by its id
+ @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
+
@NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
@NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index 2e5cd47..ae71924 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
@@ -43,6 +44,7 @@ import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.RecoveryService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
@@ -207,6 +209,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
protected void onAllPushDependenciesAvailable() throws CommandException {
coordAction.setPushMissingDependencies("");
+ Services.get().get(PartitionDependencyManagerService.class)
+ .removeCoordActionWithDependenciesAvailable(coordAction.getId());
if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
Date nominalTime = coordAction.getNominalTime();
Date currentTime = new Date();
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 6f127c4..5743c15 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -17,13 +17,16 @@
*/
package org.apache.oozie.dependency.hcat;
+import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -472,4 +475,55 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
}
}
+ /**
+ * Removes the given stale coord actions from the waiting lists held in every per-server
+ * missing-dependency cache, and drops cache entries that end up with nobody waiting.
+ *
+ * @param staleActions ids of the coord actions whose waiting entries should be removed
+ */
+ @Override
+ public void removeNonWaitingCoordActions(Set<String> staleActions) {
+ Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
+ while (serverItr.hasNext()) {
+ String server = serverItr.next();
+ Cache missingCache = missingDepsByServer.get(server);
+ if (missingCache == null) {
+ continue;
+ }
+ synchronized (missingCache) {
+ for (Object key : missingCache.getKeys()) {
+ Element element = missingCache.get(key);
+ if (element == null) {
+ continue;
+ }
+ Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
+ .getWaitingActions();
+ Iterator<WaitingAction> wactionItr = waitingActions.iterator();
+ // Holds the dependency URI of the most recently removed stale action (null if none was removed)
+ HCatURI hcatURI = null;
+ while(wactionItr.hasNext()) {
+ WaitingAction waction = wactionItr.next();
+ if(staleActions.contains(waction.getActionID())) {
+ try {
+ // NOTE(review): the URI is parsed before the removal, so a stale action whose
+ // URI fails to parse stays in the waiting list - confirm this is intended
+ hcatURI = new HCatURI(waction.getDependencyURI());
+ wactionItr.remove();
+ }
+ catch (URISyntaxException e) {
+ continue;
+ }
+ }
+ }
+ // Drop the cache entry only when nothing waits on it any more and this pass removed an action
+ if (waitingActions.isEmpty() && hcatURI != null) {
+ missingCache.remove(key);
+ // Decrement partition key pattern count if the cache entry is removed
+ SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
+ String partKeys = sortedPKV.getPartKeys();
+ String tableKey = hcatURI.getServer() + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
+ + hcatURI.getTable();
+ String hcatURIStr = hcatURI.toURIString();
+ decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Currently a no-op for the Ehcache-backed cache.
+ *
+ * @param coordAction id of the coord action (unused in this implementation)
+ */
+ @Override
+ public void removeCoordActionWithDependenciesAvailable(String coordAction) {
+ // to be implemented when reverse-lookup data structure for purging is added
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
index df3afd3..e1e770f 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
@@ -19,6 +19,7 @@ package org.apache.oozie.dependency.hcat;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.util.HCatURI;
@@ -89,4 +90,15 @@ public interface HCatDependencyCache {
* Destroy the cache
*/
public void destroy();
+
+ /**
+ * Purge stale actions from the cache
+ *
+ * @param coordActions ids of the coord actions to purge
+ */
+ public void removeNonWaitingCoordActions(Set<String> coordActions);
+
+ /**
+ * Remove coordAction when all dependencies met
+ *
+ * @param coordAction id of the coord action whose dependencies are all available
+ */
+ public void removeCoordActionWithDependenciesAvailable(String coordAction);
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
index e8e3ebc..08aa8f9 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
@@ -17,14 +17,17 @@
*/
package org.apache.oozie.dependency.hcat;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -51,13 +54,16 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
*/
private ConcurrentMap<String, Collection<String>> availableDeps;
- // TODO:
- // Gather and print stats on cache hits and misses.
+ /**
+ * Map of actionIDs and partitions for reverse-lookup in purging
+ */
+ private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap;
@Override
public void init(Configuration conf) {
missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
+ actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
}
@Override
@@ -77,6 +83,23 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
partKeyPatterns = existingMap;
}
}
+ ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
+ if (partitionMap == null) {
+ partitionMap = new ConcurrentHashMap<String, Collection<String>>();
+ ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID,
+ partitionMap);
+ if (existingPartMap != null) {
+ partitionMap = existingPartMap;
+ }
+ }
+ synchronized (partitionMap) {
+ Collection<String> partKeys = partitionMap.get(tableKey);
+ if (partKeys == null) {
+ partKeys = new ArrayList<String>();
+ }
+ partKeys.add(partKey);
+ partitionMap.put(tableKey, partKeys);
+ }
synchronized (partKeyPatterns) {
missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
@@ -105,6 +128,22 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
hcatURI.toURIString(), actionID);
return false;
}
+ ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
+ if (partitionMap != null) {
+ synchronized (partitionMap) {
+ Collection<String> partKeys = partitionMap.get(tableKey);
+ if (partKeys != null) {
+ partKeys.remove(partKey);
+ }
+ if (partKeys.size() == 0) {
+ partitionMap.remove(tableKey);
+ }
+ if (partitionMap.size() == 0) {
+ actionPartitionMap.remove(actionID);
+ }
+ }
+ }
+
synchronized(partKeyPatterns) {
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
if (partValues == null) {
@@ -292,7 +331,82 @@ public class SimpleHCatDependencyCache implements HCatDependencyCache {
public String getPartVals() {
return partVals.toString();
}
+ }
+
+    /**
+     * Removes the given coord action from the waiting lists of the listed partition key patterns,
+     * pruning partition values and key patterns that are left with no waiting action.
+     *
+     * @param coordActionId id of the coord action to remove
+     * @param partKeys partition key patterns to look under
+     * @param partKeyPatterns map of partition key pattern to (partition value to waiting actions);
+     *        modified in place
+     * @return the dependency URI of the first removed waiting action whose URI could be parsed, or
+     *         null if no such action was found
+     */
+    private HCatURI removePartitions(String coordActionId, Collection<String> partKeys,
+            Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) {
+        HCatURI hcatUri = null;
+        for (String partKey : partKeys) {
+            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+            if (partValues == null) {
+                // No entry for this key pattern, so there is nothing to purge under it
+                continue;
+            }
+            Iterator<Collection<WaitingAction>> partValItr = partValues.values().iterator();
+            while (partValItr.hasNext()) {
+                Collection<WaitingAction> waitingActions = partValItr.next();
+                if (waitingActions != null) {
+                    Iterator<WaitingAction> waitItr = waitingActions.iterator();
+                    while (waitItr.hasNext()) {
+                        WaitingAction waction = waitItr.next();
+                        // Exact id match: a substring match would also purge other actions
+                        // whose id happens to be contained in coordActionId
+                        if (coordActionId.equals(waction.getActionID())) {
+                            waitItr.remove();
+                            if (hcatUri == null) {
+                                try {
+                                    hcatUri = new HCatURI(waction.getDependencyURI());
+                                }
+                                catch (URISyntaxException e) {
+                                    // Unparsable URI: the action is still removed; a later
+                                    // removed action may supply the URI instead
+                                    continue;
+                                }
+                            }
+                        }
+                    }
+                }
+                // delete partition value with no waiting actions
+                if (waitingActions == null || waitingActions.isEmpty()) {
+                    partValItr.remove();
+                }
+            }
+            if (partValues.isEmpty()) {
+                partKeyPatterns.remove(partKey);
+            }
+        }
+        return hcatUri;
+    }
+ /**
+ * Purges the given coord actions from the missing-dependency structures, using
+ * actionPartitionMap as a reverse lookup from action id to the tables and partition key
+ * patterns to visit.
+ *
+ * @param coordActions ids of the coord actions to purge
+ */
+ @Override
+ public void removeNonWaitingCoordActions(Set<String> coordActions) {
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ for (String coordActionId : coordActions) {
+ synchronized (actionPartitionMap) {
+ Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId);
+ if (partitionMap != null) {
+ Iterator<String> tableItr = partitionMap.keySet().iterator();
+ while (tableItr.hasNext()) {
+ String tableKey = tableItr.next();
+ HCatURI hcatUri = null;
+ Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
+ if (partKeyPatterns != null) {
+ synchronized (partKeyPatterns) {
+ Collection<String> partKeys = partitionMap.get(tableKey);
+ if (partKeys != null) {
+ hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns);
+ }
+ }
+ // NOTE(review): tableItr.remove() only drops the table from this action's reverse-lookup
+ // map; the now-empty entry is not removed from missingDeps here - confirm it is cleaned up elsewhere
+ if (partKeyPatterns.size() == 0) {
+ tableItr.remove();
+ if (hcatUri != null) {
+ // Close JMS session. Stop listening on topic
+ hcatService.unregisterFromNotification(hcatUri);
+ }
+ }
+ }
+ }
+ }
+ // The action's reverse-lookup entry is dropped whether or not anything was purged above
+ actionPartitionMap.remove(coordActionId);
+ }
+ }
+ }
+ /**
+ * Drops the reverse-lookup entry of the given coord action from actionPartitionMap.
+ *
+ * @param coordAction id of the coord action to forget
+ */
+ @Override
+ public void removeCoordActionWithDependenciesAvailable(String coordAction) {
+ actionPartitionMap.remove(coordAction);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
index f5304ca..d56af7b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java
@@ -25,10 +25,16 @@ import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
+import org.apache.oozie.BinaryBlob;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.StringBlob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.util.DateUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -48,6 +54,7 @@ public class CoordActionQueryExecutor extends
UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
UPDATE_COORD_ACTION_RERUN,
GET_COORD_ACTION,
+ GET_COORD_ACTION_STATUS,
GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME
};
@@ -174,6 +181,7 @@ public class CoordActionQueryExecutor extends
CoordActionQuery caQuery = (CoordActionQuery) namedQuery;
switch (caQuery) {
case GET_COORD_ACTION:
+ case GET_COORD_ACTION_STATUS:
query.setParameter("id", parameters[0]);
break;
case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
@@ -198,10 +206,11 @@ public class CoordActionQueryExecutor extends
public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
- CoordinatorActionBean bean = (CoordinatorActionBean) jpaService.executeGet(namedQuery.name(), query, em);
- if (bean == null) {
+ Object ret = jpaService.executeGet(namedQuery.name(), query, em);
+ if (ret == null) {
throw new JPAExecutorException(ErrorCode.E0605, query.toString());
}
+ CoordinatorActionBean bean = constructBean(namedQuery, ret);
return bean;
}
@@ -222,11 +231,19 @@ public class CoordActionQueryExecutor extends
private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
CoordinatorActionBean bean;
+ Object[] arr;
switch (namedQuery) {
+ case GET_COORD_ACTION:
+ bean = (CoordinatorActionBean) ret;
+ break;
case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
bean = new CoordinatorActionBean();
bean.setJobId((String) ret);
break;
+ case GET_COORD_ACTION_STATUS:
+ bean = new CoordinatorActionBean();
+ bean.setStatusStr((String)ret);
+ break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
+ namedQuery.name());
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 27c97e6..36adbd6 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -134,4 +134,12 @@ public class JobsConcurrencyService implements Service, Instrumentable {
public boolean isAllServerRequest(Map<String, String[]> params) {
return false;
}
+
+ /**
+ * Check if it is running in HA mode
+ * @return false; this base implementation never reports HA mode
+ */
+ public boolean isHighlyAvailableMode(){
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
index 985dcab..41d1ba2 100644
--- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
+++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
@@ -18,16 +18,28 @@
package org.apache.oozie.service;
import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Module that functions like a caching service to maintain partition dependency mappings
*/
@@ -35,11 +47,20 @@ public class PartitionDependencyManagerService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
+ public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval";
+ public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl";
private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
private HCatDependencyCache dependencyCache;
+ /**
+ * Keep timestamp when missing dependencies of a coord action are registered
+ */
+ private ConcurrentMap<String, Long> registeredCoordActionMap;
+
+ private boolean purgeEnabled = false;
+
@Override
public void init(Services services) throws ServiceException {
init(services.getConf());
@@ -52,6 +73,57 @@ public class PartitionDependencyManagerService implements Service {
dependencyCache.init(conf);
LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
.getName());
+ purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
+ if (purgeEnabled) {
+ Runnable purgeThread = new CachePurgeWorker(dependencyCache);
+ // schedule runnable by default every 10 min
+ Services.get()
+ .get(SchedulerService.class)
+ .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600),
+ SchedulerService.Unit.SEC);
+ registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
+ }
+ }
+
+    /**
+     * Runnable that purges from the dependency cache the coord actions which were registered
+     * longer ago than the configured TTL and are no longer in WAITING status.
+     */
+    private class CachePurgeWorker implements Runnable {
+        private final HCatDependencyCache cache;
+
+        public CachePurgeWorker(HCatDependencyCache cache) {
+            this.cache = cache;
+        }
+
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                return;
+            }
+            try {
+                purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800));
+            }
+            catch (Throwable error) {
+                // Keep the failure inside the worker, but log it where operators will see it
+                LOG.warn("Throwable in CachePurgeWorker thread run : ", error);
+            }
+        }
+
+        /**
+         * Collects the registered coord actions older than the TTL whose status is no longer
+         * WAITING, removes them from the cache and stops tracking them.
+         *
+         * @param timeToLive age in seconds after which a registered action is checked for purging
+         */
+        private void purgeMissingDependency(int timeToLive) {
+            if (registeredCoordActionMap == null) {
+                // The map is only created when purging is enabled; nothing is tracked otherwise
+                return;
+            }
+            // Long arithmetic: timeToLive * 1000 overflows int for TTLs above roughly 24 days
+            long expiryTime = System.currentTimeMillis() - timeToLive * 1000L;
+            Set<String> staleActions = new HashSet<String>();
+            // Iterating entries avoids unboxing a null when an id is removed concurrently
+            for (Map.Entry<String, Long> entry : registeredCoordActionMap.entrySet()) {
+                if (entry.getValue() >= expiryTime) {
+                    continue;
+                }
+                String actionId = entry.getKey();
+                CoordinatorActionBean caBean = null;
+                try {
+                    caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS,
+                            actionId);
+                }
+                catch (JPAExecutorException e) {
+                    LOG.warn("Error in checking coord action:" + actionId + " to purge, skipping", e);
+                }
+                if (caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)) {
+                    staleActions.add(actionId);
+                }
+            }
+            cache.removeNonWaitingCoordActions(staleActions);
+            // Stop tracking purged actions so they are not re-queried on every run and the map cannot grow unbounded
+            registeredCoordActionMap.keySet().removeAll(staleActions);
+        }
+    }
@Override
@@ -71,6 +143,9 @@ public class PartitionDependencyManagerService implements Service {
* @param actionID ID of action which is waiting for the dependency
*/
public void addMissingDependency(HCatURI hcatURI, String actionID) {
+ if (purgeEnabled) {
+ registeredCoordActionMap.put(actionID, new Date().getTime());
+ }
dependencyCache.addMissingDependency(hcatURI, actionID);
}
@@ -142,4 +217,22 @@ public class PartitionDependencyManagerService implements Service {
return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
}
+ /**
+ * Remove a coord action from dependency cache when all push missing dependencies available.
+ * When purging is enabled, the action is also dropped from the registered-action map.
+ *
+ * @param actionID action id
+ */
+ public void removeCoordActionWithDependenciesAvailable(String actionID) {
+ if (purgeEnabled) {
+ registeredCoordActionMap.remove(actionID);
+ }
+ dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
+ }
+
+ /**
+ * Runs one cache purge pass synchronously on the calling thread; exposed for tests.
+ */
+ @VisibleForTesting
+ public void runCachePurgeWorker() {
+ new CachePurgeWorker(dependencyCache).run();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 611b74c..1d5f4a4 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -207,4 +207,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty()
|| !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false");
}
+
+ /**
+ * Return if it is running in HA mode
+ *
+ * @return true; this implementation always reports HA mode
+ */
+ @Override
+ public boolean isHighlyAvailableMode() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index 1ffadbd..0eafacf 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -268,8 +268,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
}
public void testActionInputCheckLatestActionCreationTimeWithPushDependency() throws Exception {
+ setupServicesForHCatalog(services);
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false);
-
+ services.init();
String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
@@ -400,7 +401,9 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
}
public void testActionInputCheckLatestCurrentTimeWithPushDependency() throws Exception {
+ setupServicesForHCatalog(services);
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
+ services.init();
String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
index da09727..3c8b082 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
@@ -464,5 +464,4 @@ public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
throw new Exception("Action ID " + actionId + " was not stored properly in db");
}
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
new file mode 100644
index 0000000..9d3165d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerEhCache.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.oozie.dependency.hcat.EhcacheHCatDependencyCache;
+
+/**
+ * Runs the tests inherited from TestHAPartitionDependencyManagerService with
+ * EhcacheHCatDependencyCache configured as the dependency cache implementation.
+ */
+public class TestHAPartitionDependencyManagerEhCache extends TestHAPartitionDependencyManagerService {
+
+ // Switches the cache implementation to Ehcache and re-initializes the partition dependency
+ // service so that it picks up the new configuration
+ protected void setUp() throws Exception {
+ super.setUp();
+ services.getConf().set(PartitionDependencyManagerService.CACHE_MANAGER_IMPL,
+ EhcacheHCatDependencyCache.class.getName());
+ services.setService(ZKJobsConcurrencyService.class);
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+ pdms.init(services);
+ }
+
+ // Empty override: the inherited test does nothing for this subclass.
+ // NOTE(review): the reason is not visible here - confirm it is intentionally skipped for Ehcache
+ @Override
+ public void testDependencyCacheWithHA(){
+ }
+
+ @Override
+ public void testPurgeMissingDependencies() throws Exception {
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+ testPurgeMissingDependenciesForCache(pdms);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
new file mode 100644
index 0000000..da383b3
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
+import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.HCatURI;
+
+public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
+
+ protected Services services;
+ protected String server;
+ protected String db;
+ protected String table1;
+ protected String table2;
+ protected String part1;
+ protected String part2;
+ protected String part3;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = super.setupServicesForHCatalog(Services.get());
+ // effectively disable the recovery service by giving it a very large interval
+ services.getConf().setInt(RecoveryService.CONF_SERVICE_INTERVAL, 1000000);
+ // effectively disable the regular cache purge in the same way
+ services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000);
+ server = super.getHCatalogServer().getMetastoreAuthority();
+ services.init(); // initialized only after the configuration overrides above
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown(); // no test-specific cleanup in this class; only the inherited tearDown runs
+ }
+
+ private void populateTable() throws Exception { // start from a clean state: drop, then recreate
+ dropTable(db, table1, true);
+ dropTable(db, table2, true);
+ dropDatabase(db, true); // tables are dropped first, then the database itself
+ createDatabase(db);
+ createTable(db, table1, "dt,country"); // "dt,country": presumably the partition keys - TODO confirm
+ createTable(db, table2, "dt,country");
+ }
+
+ protected String getSanitizedTestCaseDir() {
+ final String testCaseDir = getTestCaseDir();
+ if (!Shell.WINDOWS) {
+ return testCaseDir;
+ }
+ // On Windows the working directory contains a colon that comes from the drive letter.
+ // Colons are not allowed in DFS paths, so every colon is removed; a leading backslash
+ // is then prepended to simulate an absolute path.
+ return "\\" + testCaseDir.replaceAll(":", "");
+ }
+
+ public void testDependencyCacheWithHA() throws Exception {
+ // Checks that dependencies held only in another server's cache show up in this server's cache after RecoveryRunnable runs.
+ db = "default";
+ table1 = "mytbl";
+ table2 = "mytb2";
+ part1 = "dt=20120101;country=us";
+ part2 = "dt=20120102;country=us";
+ part3 = "dt=20120103;country=us";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1;
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2;
+ String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3;
+ HCatURI dep1 = new HCatURI(newHCatDependency1);
+ HCatURI dep2 = new HCatURI(newHCatDependency2);
+ HCatURI dep3 = new HCatURI(newHCatDependency3);
+ // (re)create the database and the two tables; populateTable() does not add any partitions
+ populateTable();
+
+ String actionId1 = addInitRecords(newHCatDependency1);
+ String actionId2 = addInitRecords(newHCatDependency2);
+ String actionId3 = addInitRecords(newHCatDependency3);
+
+ // Simulate the dependency cache of another ("dummy") server that has the missing push dependencies registered
+ PartitionDependencyManagerService dummyPdms = new PartitionDependencyManagerService();
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ dummyPdms.init(Services.get());
+ dummyPdms.addMissingDependency(dep1, actionId1);
+ dummyPdms.addMissingDependency(dep2, actionId2);
+ dummyPdms.addMissingDependency(dep3, actionId3);
+
+ Collection<String> waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep1);
+ assertEquals(1, waitingActions.size());
+ waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep2);
+ assertEquals(1, waitingActions.size());
+ waitingActions = (Collection<String>)dummyPdms.getWaitingActions(dep3);
+ assertEquals(1, waitingActions.size());
+
+ // The dependency cache on the living server does not have these partitions registered at this point
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep1);
+ assertNull(waitingActions);
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep2);
+ assertNull(waitingActions);
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep3);
+ assertNull(waitingActions);
+
+ // Simulate the dummy server going down, then run recovery on the living server so that it picks up these actions
+ dummyPdms.destroy();
+ Runnable recoveryRunnable = new RecoveryRunnable(60, 0, 60);
+ recoveryRunnable.run();
+ waitFor(30 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ Collection<String> waitingActions;
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ HCatURI dep1 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part1);
+ HCatURI dep2 = new HCatURI("hcat://"+ server + "/" + db + "/" + table1 + "/" + part2);
+ HCatURI dep3 = new HCatURI("hcat://"+ server + "/" + db + "/" + table2 + "/" + part3);
+ waitingActions = pdms.getWaitingActions(dep1);
+ if(waitingActions == null) {
+ return false;
+ }
+ waitingActions = pdms.getWaitingActions(dep2);
+ if(waitingActions == null) {
+ return false;
+ }
+ waitingActions = pdms.getWaitingActions(dep3);
+ if(waitingActions == null) {
+ return false;
+ }
+ return true;
+ }
+ });
+ // The dependency cache on the living server now has the missing partitions, each with its own action id
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep1);
+ assertEquals(1, waitingActions.size());
+ assertTrue(waitingActions.contains(actionId1));
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep2);
+ assertEquals(1, waitingActions.size());
+ assertTrue(waitingActions.contains(actionId2));
+ waitingActions = (Collection<String>)pdms.getWaitingActions(dep3);
+ assertEquals(1, waitingActions.size());
+ assertTrue(waitingActions.contains(actionId3));
+
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ // all three dependencies (tables mytbl and mytb2) must be registered for notification
+ assertTrue(hcatService.isRegisteredForNotification(dep1));
+ assertTrue(hcatService.isRegisteredForNotification(dep2));
+ assertTrue(hcatService.isRegisteredForNotification(dep3));
+ }
+
+ protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) {
+ pdms.addMissingDependency(hcatURI, actionId);
+ final HCatAccessorService hcat = Services.get().get(HCatAccessorService.class);
+ if (hcat.isRegisteredForNotification(hcatURI)) {
+ return; // already registered for notification; nothing more to do
+ }
+ hcat.registerForNotification(hcatURI, hcatURI.getDb() + "." + hcatURI.getTable(), new HCatMessageHandler(hcatURI.getServer()));
+ }
+
+ public void testPurgeMissingDependencies() throws Exception{
+ services.setService(ZKJobsConcurrencyService.class); // install ZKJobsConcurrencyService before the re-init below
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+ pdms.init(services); // re-initialize the instance obtained from services
+ testPurgeMissingDependenciesForCache(pdms); // shared purge scenario, parameterized by the service under test
+ }
+
+ protected void testPurgeMissingDependenciesForCache(PartitionDependencyManagerService pdms) throws Exception{
+ // Verifies that a cache purge keeps only the entries of the WAITING action and unregisters the table left unused.
+ String actionId1 = "1234465451";
+ String actionId2 = "1234465452";
+ String actionId3 = "1234465453";
+
+ // add partitions as missing
+ HCatURI dep1 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/dt=20120101;country=us");
+ HCatURI dep2 = new HCatURI("hcat://hcat-server1.domain.com:5080/mydb/mytbl1/country=us;dt=20120101");
+ HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
+
+ // actionId1-->(dep1,2), actionId2-->(dep2), actionId3-->(dep2,3)
+ addMissingDependencyAndRegister(dep1, actionId1, pdms);
+ addMissingDependencyAndRegister(dep2, actionId1, pdms);
+ addMissingDependencyAndRegister(dep2, actionId2, pdms);
+ addMissingDependencyAndRegister(dep2, actionId3, pdms);
+ addMissingDependencyAndRegister(dep3, actionId3, pdms);
+
+ Collection<String> waitingDep1 = pdms.getWaitingActions(dep1);
+ assertEquals(1, waitingDep1.size());
+ assertTrue(waitingDep1.contains(actionId1));
+
+ Collection<String> waitingDep2 = pdms.getWaitingActions(dep2);
+ assertEquals(3, waitingDep2.size());
+ for (String id : waitingDep2) {
+ assertTrue(id.equals(actionId1) || id.equals(actionId2) || id.equals(actionId3));
+ }
+ Collection<String> waitingDep3 = pdms.getWaitingActions(dep3);
+ assertEquals(1, waitingDep3.size());
+ assertTrue(waitingDep3.contains(actionId3));
+
+ // set only coordAction 1 to WAITING and the rest to RUNNING (only WAITING
+ // actions are expected to remain in the dependency cache after the purge)
+ ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
+ CoordinatorActionBean coordAction1 = new CoordinatorActionBean();
+ coordAction1.setId(actionId1);
+ coordAction1.setStatus(Status.WAITING);
+ insertList.add(coordAction1);
+ CoordinatorActionBean coordAction2 = new CoordinatorActionBean();
+ coordAction2.setId(actionId2);
+ coordAction2.setStatus(Status.RUNNING);
+ insertList.add(coordAction2);
+ CoordinatorActionBean coordAction3 = new CoordinatorActionBean();
+ coordAction3.setId(actionId3);
+ coordAction3.setStatus(Status.RUNNING);
+ insertList.add(coordAction3);
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
+
+ // run cache purge
+ Services.get().getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0);
+ pdms.runCachePurgeWorker();
+
+ // only coord Action 1 still in dependency cache
+ waitingDep1 = pdms.getWaitingActions(dep1);
+ assertEquals(1, waitingDep1.size());
+ assertTrue(waitingDep1.contains(actionId1));
+
+ // only coord Action 1 still in dependency cache
+ waitingDep2 = pdms.getWaitingActions(dep2);
+ assertEquals(1, waitingDep2.size());
+ assertTrue(waitingDep2.contains(actionId1));
+
+ waitingDep3 = pdms.getWaitingActions(dep3);
+ assertNull(waitingDep3);
+
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ // mytbl1 should be still in topic map
+ assertTrue(hcatService.isRegisteredForNotification(dep1));
+ // mytbl1 should be still in topic map
+ assertTrue(hcatService.isRegisteredForNotification(dep2));
+ // mytbl2 should NOT be in topic map
+ assertFalse(hcatService.isRegisteredForNotification(dep3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
index cfdfbd1..7b88a19 100644
--- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
+++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
@@ -131,5 +131,4 @@ public class TestPartitionDependencyManagerEhcache extends TestPartitionDependen
assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
}
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
index ef71fb0..67ea851 100644
--- a/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
@@ -20,16 +20,20 @@ package org.apache.oozie.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
/**
@@ -40,14 +44,15 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
private static XLog LOG = XLog.getLog(TestPartitionDependencyManagerService.class);
protected Services services;
- @Before
+
protected void setUp() throws Exception {
super.setUp();
services = super.setupServicesForHCatalog();
+ // disable regular cache purge
+ services.getConf().setInt(PartitionDependencyManagerService.CACHE_PURGE_INTERVAL, 1000000);
services.init();
}
- @After
protected void tearDown() throws Exception {
Services.get().destroy();
super.tearDown();
@@ -55,6 +60,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
@Test
public void testPartitionDependency() throws Exception {
+
// Test all APIs related to dependency caching
String actionId1 = "1234465451";
String actionId2 = "1234465452";
@@ -72,15 +78,16 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
HCatURI dep3 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us");
HCatURI dep4 = new HCatURI("hcat://hcat-server2.domain.com:5080/mydb/mytbl2/dt=20120102;country=us;state=CA");
- addMissingDependencyAndRegister(dep1, actionId1);
- addMissingDependencyAndRegister(dep2, actionId1);
- addMissingDependencyAndRegister(dep2, actionId2);
- addMissingDependencyAndRegister(dep2, actionId3);
- addMissingDependencyAndRegister(dep3, actionId3);
- addMissingDependencyAndRegister(dep4, actionId4);
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ addMissingDependencyAndRegister(dep1, actionId1, pdms);
+ addMissingDependencyAndRegister(dep2, actionId1, pdms);
+ addMissingDependencyAndRegister(dep2, actionId2, pdms);
+ addMissingDependencyAndRegister(dep2, actionId3, pdms);
+ addMissingDependencyAndRegister(dep3, actionId3, pdms);
+ addMissingDependencyAndRegister(dep4, actionId4, pdms);
// Add duplicates. RecoveryService will add duplicates
- addMissingDependencyAndRegister(dep4, actionId4);
- addMissingDependencyAndRegister(dep4, actionId4);
+ addMissingDependencyAndRegister(dep4, actionId4, pdms);
+ addMissingDependencyAndRegister(dep4, actionId4, pdms);
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
@@ -90,7 +97,6 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
assertTrue(jmsService.isListeningToTopic(connInfo, dep1.getDb() + "." + dep1.getTable()));
assertTrue(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
- PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
assertTrue(pdms.getWaitingActions(dep1).contains(actionId1));
assertTrue(pdms.getWaitingActions(dep2).contains(actionId1));
assertTrue(pdms.getWaitingActions(dep2).contains(actionId2));
@@ -130,8 +136,7 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
assertFalse(jmsService.isListeningToTopic(connInfo, dep3.getDb() + "." + dep3.getTable()));
}
- private void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId) {
- PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ protected void addMissingDependencyAndRegister(HCatURI hcatURI, String actionId, PartitionDependencyManagerService pdms) {
pdms.addMissingDependency(hcatURI, actionId);
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
if (!hcatService.isRegisteredForNotification(hcatURI)) {
@@ -191,5 +196,4 @@ public class TestPartitionDependencyManagerService extends XDataTestCase {
assertTrue(dep.toURIString() + " is missing in cache", waitingActions.contains(actionID));
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 1536927..6bf0a8f 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -1068,6 +1068,11 @@ public abstract class XTestCase extends TestCase {
protected Services setupServicesForHCatalog() throws ServiceException {
Services services = new Services();
+ setupServicesForHCataLogImpl(services);
+ return services;
+ }
+
+ private void setupServicesForHCataLogImpl(Services services) {
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES,
JMSAccessorService.class.getName() + "," +
@@ -1081,8 +1086,11 @@ public abstract class XTestCase extends TestCase {
FSURIHandler.class.getName() + "," + HCatURIHandler.class.getName());
setSystemProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
setSystemProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
- return services;
}
+ protected Services setupServicesForHCatalog(Services services) throws ServiceException {
+ setupServicesForHCataLogImpl(services);
+ return services;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 7bebaf0..3d37d48 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -49,7 +49,7 @@ import org.apache.oozie.util.ZKUtils;
* <p>
* To use security, see {@link ZKXTestCaseWithSecurity}.
*/
-public abstract class ZKXTestCase extends XTestCase {
+public abstract class ZKXTestCase extends XDataTestCase {
private TestingServer zkServer;
private CuratorFramework client = null;
private ServiceDiscovery<Map> sDiscovery = null;
http://git-wip-us.apache.org/repos/asf/oozie/blob/198f5c2a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e311c7f..2f89c3e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1492 Make sure HA works with HCat (ryota)
OOZIE-1869 Sharelib update shows vip/load balancer address as one of the hostname (puru via ryota)
OOZIE-1861 Pig action should work with tez mode (rohini)
OOZIE-1703 User should be able to set coord end-time before start time (puru via rohini)