You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/12/21 23:47:37 UTC
svn commit: r1425174 - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/dependency/
core/src/main/java/org/apache/oozie/jms/ core/src/main/java...
Author: virag
Date: Fri Dec 21 22:47:37 2012
New Revision: 1425174
URL: http://svn.apache.org/viewvc?rev=1425174&view=rev
Log:
OOZIE-1138 Provide rule based mechanism to allow multiple hcatalog servers to connect to JMS server (virag)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Fri Dec 21 22:47:37 2012
@@ -232,7 +232,7 @@ public enum ErrorCode {
E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
E1502(XLog.STD, "Partition cache lookup error"),
- E1503(XLog.STD, "Error in Metadata URI [{0}]"),
+ E1503(XLog.STD, "Error in Metadata URI [{0}]"), //TODO - Error code not used, replace this
E1504(XLog.STD, "Error in getting HCat Access [{0}]"),
E1505(XLog.STD, "Error with JMS Message, Details: [{0}]"),
E1506(XLog.STD, "Error in creating connection or message listener, Details: [{0}]"),
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Fri Dec 21 22:47:37 2012
@@ -16,6 +16,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
@@ -51,12 +52,15 @@ public class CoordActionUpdatePushMissin
LOG.debug("Updating action Id " + actionId + " for available partition of " + availPartitionList.toString()
+ "missing parts :" + missPartitions);
String newMissPartitions = removePartitions(missPartitions, availPartitionList);
+ LOG.trace("In CoordActionUpdatePushMissingDependency: New missing partitions " +newMissPartitions);
+ // TODO - Check if new miss partitions are updated, then only persist, do the same in CoordInputCheck
coordAction.setPushMissingDependencies(newMissPartitions);
String otherDeps = coordAction.getMissingDependencies();
if ((newMissPartitions == null || newMissPartitions.trim().length() == 0)
&& (otherDeps == null || otherDeps.trim().length() == 0)) {
coordAction.setStatus(CoordinatorAction.Status.READY);
// pass jobID to the CoordActionReadyXCommand
+ LOG.trace("Queuing READY for "+actionId);
queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
}
else {
@@ -66,18 +70,19 @@ public class CoordActionUpdatePushMissin
if (jpaService != null) {
try {
jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+ // remove available partitions for the action as the push dependencies are persisted
+ if (pdms.removeAvailablePartitions(availPartitionList, actionId)) {
+ LOG.debug("Succesfully removed partitions for actionId: [{0}] from available Map ", actionId);
+ }
+ else {
+ LOG.warn("Unable to remove partitions for actionId: [{0}] from available Map ", actionId);
+ }
}
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
}
- finally {
- // remove from Available map as it is being persisted
- if (pdms.removeActionFromAvailPartitions(actionId)) {
- LOG.debug("Succesfully removed actionId: [{0}] from available Map ", actionId);
- }
- else {
- LOG.warn("Unable to remove actionId: [{0}] from available Map ", actionId);
- }
+ catch (MetadataServiceException e) {
+ throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
}
}
LOG.info("ENDED for Action id [{0}]", actionId);
@@ -145,6 +150,14 @@ public class CoordActionUpdatePushMissin
return actionId;
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#getKey()
+ */
+ @Override
+ public String getKey(){
+ return getName() + "_" + actionId;
+ }
+
@Override
protected void loadState() throws CommandException {
try {
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Fri Dec 21 22:47:37 2012
@@ -37,6 +37,7 @@ import org.apache.oozie.coord.SyncCoordA
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.dependency.DependencyType;
import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.MetadataServiceException;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
@@ -283,28 +284,29 @@ public class CoordCommandUtils {
*
* @param event
* @param instances
- * @param dependencyList
* @throws Exception
*/
- public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
+ public static StringBuffer separateResolvedAndUnresolved(Element event, StringBuilder instances)
throws Exception {
StringBuilder unresolvedInstances = new StringBuilder();
StringBuilder urisWithDoneFlag = new StringBuilder();
+ StringBuffer depList = new StringBuffer();
String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
if (uris.length() > 0) {
Element uriInstance = new Element("uris", event.getNamespace());
uriInstance.addContent(uris);
event.getContent().add(1, uriInstance);
- if (dependencyList.length() > 0) {
- dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+ if (depList.length() > 0) {
+ depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
- dependencyList.append(urisWithDoneFlag);
+ depList.append(urisWithDoneFlag);
}
if (unresolvedInstances.length() > 0) {
Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
elemInstance.addContent(unresolvedInstances.toString());
event.getContent().add(1, elemInstance);
}
+ return depList;
}
/**
@@ -350,8 +352,9 @@ public class CoordCommandUtils {
String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
.getChild("uri-template", event.getNamespace()).getTextTrim());
- uris.append(uriPath);
URIHandler uriHandler = uriService.getURIHandler(uriPath);
+ uriHandler.validate(uriPath);
+ uris.append(uriPath);
urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, doneFlagElement));
}
return uris.toString();
@@ -525,11 +528,6 @@ public class CoordCommandUtils {
.getChild("uri-template", event.getNamespace());
String pullOrPush = "pull";
String uriTemplate = uri.getText();
- URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
- URIHandler handler = uriService.getURIHandler(baseURI);
- if (uriTemplate != null && handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
- pullOrPush = "push";
- }
StringBuilder instances = new StringBuilder();
ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
// Handle list of instance tag
@@ -537,7 +535,13 @@ public class CoordCommandUtils {
// Handle start-instance and end-instance
resolveInstanceRange(event, instances, appInst, conf, eval);
// Separate out the unresolved instances
- separateResolvedAndUnresolved(event, instances, dependencyList.get(pullOrPush));
+ StringBuffer depList = separateResolvedAndUnresolved(event, instances);
+ URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
+ URIHandler handler = uriService.getURIHandler(baseURI);
+ if (handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
+ pullOrPush = "push";
+ }
+ dependencyList.put(pullOrPush, depList);
String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
if (tmpUnresolved != null) {
if (unresolvedList.get(pullOrPush).length() > 0) {
@@ -584,7 +588,7 @@ public class CoordCommandUtils {
* @param actionBean
* @throws Exception
*/
- public static void registerPartition(CoordinatorActionBean actionBean) throws Exception {
+ public static void registerPartition(CoordinatorActionBean actionBean) throws MetadataServiceException {
String resolved = getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
new StringBuilder());
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Fri Dec 21 22:47:37 2012
@@ -32,6 +32,7 @@ import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.MaterializeTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
@@ -42,6 +43,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -106,10 +108,21 @@ public class CoordMaterializeTransitionX
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ // register the partition related dependencies of actions
+ for (JsonBean actionBean : insertList) {
+ if (actionBean instanceof CoordinatorActionBean) {
+ CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+ CoordCommandUtils.registerPartition(coordAction);
+ queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+ }
+ }
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
}
+ catch (MetadataServiceException ex) {
+ LOG.warn("Error happened in registering partitions ", ex);
+ }
}
/* (non-Javadoc)
@@ -329,8 +342,7 @@ public class CoordMaterializeTransitionX
if (!dryrun) {
storeToDB(actionBean, action); // Storing to table
- CoordCommandUtils.registerPartition(actionBean); // Register partition to PDMS
- queue(new CoordPushDependencyCheckXCommand(actionBean.getId()));
+
}
else {
actionStrings.append("action for new instance");
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Fri Dec 21 22:47:37 2012
@@ -143,10 +143,19 @@ public class CoordPushDependencyCheckXCo
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
+ // TODO - Check whether the entityKey should be JobId or actionId
public String getEntityKey() {
return actionId;
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#getKey()
+ */
+ @Override
+ public String getKey(){
+ return getName() + "_" + actionId;
+ }
+
/*
* (non-Javadoc)
*
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Fri Dec 21 22:47:37 2012
@@ -120,6 +120,10 @@ public class FSURIHandler extends URIHan
}
@Override
+ public void validate(String uri) throws URIAccessorException {
+ }
+
+ @Override
public void destroy() {
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Fri Dec 21 22:47:37 2012
@@ -32,8 +32,11 @@ import org.apache.hcatalog.api.HCatTable
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.MetaDataAccessorException;
import org.apache.oozie.service.MetaDataAccessorService;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.util.HCatURI;
@@ -61,10 +64,10 @@ public class HCatURIHandler extends URIH
@Override
public DependencyType getDependencyType(URI uri) throws URIAccessorException {
- // TODO Determine if jms server is configured for the hcat server and
- // return poll or notify
- return DependencyType.PUSH;
- }
+ JMSAccessorService service = Services.get().get(JMSAccessorService.class);
+ return service.getOrCreateConnection(uri.getScheme() + "://" + uri.getAuthority()) ? DependencyType.PUSH
+ : DependencyType.PULL;
+ }
@Override
public void registerForNotification(URI uri, String actionID) throws URIAccessorException {
@@ -112,6 +115,17 @@ public class HCatURIHandler extends URIH
}
@Override
+ public void validate (String uri) throws URIAccessorException{
+ try {
+ new HCatURI(uri); //will fail if uri syntax is incorrect
+ }
+ catch (URISyntaxException e) {
+ throw new URIAccessorException(ErrorCode.E1025, uri, e);
+ }
+
+ }
+
+ @Override
public void destroy() {
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Fri Dec 21 22:47:37 2012
@@ -146,5 +146,14 @@ public abstract class URIHandler {
*/
public abstract String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException;
+ /**
+ * Check whether the URI is valid or not
+ * @param uri
+ * @return
+ * @throws URIAccessorException
+ */
+ public abstract void validate(String uri) throws URIAccessorException;
+
public abstract void destroy();
+
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java Fri Dec 21 22:47:37 2012
@@ -36,6 +36,7 @@ public class HCatMessageHandler implemen
private PartitionWrapper msgPartition;
private static XLog log;
+ public static final String THRIFT_SCHEME = "thrift";
public HCatMessageHandler() {
log = XLog.getLog(getClass());
@@ -56,9 +57,11 @@ public class HCatMessageHandler implemen
// Parse msg components
AddPartitionMessage partMsg = (AddPartitionMessage) hcatMsg;
String server = partMsg.getServer();
+ int index = server.indexOf("://");
+ server = server.substring(index + 3);
String db = partMsg.getDB();
String table = partMsg.getTable();
- log.info("ADD event type db [{0}] table [{1}] partitions [{3}]", db, table, partMsg.getPartitions());
+ log.info("ADD event type db [{0}] table [{1}] partitions [{2}]", db, table, partMsg.getPartitions());
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
if (pdms != null) {
// message is batched. therefore iterate through partitions
@@ -93,6 +96,7 @@ public class HCatMessageHandler implemen
catch (IllegalArgumentException iae) {
throw new MetadataServiceException(ErrorCode.E1505, iae);
}
+
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Fri Dec 21 22:47:37 2012
@@ -17,7 +17,9 @@
*/
package org.apache.oozie.service;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +39,7 @@ import javax.naming.NamingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.jms.MessageReceiver;
+import org.apache.oozie.util.MappingRule;
import org.apache.oozie.util.XLog;
/**
@@ -54,9 +57,12 @@ public class JMSAccessorService implemen
public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
public static final String DEFAULT_SERVER_ENDPOINT = "default";
+ private String defaultConnection = null;
+
private static XLog LOG;
private Configuration conf;
+ private List<MappingRule> mappingRules = null;
ConcurrentHashMap<String, ConnectionContext> connSessionMap = new ConcurrentHashMap<String, ConnectionContext>();
HashMap<String, Properties> hmConnProps = new HashMap<String, Properties>();
@@ -64,11 +70,106 @@ public class JMSAccessorService implemen
public void init(Services services) throws ServiceException {
LOG = XLog.getLog(getClass());
conf = services.getConf();
- parseConfiguration(conf);
- establishConnections();
+ initializeMappingRules();
+ }
+
+
+ protected void initializeMappingRules() {
+ String connectionString = conf.get(JMS_CONNECTIONS_PROPERTIES);
+ if (connectionString != null) {
+ String[] connections = connectionString.split("\\s*,\\s*");
+ mappingRules = new ArrayList<MappingRule>(connections.length);
+ for (String connection : connections) {
+ String[] values = connection.split("=", 2);
+ String key = values[0].replaceAll("^\\s+|\\s+$", "");
+ String value = values[1].replaceAll("^\\s+|\\s+$", "");
+ if (key.equals("default")) {
+ defaultConnection = value;
+ }
+ else {
+ mappingRules.add(new MappingRule(key, value));
+ }
+ }
+ }
+ else {
+ LOG.warn("No JMS connection defined");
+ }
}
/**
+ * Checks if the connection exists or not. If it doesn't exist, creates a new one.
+ * Returns false if the connection cannot be established
+ * @param serverName
+ * @return
+ */
+ public boolean getOrCreateConnection(String serverName) {
+ if (!isExistsConnection(serverName)) {
+ // Get JNDI properties related to the JMS server name
+ Properties props = getJMSServerProps(serverName);
+ if (props != null) {
+ Connection conn = null;
+ try {
+ conn = getConnection(props);
+ }
+ catch (ServiceException se) {
+ LOG.warn("Could not create connection " + se.getErrorCode() + se.getMessage());
+ return false;
+ }
+ LOG.info("Connection established to JMS Server for " + serverName);
+ connSessionMap.put(serverName, new ConnectionContext(conn));
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Checks whether connection to JMS server already exists
+ * @param serverName
+ * @return
+ */
+ public boolean isExistsConnection(String serverName) {
+ if (connSessionMap.containsKey(serverName)) {
+ LOG.info("Connection exists to JMS Server for " + serverName);
+ return true; // connection already exists
+ }
+ else {
+ return false;
+ }
+ }
+
+
+ protected Properties getJMSServerProps(String serverName) {
+ Properties props = null;
+ if (hmConnProps.containsKey(serverName)) {
+ props = hmConnProps.get(serverName);
+ return props;
+ }
+ String jmsServerMapping = getJMSServerMapping(serverName);
+ LOG.trace("\n JMS Server Mapping for server "+ serverName + "is " + jmsServerMapping);
+ if (jmsServerMapping == null) {
+ return null;
+ }
+ else {
+ props = getJMSPropsFromConf(jmsServerMapping);
+ if (props != null) {
+ hmConnProps.put(serverName, props);
+ }
+ return props;
+ }
+ }
+
+ protected String getJMSServerMapping(String serverName) {
+ for (MappingRule mr : mappingRules) {
+ String jmsServerMapping = mr.applyRule(serverName);
+ if (jmsServerMapping != null) {
+ return jmsServerMapping;
+ }
+ }
+ return defaultConnection;
+
+ }
+ /**
* Returns Consumer object for specific service end point and topic name
*
* @param endPoint : Service end-point (preferably HCatalog server address)
@@ -157,49 +258,37 @@ public class JMSAccessorService implemen
return;
}
- private void establishConnections() throws ServiceException {
- for (String key : hmConnProps.keySet()) {
- connSessionMap.put(key, new ConnectionContext(getConnection(hmConnProps.get(key))));
- }
- }
-
- private void parseConfiguration(Configuration conf) {
- String[] keyVals = conf.getStrings(JMS_CONNECTIONS_PROPERTIES, "");
- for (String kVal : keyVals) {
- LOG.info("Key=value " + kVal);
- if (kVal.trim().length() > 0) {
- addToHM(kVal);
- }
- }
- }
- private void addToHM(String kVal) {
- int pos = kVal.indexOf("=");
+ protected Properties getJMSPropsFromConf(String kVal) {
Properties props = new Properties();
- if (pos > 0) {
- String val = kVal.substring(pos + 1);
- String[] propArr = val.split(";");
- for (String pair : propArr) {
- String[] kV = pair.split("#");
- if (kV.length > 1) {
- props.put(kV[0].trim(), kV[1].trim());
- }
- else {
- LOG.info("Unformatted properties. Expected key#value : " + pair);
- }
+ String[] propArr = kVal.split(";");
+ for (String pair : propArr) {
+ String[] kV = pair.split("#");
+ if (kV.length > 1) {
+ props.put(kV[0].trim(), kV[1].trim());
+ }
+ else {
+ LOG.info("Unformatted properties. Expected key#value : " + pair);
}
- String key = kVal.substring(0, pos);
- LOG.info(key + ": Adding " + props);
- hmConnProps.put(key.trim(), props);
}
- else {
- LOG.info("Unformatted properties. Expected two parts : " + kVal);
+ if (props.isEmpty()) {
+ return null;
}
+ return props;
}
@Override
public void destroy() {
// TODO Remove topic sessions based on no demand
+ LOG.info("Destroying JMSAccessor service ");
+ for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
+ try {
+ entry.getValue().getConnection().close();
+ }
+ catch (JMSException e) {
+ LOG.warn("Unable to close the connection for " + entry.getKey(), e);
+ }
+ }
}
@@ -211,9 +300,9 @@ public class JMSAccessorService implemen
/*
* Look up connection factory Create connection
*/
- private Connection getConnection(Properties props) throws ServiceException {
+ protected synchronized Connection getConnection(Properties props) throws ServiceException {
- Connection conn;
+ Connection conn = null;
try {
Context jndiContext = getJndiContext(props);
String connFacName = (String) jndiContext.getEnvironment().get(JMS_CONNECTION_FACTORY);
@@ -232,11 +321,16 @@ public class JMSAccessorService implemen
});
}
- catch (NamingException e) {
- throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
- }
- catch (JMSException e) {
- throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+ catch (Exception e1){
+ LOG.error(e1.getMessage(), e1);
+ if (conn != null) {
+ try {
+ conn.close();
+ }
+ catch (Exception e2) {
+ LOG.error(e2.getMessage(), e2);
+ }
+ }
}
return conn;
}
@@ -304,18 +398,6 @@ public class JMSAccessorService implemen
}
}
- @Override
- public void finalize() {
- LOG.info("Finalizing ");
- for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
- try {
- entry.getValue().getConnection().close();
- }
- catch (JMSException e) {
- LOG.warn("Unable to close the connection for " + entry.getKey(), e);
- }
- }
- }
/**
* This class maintains a JMS connection and map of topic to Session. Only
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Fri Dec 21 22:47:37 2012
@@ -34,16 +34,14 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.jms.HCatMessageHandler;
import org.apache.oozie.jms.MessageReceiver;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.service.ServiceException;
-import org.apache.oozie.service.Services;
+import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.PartitionWrapper;
import org.apache.oozie.util.PartitionsGroup;
import org.apache.oozie.util.WaitingActions;
-import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
+
/**
* Module that functions like a caching service to maintain partition dependency
* mappings
@@ -119,21 +117,6 @@ public class PartitionDependencyManagerS
}
/**
- * Remove en entry from available Map
- *
- * @param actionId
- * @return true if the entry exists , otherwise false
- */
- public boolean removeActionFromAvailPartitions(String actionId) {
- boolean ret = false;
- if (availMap.containsKey(actionId)) {
- availMap.remove(actionId);
- ret = true;
- }
- return ret;
- }
-
- /**
* Remove an action from missing partition map
*
* @param hcatURI
@@ -150,7 +133,7 @@ public class PartitionDependencyManagerS
catch (URISyntaxException e) {
throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
List<String> actions = _getActionsForPartition(partition);
if (actions != null && actions.size() != 0) {
@@ -169,7 +152,7 @@ public class PartitionDependencyManagerS
* @param actionId
* @throws MetadataServiceException
*/
- public void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
+ private void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
Map<String, PartitionsGroup> tablePartitionsMap;
String tableName = partition.getTableName();
@@ -187,7 +170,7 @@ public class PartitionDependencyManagerS
else { // new partition from different hcat server/db
_addNewEntry(hcatInstanceMap, prefix, tableName, partition, actionId);
}
- _registerMessageReceiver(partition);
+
}
catch (ClassCastException e) {
throw new MetadataServiceException(ErrorCode.E1501, e.getCause());
@@ -203,16 +186,16 @@ public class PartitionDependencyManagerS
}
}
- private void _registerMessageReceiver(PartitionWrapper partition) throws MetadataServiceException {
+ private void _registerMessageReceiver(PartitionWrapper partition, String serverEndPoint) throws MetadataServiceException {
String topic = _getTopic(partition);
try {
- MessageReceiver recvr = Services.get().get(JMSAccessorService.class).getTopicReceiver(topic);
+ MessageReceiver recvr = Services.get().get(JMSAccessorService.class).getTopicReceiver(serverEndPoint, topic);
//Register new listener only if topic is new. Else do nothing
if(recvr == null) {
//Registering new receiver
recvr = new MessageReceiver(new HCatMessageHandler());
- log.debug("Registering to listen on topic :" + topic);
- recvr.registerTopic(topic); //server-endpoint is obtained from partition
+ log.debug("Registering to listen on topic :" + topic + "for endpoint " + serverEndPoint);
+ recvr.registerTopic(serverEndPoint, topic); //server-endpoint is obtained from partition
}
}
catch (JMSException e) {
@@ -254,11 +237,13 @@ public class PartitionDependencyManagerS
catch (URISyntaxException e) {
throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
addMissingPartition(partition, actionId);
+ _registerMessageReceiver(partition, uri.getServerEndPoint());
}
+
public void addMissingPartitions(String[] hcatURIs, String actionId) throws MetadataServiceException {
for (String uri : hcatURIs) {
if (uri != null && uri.length() > 0) {
@@ -267,6 +252,32 @@ public class PartitionDependencyManagerS
}
}
+    /**
+     * Remove the given partitions from the available list of an action; the
+     * action's entry is dropped from the map once its list becomes empty.
+     *
+     * @param partitions partitions to remove from the action's available list
+     * @param actionId id of the action whose available partitions are updated
+     * @return true if at least one partition was removed, false otherwise
+     * @throws MetadataServiceException declared for callers, never thrown here
+     */
+    public boolean removeAvailablePartitions(List<PartitionWrapper> partitions, String actionId)
+            throws MetadataServiceException {
+        // Single lookup: no gap between containsKey() and get(), and a null value is tolerated.
+        List<PartitionWrapper> availList = availMap.get(actionId);
+        if (availList == null || partitions == null) {
+            return false;
+        }
+        if (!availList.removeAll(partitions)) {
+            return false;
+        }
+        if (availList.isEmpty()) {
+            availMap.remove(actionId);
+        }
+        return true;
+    }
+
+
/**
* Remove partition entry specified by PartitionWrapper object and cascading
* delete indicator
@@ -293,7 +304,8 @@ public class PartitionDependencyManagerS
if (tableMap.size() == 0) {
hcatInstanceMap.remove(prefix);
}
- _deregisterMessageReceiver(partition);
+ // TODO - do unregistering in a synchronized way
+ //_deregisterMessageReceiver(partition);
}
}
return true;
@@ -329,7 +341,7 @@ public class PartitionDependencyManagerS
catch (URISyntaxException e) {
throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
return removePartition(partition, cascade);
}
@@ -350,7 +362,7 @@ public class PartitionDependencyManagerS
catch (URISyntaxException e) {
throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
return removePartition(partition, true);
}
@@ -408,7 +420,7 @@ public class PartitionDependencyManagerS
catch (URISyntaxException e) {
throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
return partitionAvailable(partition);
}
@@ -426,13 +438,14 @@ public class PartitionDependencyManagerS
uri = new HCatURI(hcatURI);
}
catch (URISyntaxException e) {
- throw new MetadataServiceException(ErrorCode.E1503, e.getMessage());
+ throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
}
- PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
uri.getPartitionMap());
return containsPartition(partition);
}
+
/**
* Determine if a partition entry exists in cache
*
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java?rev=1425174&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java Fri Dec 21 22:47:37 2012
@@ -0,0 +1,122 @@
+package org.apache.oozie.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A single mapping rule that translates an input string (for example an
+ * HCatalog server end point) into a target value.
+ * <p>
+ * The "from" side is either a plain string, compared with equals(), or a
+ * template containing placeholders <code>${0}</code> to <code>${9}</code>.
+ * A placeholder matches any run of characters, including an empty one; all
+ * other characters of the template are matched literally. The text captured
+ * for a placeholder replaces the same-numbered placeholder in the "to" side.
+ */
+public class MappingRule {
+
+    /** Matches one placeholder of the form ${digit}. */
+    private static final Pattern VARIABLE_NAME_PATTERN = Pattern.compile("\\$\\{[0-9]\\}");
+
+    /** Compiled template; null when the from rule contains no placeholder. */
+    private final Pattern fromPattern;
+    /** The from rule exactly as supplied to the constructor. */
+    private final String fromString;
+    /** The to rule exactly as supplied to the constructor. */
+    private final String toString;
+    /** Placeholder digit of each capture group, in group order. */
+    private final String groupVariables;
+
+    /**
+     * Maps from source rule to destination rule
+     *
+     * @param fromRule - Rule for which input needs to be matched
+     * @param toRule - Rule for value to be returned
+     * @throws NullPointerException if fromRule is null
+     */
+    public MappingRule(String fromRule, String toRule) {
+        StringBuilder regex = new StringBuilder();
+        StringBuilder variables = new StringBuilder();
+        Matcher match = VARIABLE_NAME_PATTERN.matcher(fromRule);
+        int literalStart = 0;
+        while (match.find()) {
+            // Quote the literal text so that characters such as '.', '+' or '?'
+            // are not interpreted as regular expression syntax.
+            regex.append(Pattern.quote(fromRule.substring(literalStart, match.start())));
+            regex.append("(.*)");
+            // match.group() is "${d}"; remember which digit this group stands for
+            variables.append(match.group().charAt(2));
+            literalStart = match.end();
+        }
+        if (variables.length() > 0) {
+            regex.append(Pattern.quote(fromRule.substring(literalStart)));
+            fromPattern = Pattern.compile(regex.toString());
+        }
+        else {
+            fromPattern = null;
+        }
+        groupVariables = variables.toString();
+        fromString = fromRule;
+        toString = toRule;
+    }
+
+    /**
+     * Gets the from rule
+     *
+     * @return the from rule as passed to the constructor, for plain and
+     *         placeholder rules alike
+     */
+    public String getFromRule() {
+        return fromString;
+    }
+
+    /**
+     * Gets the to rule
+     *
+     * @return the to rule as passed to the constructor
+     */
+    public String getToRule() {
+        return toString;
+    }
+
+    /**
+     * Applies rules based on the input
+     *
+     * @param input value to map, e.g. a server end point
+     * @return the to rule with its placeholders filled in when the input
+     *         matches the from rule; null when it does not match or when
+     *         the input or the to rule is null
+     */
+    public String applyRule(String input) {
+        if (input == null) {
+            return null;
+        }
+        if (fromPattern == null) {
+            return input.equals(fromString) ? toString : null;
+        }
+        Matcher match = fromPattern.matcher(input);
+        if (!match.matches() || toString == null) {
+            return null;
+        }
+        // Index the captured values by placeholder digit, not by group
+        // position, so that "${2}-${1}" binds its first capture to ${2}.
+        String[] values = new String[10];
+        for (int i = 1; i <= match.groupCount(); i++) {
+            int variable = groupVariables.charAt(i - 1) - '0';
+            if (values[variable] == null) {
+                values[variable] = match.group(i);
+            }
+        }
+        // Substitute in a single pass so that captured text which itself
+        // looks like a placeholder is never expanded a second time.
+        Matcher placeholder = VARIABLE_NAME_PATTERN.matcher(toString);
+        StringBuffer result = new StringBuffer();
+        while (placeholder.find()) {
+            String value = values[placeholder.group().charAt(2) - '0'];
+            String replacement = (value != null) ? value : placeholder.group();
+            placeholder.appendReplacement(result, Matcher.quoteReplacement(replacement));
+        }
+        placeholder.appendTail(result);
+        return result.toString();
+    }
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java Fri Dec 21 22:47:37 2012
@@ -45,7 +45,7 @@ public class PartitionWrapper {
}
public PartitionWrapper(HCatURI hcatUri) {
- this(hcatUri.getServerEndPoint(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
+ this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
}
public PartitionWrapper(String partURI) throws URISyntaxException {
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Fri Dec 21 22:47:37 2012
@@ -143,6 +143,8 @@
identifies the HCatalog server URL. "default" is used if no endpoint is mentioned
in the query. If some JMS property is not defined, the system will use the property
defined jndi.properties. jndi.properties files is retrieved from the application classpath.
+ Mapping rules can also be provided to map HCatalog server names to JMS servers, for example:
+ hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616
</description>
</property>
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Fri Dec 21 22:47:37 2012
@@ -23,7 +23,9 @@ import javax.jms.Session;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.test.XTestCase;
+import org.junit.Test;
public class TestJMSAccessorService extends XTestCase {
private Services services;
@@ -42,17 +44,28 @@ public class TestJMSAccessorService exte
super.tearDown();
}
+ @Test
public void testService() {
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
Assert.assertNotNull(jmsService);
}
+ @Test
+ public void testConnection() throws Exception { // getOrCreateConnection() must return true for both end points below
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ // an arbitrary end point name and the default end point: both should connect to the default JMS server
+ assertTrue(jmsService.getOrCreateConnection("blahblah"));
+ assertTrue(jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT));
+ }
+
+ @Test
public void testConsumer() throws Exception {
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
MessageConsumer consumer = null;
try {
consumer = jmsService.getMessageConsumer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
- assert (consumer != null);
+ assertTrue(consumer != null);
}
finally {
if (consumer != null) {
@@ -62,12 +75,14 @@ public class TestJMSAccessorService exte
}
}
+ @Test
public void testProducer() throws Exception {
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
MessageProducer producer = null;
try {
producer = jmsService.getMessageProducer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
- assert (producer != null);
+ assertTrue(producer != null);
}
finally {
if (producer != null) {
@@ -76,12 +91,14 @@ public class TestJMSAccessorService exte
}
}
+ @Test
public void testSession() throws Exception {
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
Session sess = null;
try {
sess = jmsService.getSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
- assert (sess != null);
+ assertTrue(sess != null);
}
finally {
if (sess != null) {
@@ -90,16 +107,13 @@ public class TestJMSAccessorService exte
}
}
- public void testConnection() {
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- JMSAccessorService.ConnectionContext conCtx = jmsService
- .getConnectionContext(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
- assert (conCtx.getConnection() != null);
- }
-
+ @Test
public void testSingleConsumerPerTopic() {
try {
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String endPoint = "hcat://hcat.server.com:5080";
+ jmsService.getOrCreateConnection(endPoint);
// Add sample missing partitions belonging to same table
String partition1 = "hcat://hcat.server.com:5080/mydb/mytable/mypart=10";
String partition2 = "hcat://hcat.server.com:5080/mydb/mytable/mypart=20";
@@ -113,8 +127,7 @@ public class TestJMSAccessorService exte
pdms.addMissingPartition(partition2, "action-2");
//this registers the message receiver
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- String endPoint = JMSAccessorService.DEFAULT_SERVER_ENDPOINT;
+
assertNotNull(jmsService.getConnectionContext(endPoint));
assertNotNull(jmsService.getSession(endPoint, topic));
@@ -129,8 +142,54 @@ public class TestJMSAccessorService exte
}
catch (Exception e) {
+ e.printStackTrace();
fail("Exception encountered : " + e);
}
}
+
+ @Test
+ public void testGetJMSServerMappingNoDefault() throws ServiceException {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ Configuration conf = services.getConf();
+ String server2 = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616";
+ String server3 = "hcat://xyz.corp.dummy.com=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616";
+ // Two rules (one with placeholders, one literal) and deliberately no "default=" entry
+ String jmsConnectionURL = server2 + "," + server3;
+ conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsConnectionURL);
+ services.init();
+ // Services are re-initialised above so that the JMS connection property set on conf is in place
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ // No default JMS mapping: a server name matching neither rule must map to null
+ String jmsServerMapping = jmsService.getJMSServerMapping("UNKNOWN_SERVER");
+ assertNull(jmsServerMapping);
+ }
+
+ @Test
+ public void testGetJMSServerMapping() throws ServiceException{
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ Configuration conf = services.getConf();
+ String server1 = "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false";
+ String server2 = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616";
+ String server3 = "hcat://xyz.corp.dummy.com=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616";
+ // Three comma-separated entries: a "default" mapping, a placeholder rule and a literal rule
+ String jmsConnectionURL = server1+","+server2+","+server3;
+ conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsConnectionURL);
+ services.init();
+ // Case 1: the server name matches the placeholder rule (server2);
+ // the second captured segment ("blue") is expected in the broker URL
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String jmsServerMapping = jmsService.getJMSServerMapping("hcat://axoniteblue-1.blue.server.com:8020");
+ // rules will be applied: ${2} in the target is replaced by "blue"
+ assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.blue:61616", jmsServerMapping);
+ // Case 2: a server name that matches no rule
+ jmsServerMapping = jmsService.getJMSServerMapping("UNKNOWN_SERVER");
+ // will map to default, i.e. the value configured for "default" in server1
+ assertEquals("java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false", jmsServerMapping);
+ // Case 3: the server name equals the literal rule (server3) exactly
+ jmsServerMapping = jmsService.getJMSServerMapping("hcat://xyz.corp.dummy.com");
+ assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616", jmsServerMapping);
+ }
}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Fri Dec 21 22:47:37 2012
@@ -18,9 +18,11 @@
package org.apache.oozie.service;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
@@ -38,17 +40,18 @@ import org.junit.Test;
*/
public class TestPartitionDependencyManagerService extends XDataTestCase {
+ private Services services;
@Before
protected void setUp() throws Exception {
super.setUp();
setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
- Services services = super.setupServicesForHCatalog();
+ services = super.setupServicesForHCatalog();
services.init();
}
@After
protected void tearDown() throws Exception {
- Services.get().destroy();
+ services.destroy();
super.tearDown();
}
@@ -73,14 +76,15 @@ public class TestPartitionDependencyMana
*/
@Test
public void testAddMissingPartition() throws MetadataServiceException, URISyntaxException {
- Services services = Services.get();
PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
HCatURI hcatUri = new HCatURI(newHCatDependency);
- Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+ Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
hcatUri.getDb()); // clicks
assertNotNull(tablePartitionsMap);
assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -105,11 +109,13 @@ public class TestPartitionDependencyMana
Services services = Services.get();
PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
HCatURI hcatUri = new HCatURI(newHCatDependency);
- Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+ Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
hcatUri.getDb()); // clicks
assertNotNull(tablePartitionsMap);
assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -139,11 +145,13 @@ public class TestPartitionDependencyMana
Services services = Services.get();
PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
HCatURI hcatUri = new HCatURI(newHCatDependency);
- Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+ Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
hcatUri.getDb()); // clicks
assertNotNull(tablePartitionsMap);
assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -171,6 +179,8 @@ public class TestPartitionDependencyMana
PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
String newHCatDependency1 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
String newHCatDependency2 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
String actionId1 = "1";
String actionId2 = "2";
pdms.addMissingPartition(newHCatDependency1, actionId1);
@@ -179,7 +189,7 @@ public class TestPartitionDependencyMana
pdms.removeActionFromMissingPartitions(newHCatDependency2, actionId2);
HCatURI hcatUri = new HCatURI(newHCatDependency1);
- String prefix = PartitionWrapper.makePrefix(hcatUri.getServerEndPoint(), hcatUri.getDb());
+ String prefix = PartitionWrapper.makePrefix(hcatUri.getServer(), hcatUri.getDb());
Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(prefix);
PartitionsGroup missingPartitions = tablePartitionsMap.get(hcatUri.getTable());
assertNotNull(missingPartitions);
@@ -190,4 +200,54 @@ public class TestPartitionDependencyMana
assertFalse(actions.getActions().contains(actionId2));
}
+
+
+ /**
+ * Test removal of partitions from Available map
+ */
+ @Test
+ public void testRemovePartitionsFromAvailMap() {
+ try {
+ Services services = Services.get();
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+ String newHCatDependency1 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
+ String newHCatDependency2 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+ String newHCatDependency3 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=13&region=us";
+ String actionId1 = "1";
+
+ HCatURI uri = new HCatURI(newHCatDependency1);
+ PartitionWrapper partition1 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+ uri.getPartitionMap());
+ uri = new HCatURI(newHCatDependency2);
+ PartitionWrapper partition2 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+ uri.getPartitionMap());
+ uri = new HCatURI(newHCatDependency3);
+ PartitionWrapper partition3 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+ uri.getPartitionMap());
+
+ List<PartitionWrapper> partitionList = new ArrayList<PartitionWrapper>();
+ partitionList.add(partition1);
+ partitionList.add(partition2);
+ partitionList.add(partition3);
+
+ Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+ // Add 3 partitions to availMap for a given action
+ availMap.put(actionId1, partitionList);
+
+ List<PartitionWrapper> availPartitionList = new ArrayList<PartitionWrapper>();
+ availPartitionList.add(partition1);
+ availPartitionList.add(partition2);
+ // add two partitions to be removed
+ assertTrue(pdms.removeAvailablePartitions(availPartitionList, actionId1));
+ // check if the size of avail map reduces to 1
+ assertEquals(1, pdms.getAvailableMap().get(actionId1).size());
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception " + e);
+ }
+ }
+
+
}
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Fri Dec 21 22:47:37 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1138 Provide rule based mechanism to allow multiple hcatalog servers to connect to JMS server (virag)
OOZIE-1124 Split pig unit tests to a separate module (rohini via virag)
OOZIE-1111 change HCatURI to specify partitions in path instead of query parameter (rohini,ryota via virag)
OOZIE-1108 Fix JMS message consumer to maintain single session per topic registration (mona)