You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/12/21 23:47:37 UTC

svn commit: r1425174 - in /oozie/branches/hcat-intre: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/dependency/ core/src/main/java/org/apache/oozie/jms/ core/src/main/java...

Author: virag
Date: Fri Dec 21 22:47:37 2012
New Revision: 1425174

URL: http://svn.apache.org/viewvc?rev=1425174&view=rev
Log:
OOZIE-1138 Provide rule based mechanism to allow multiple hcatalog servers to connect to JMS server (virag)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
Modified:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
    oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
    oozie/branches/hcat-intre/release-log.txt

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Fri Dec 21 22:47:37 2012
@@ -232,7 +232,7 @@ public enum ErrorCode {
 
     E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
     E1502(XLog.STD, "Partition cache lookup error"),
-    E1503(XLog.STD, "Error in Metadata URI [{0}]"),
+    E1503(XLog.STD, "Error in Metadata URI [{0}]"),  //TODO - Error code not used, replace this
     E1504(XLog.STD, "Error in getting HCat Access [{0}]"),
     E1505(XLog.STD, "Error with JMS Message, Details: [{0}]"),
     E1506(XLog.STD, "Error in creating connection or message listener, Details: [{0}]"),

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Fri Dec 21 22:47:37 2012
@@ -16,6 +16,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
@@ -51,12 +52,15 @@ public class CoordActionUpdatePushMissin
         LOG.debug("Updating action Id " + actionId + " for available partition of " + availPartitionList.toString()
                 + "missing parts :" + missPartitions);
         String newMissPartitions = removePartitions(missPartitions, availPartitionList);
+        LOG.trace("In CoordActionUpdatePushMissingDependency: New missing partitions " +newMissPartitions);
+        // TODO - Persist only if the new missing partitions actually changed; do the same in CoordInputCheck
         coordAction.setPushMissingDependencies(newMissPartitions);
         String otherDeps = coordAction.getMissingDependencies();
         if ((newMissPartitions == null || newMissPartitions.trim().length() == 0)
                 && (otherDeps == null || otherDeps.trim().length() == 0)) {
             coordAction.setStatus(CoordinatorAction.Status.READY);
             // pass jobID to the CoordActionReadyXCommand
+            LOG.trace("Queuing READY for "+actionId);
             queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
         }
         else {
@@ -66,18 +70,19 @@ public class CoordActionUpdatePushMissin
         if (jpaService != null) {
             try {
                 jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+                // remove available partitions for the action as the push dependencies are persisted
+                if (pdms.removeAvailablePartitions(availPartitionList, actionId)) {
+                    LOG.debug("Succesfully removed partitions for actionId: [{0}] from available Map ", actionId);
+                }
+                else {
+                    LOG.warn("Unable to remove partitions for actionId: [{0}] from available Map ", actionId);
+                }
             }
             catch (JPAExecutorException jex) {
                 throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
             }
-            finally {
-                // remove from Available map as it is being persisted
-                if (pdms.removeActionFromAvailPartitions(actionId)) {
-                    LOG.debug("Succesfully removed actionId: [{0}] from available Map ", actionId);
-                }
-                else {
-                    LOG.warn("Unable to remove actionId: [{0}] from available Map ", actionId);
-                }
+            catch (MetadataServiceException e) {
+                throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
             }
         }
         LOG.info("ENDED for Action id [{0}]", actionId);
@@ -145,6 +150,14 @@ public class CoordActionUpdatePushMissin
         return actionId;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getKey()
+     */
+    @Override
+    public String getKey(){
+        return getName() + "_" + actionId;
+    }
+
     @Override
     protected void loadState() throws CommandException {
         try {

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Fri Dec 21 22:47:37 2012
@@ -37,6 +37,7 @@ import org.apache.oozie.coord.SyncCoordA
 import org.apache.oozie.coord.TimeUnit;
 import org.apache.oozie.dependency.DependencyType;
 import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.MetadataServiceException;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
@@ -283,28 +284,29 @@ public class CoordCommandUtils {
      *
      * @param event
      * @param instances
-     * @param dependencyList
      * @throws Exception
      */
-    public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
+    public static StringBuffer separateResolvedAndUnresolved(Element event, StringBuilder instances)
             throws Exception {
         StringBuilder unresolvedInstances = new StringBuilder();
         StringBuilder urisWithDoneFlag = new StringBuilder();
+        StringBuffer depList = new StringBuffer();
         String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
         if (uris.length() > 0) {
             Element uriInstance = new Element("uris", event.getNamespace());
             uriInstance.addContent(uris);
             event.getContent().add(1, uriInstance);
-            if (dependencyList.length() > 0) {
-                dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+            if (depList.length() > 0) {
+                depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
             }
-            dependencyList.append(urisWithDoneFlag);
+            depList.append(urisWithDoneFlag);
         }
         if (unresolvedInstances.length() > 0) {
             Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
             elemInstance.addContent(unresolvedInstances.toString());
             event.getContent().add(1, elemInstance);
         }
+        return depList;
     }
 
     /**
@@ -350,8 +352,9 @@ public class CoordCommandUtils {
 
             String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
                     .getChild("uri-template", event.getNamespace()).getTextTrim());
-            uris.append(uriPath);
             URIHandler uriHandler = uriService.getURIHandler(uriPath);
+            uriHandler.validate(uriPath);
+            uris.append(uriPath);
             urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, doneFlagElement));
         }
         return uris.toString();
@@ -525,11 +528,6 @@ public class CoordCommandUtils {
                     .getChild("uri-template", event.getNamespace());
             String pullOrPush = "pull";
             String uriTemplate = uri.getText();
-            URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
-            URIHandler handler = uriService.getURIHandler(baseURI);
-            if (uriTemplate != null && handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
-                pullOrPush = "push";
-            }
             StringBuilder instances = new StringBuilder();
             ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
             // Handle list of instance tag
@@ -537,7 +535,13 @@ public class CoordCommandUtils {
             // Handle start-instance and end-instance
             resolveInstanceRange(event, instances, appInst, conf, eval);
             // Separate out the unresolved instances
-            separateResolvedAndUnresolved(event, instances, dependencyList.get(pullOrPush));
+            StringBuffer depList = separateResolvedAndUnresolved(event, instances);
+            URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
+            URIHandler handler = uriService.getURIHandler(baseURI);
+            if (handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
+                pullOrPush = "push";
+            }
+            dependencyList.put(pullOrPush, depList);
             String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
             if (tmpUnresolved != null) {
                 if (unresolvedList.get(pullOrPush).length() > 0) {
@@ -584,7 +588,7 @@ public class CoordCommandUtils {
      * @param actionBean
      * @throws Exception
      */
-    public static void registerPartition(CoordinatorActionBean actionBean) throws Exception {
+    public static void registerPartition(CoordinatorActionBean actionBean) throws MetadataServiceException {
 
         String resolved = getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
                 new StringBuilder());

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Fri Dec 21 22:47:37 2012
@@ -32,6 +32,7 @@ import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.MaterializeTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
@@ -42,6 +43,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
@@ -106,10 +108,21 @@ public class CoordMaterializeTransitionX
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+            // register the partition related dependencies of actions
+            for (JsonBean actionBean : insertList) {
+                if (actionBean instanceof CoordinatorActionBean) {
+                    CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+                    CoordCommandUtils.registerPartition(coordAction);
+                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+                }
+            }
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);
         }
+        catch (MetadataServiceException ex) {
+            LOG.warn("Error happened in registering partitions ", ex);
+        }
     }
 
     /* (non-Javadoc)
@@ -329,8 +342,7 @@ public class CoordMaterializeTransitionX
 
             if (!dryrun) {
                 storeToDB(actionBean, action); // Storing to table
-                CoordCommandUtils.registerPartition(actionBean); // Register partition to PDMS
-                queue(new CoordPushDependencyCheckXCommand(actionBean.getId()));
+
             }
             else {
                 actionStrings.append("action for new instance");

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Fri Dec 21 22:47:37 2012
@@ -143,10 +143,19 @@ public class CoordPushDependencyCheckXCo
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
+    // TODO - Check whether the entityKey should be JobId or actionId
     public String getEntityKey() {
         return actionId;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getKey()
+     */
+    @Override
+    public String getKey(){
+        return getName() + "_" + actionId;
+    }
+
     /*
      * (non-Javadoc)
      *

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Fri Dec 21 22:47:37 2012
@@ -120,6 +120,10 @@ public class FSURIHandler extends URIHan
     }
 
     @Override
+    public void validate(String uri) throws URIAccessorException {
+    }
+
+    @Override
     public void destroy() {
 
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java Fri Dec 21 22:47:37 2012
@@ -32,8 +32,11 @@ import org.apache.hcatalog.api.HCatTable
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.JMSAccessorService;
 import org.apache.oozie.service.MetaDataAccessorException;
 import org.apache.oozie.service.MetaDataAccessorService;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.util.HCatURI;
@@ -61,10 +64,10 @@ public class HCatURIHandler extends URIH
 
     @Override
     public DependencyType getDependencyType(URI uri) throws URIAccessorException {
-        // TODO Determine if jms server is configured for the hcat server and
-        // return poll or notify
-        return DependencyType.PUSH;
-    }
+        JMSAccessorService service = Services.get().get(JMSAccessorService.class);
+        return service.getOrCreateConnection(uri.getScheme() + "://" + uri.getAuthority()) ? DependencyType.PUSH
+                : DependencyType.PULL;
+ }
 
     @Override
     public void registerForNotification(URI uri, String actionID) throws URIAccessorException {
@@ -112,6 +115,17 @@ public class HCatURIHandler extends URIH
     }
 
     @Override
+    public void validate (String uri) throws URIAccessorException{
+        try {
+            new HCatURI(uri);  //will fail if uri syntax is incorrect
+        }
+        catch (URISyntaxException e) {
+            throw new URIAccessorException(ErrorCode.E1025, uri, e);
+        }
+
+    }
+
+    @Override
     public void destroy() {
 
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Fri Dec 21 22:47:37 2012
@@ -146,5 +146,14 @@ public abstract class URIHandler {
      */
     public abstract String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException;
 
+    /**
+     * Check whether the URI is valid or not
+     * @param uri
+     * @return
+     * @throws URIAccessorException
+     */
+    public abstract void validate(String uri) throws URIAccessorException;
+
     public abstract void destroy();
+
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java Fri Dec 21 22:47:37 2012
@@ -36,6 +36,7 @@ public class HCatMessageHandler implemen
 
     private PartitionWrapper msgPartition;
     private static XLog log;
+    public static final String THRIFT_SCHEME = "thrift";
 
     public HCatMessageHandler() {
         log = XLog.getLog(getClass());
@@ -56,9 +57,11 @@ public class HCatMessageHandler implemen
                 // Parse msg components
                 AddPartitionMessage partMsg = (AddPartitionMessage) hcatMsg;
                 String server = partMsg.getServer();
+                int index = server.indexOf("://");
+                server = server.substring(index + 3);
                 String db = partMsg.getDB();
                 String table = partMsg.getTable();
-                log.info("ADD event type db [{0}]  table [{1}] partitions [{3}]", db, table, partMsg.getPartitions());
+                log.info("ADD event type db [{0}]  table [{1}] partitions [{2}]", db, table, partMsg.getPartitions());
                 PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
                 if (pdms != null) {
                     // message is batched. therefore iterate through partitions
@@ -93,6 +96,7 @@ public class HCatMessageHandler implemen
         catch (IllegalArgumentException iae) {
             throw new MetadataServiceException(ErrorCode.E1505, iae);
         }
+
     }
 
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Fri Dec 21 22:47:37 2012
@@ -17,7 +17,9 @@
  */
 package org.apache.oozie.service;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +39,7 @@ import javax.naming.NamingException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.jms.MessageReceiver;
+import org.apache.oozie.util.MappingRule;
 import org.apache.oozie.util.XLog;
 
 /**
@@ -54,9 +57,12 @@ public class JMSAccessorService implemen
     public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
     public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
     public static final String DEFAULT_SERVER_ENDPOINT = "default";
+    private String defaultConnection = null;
+
 
     private static XLog LOG;
     private Configuration conf;
+    private List<MappingRule> mappingRules = null;
     ConcurrentHashMap<String, ConnectionContext> connSessionMap = new ConcurrentHashMap<String, ConnectionContext>();
     HashMap<String, Properties> hmConnProps = new HashMap<String, Properties>();
 
@@ -64,11 +70,106 @@ public class JMSAccessorService implemen
     public void init(Services services) throws ServiceException {
         LOG = XLog.getLog(getClass());
         conf = services.getConf();
-        parseConfiguration(conf);
-        establishConnections();
+        initializeMappingRules();
+    }
+
+
+    protected void initializeMappingRules() {
+        String connectionString = conf.get(JMS_CONNECTIONS_PROPERTIES);
+        if (connectionString != null) {
+            String[] connections = connectionString.split("\\s*,\\s*");
+            mappingRules = new ArrayList<MappingRule>(connections.length);
+            for (String connection : connections) {
+                String[] values = connection.split("=", 2);
+                String key = values[0].replaceAll("^\\s+|\\s+$", "");
+                String value = values[1].replaceAll("^\\s+|\\s+$", "");
+                if (key.equals("default")) {
+                    defaultConnection = value;
+                }
+                else {
+                    mappingRules.add(new MappingRule(key, value));
+                }
+            }
+        }
+        else {
+            LOG.warn("No JMS connection defined");
+        }
     }
 
     /**
+     * Checks if the connection exists or not. If it doesn't exist, creates a new one.
+     * Returns false if the connection cannot be established
+     * @param serverName
+     * @return
+     */
+    public boolean getOrCreateConnection(String serverName) {
+        if (!isExistsConnection(serverName)) {
+            // Get JNDI properties related to the JMS server name
+            Properties props = getJMSServerProps(serverName);
+            if (props != null) {
+                Connection conn = null;
+                try {
+                    conn = getConnection(props);
+                }
+                catch (ServiceException se) {
+                    LOG.warn("Could not create connection " + se.getErrorCode() + se.getMessage());
+                    return false;
+                }
+                LOG.info("Connection established to JMS Server for " + serverName);
+                connSessionMap.put(serverName, new ConnectionContext(conn));
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks whether connection to JMS server already exists
+     * @param serverName
+     * @return
+     */
+    public boolean isExistsConnection(String serverName) {
+        if (connSessionMap.containsKey(serverName)) {
+            LOG.info("Connection exists to JMS Server for " + serverName);
+            return true; // connection already exists
+        }
+        else {
+            return false;
+        }
+    }
+
+
+    protected Properties getJMSServerProps(String serverName) {
+        Properties props = null;
+        if (hmConnProps.containsKey(serverName)) {
+            props = hmConnProps.get(serverName);
+            return props;
+        }
+        String jmsServerMapping = getJMSServerMapping(serverName);
+        LOG.trace("\n JMS Server Mapping for server "+ serverName + "is " + jmsServerMapping);
+        if (jmsServerMapping == null) {
+            return null;
+        }
+        else {
+            props = getJMSPropsFromConf(jmsServerMapping);
+            if (props != null) {
+                hmConnProps.put(serverName, props);
+            }
+            return props;
+        }
+    }
+
+    protected String getJMSServerMapping(String serverName) {
+        for (MappingRule mr : mappingRules) {
+            String jmsServerMapping = mr.applyRule(serverName);
+            if (jmsServerMapping != null) {
+                return jmsServerMapping;
+            }
+        }
+        return defaultConnection;
+
+    }
+    /**
      * Returns Consumer object for specific service end point and topic name
      *
      * @param endPoint : Service end-point (preferably HCatalog server address)
@@ -157,49 +258,37 @@ public class JMSAccessorService implemen
         return;
     }
 
-    private void establishConnections() throws ServiceException {
-        for (String key : hmConnProps.keySet()) {
-            connSessionMap.put(key, new ConnectionContext(getConnection(hmConnProps.get(key))));
-        }
-    }
-
-    private void parseConfiguration(Configuration conf) {
-        String[] keyVals = conf.getStrings(JMS_CONNECTIONS_PROPERTIES, "");
-        for (String kVal : keyVals) {
-            LOG.info("Key=value " + kVal);
-            if (kVal.trim().length() > 0) {
-                addToHM(kVal);
-            }
-        }
-    }
 
-    private void addToHM(String kVal) {
-        int pos = kVal.indexOf("=");
+    protected Properties getJMSPropsFromConf(String kVal) {
         Properties props = new Properties();
-        if (pos > 0) {
-            String val = kVal.substring(pos + 1);
-            String[] propArr = val.split(";");
-            for (String pair : propArr) {
-                String[] kV = pair.split("#");
-                if (kV.length > 1) {
-                    props.put(kV[0].trim(), kV[1].trim());
-                }
-                else {
-                    LOG.info("Unformatted properties. Expected key#value : " + pair);
-                }
+        String[] propArr = kVal.split(";");
+        for (String pair : propArr) {
+            String[] kV = pair.split("#");
+            if (kV.length > 1) {
+                props.put(kV[0].trim(), kV[1].trim());
+            }
+            else {
+                LOG.info("Unformatted properties. Expected key#value : " + pair);
             }
-            String key = kVal.substring(0, pos);
-            LOG.info(key + ": Adding " + props);
-            hmConnProps.put(key.trim(), props);
         }
-        else {
-            LOG.info("Unformatted properties. Expected two parts : " + kVal);
+        if (props.isEmpty()) {
+            return null;
         }
+        return props;
     }
 
     @Override
     public void destroy() {
         // TODO Remove topic sessions based on no demand
+        LOG.info("Destroying JMSAccessor service ");
+        for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
+            try {
+                entry.getValue().getConnection().close();
+            }
+            catch (JMSException e) {
+                LOG.warn("Unable to close the connection for " + entry.getKey(), e);
+            }
+        }
 
     }
 
@@ -211,9 +300,9 @@ public class JMSAccessorService implemen
     /*
      * Look up connection factory Create connection
      */
-    private Connection getConnection(Properties props) throws ServiceException {
+    protected synchronized Connection getConnection(Properties props) throws ServiceException {
 
-        Connection conn;
+        Connection conn = null;
         try {
             Context jndiContext = getJndiContext(props);
             String connFacName = (String) jndiContext.getEnvironment().get(JMS_CONNECTION_FACTORY);
@@ -232,11 +321,16 @@ public class JMSAccessorService implemen
             });
 
         }
-        catch (NamingException e) {
-            throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
-        }
-        catch (JMSException e) {
-            throw new ServiceException(ErrorCode.E0100, getClass().getName(), e.getMessage(), e);
+        catch (Exception e1){
+            LOG.error(e1.getMessage(), e1);
+            if (conn != null) {
+                try {
+                    conn.close();
+                }
+                catch (Exception e2) {
+                    LOG.error(e2.getMessage(), e2);
+                }
+            }
         }
         return conn;
     }
@@ -304,18 +398,6 @@ public class JMSAccessorService implemen
         }
     }
 
-    @Override
-    public void finalize() {
-        LOG.info("Finalizing ");
-        for (Entry<String, ConnectionContext> entry : connSessionMap.entrySet()) {
-            try {
-                entry.getValue().getConnection().close();
-            }
-            catch (JMSException e) {
-                LOG.warn("Unable to close the connection for " + entry.getKey(), e);
-            }
-        }
-    }
 
     /**
      * This class maintains a JMS connection and map of topic to Session. Only

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Fri Dec 21 22:47:37 2012
@@ -34,16 +34,14 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
 import org.apache.oozie.jms.HCatMessageHandler;
 import org.apache.oozie.jms.MessageReceiver;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.service.ServiceException;
-import org.apache.oozie.service.Services;
+import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.PartitionWrapper;
 import org.apache.oozie.util.PartitionsGroup;
 import org.apache.oozie.util.WaitingActions;
-import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 
+
 /**
  * Module that functions like a caching service to maintain partition dependency
  * mappings
@@ -119,21 +117,6 @@ public class PartitionDependencyManagerS
     }
 
     /**
-     * Remove en entry from available Map
-     *
-     * @param actionId
-     * @return true if the entry exists , otherwise false
-     */
-    public boolean removeActionFromAvailPartitions(String actionId) {
-        boolean ret = false;
-        if (availMap.containsKey(actionId)) {
-            availMap.remove(actionId);
-            ret = true;
-        }
-        return ret;
-    }
-
-    /**
      * Remove an action from missing partition map
      *
      * @param hcatURI
@@ -150,7 +133,7 @@ public class PartitionDependencyManagerS
         catch (URISyntaxException e) {
             throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         List<String> actions = _getActionsForPartition(partition);
         if (actions != null && actions.size() != 0) {
@@ -169,7 +152,7 @@ public class PartitionDependencyManagerS
      * @param actionId
      * @throws MetadataServiceException
      */
-    public void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
+    private void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
         String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
         Map<String, PartitionsGroup> tablePartitionsMap;
         String tableName = partition.getTableName();
@@ -187,7 +170,7 @@ public class PartitionDependencyManagerS
             else { // new partition from different hcat server/db
                 _addNewEntry(hcatInstanceMap, prefix, tableName, partition, actionId);
             }
-            _registerMessageReceiver(partition);
+
         }
         catch (ClassCastException e) {
             throw new MetadataServiceException(ErrorCode.E1501, e.getCause());
@@ -203,16 +186,16 @@ public class PartitionDependencyManagerS
         }
     }
 
-    private void _registerMessageReceiver(PartitionWrapper partition) throws MetadataServiceException {
+    private void _registerMessageReceiver(PartitionWrapper partition, String serverEndPoint) throws MetadataServiceException {
         String topic = _getTopic(partition);
         try {
-            MessageReceiver recvr = Services.get().get(JMSAccessorService.class).getTopicReceiver(topic);
+            MessageReceiver recvr = Services.get().get(JMSAccessorService.class).getTopicReceiver(serverEndPoint, topic);
             //Register new listener only if topic is new. Else do nothing
             if(recvr == null) {
                 //Registering new receiver
                 recvr = new MessageReceiver(new HCatMessageHandler());
-                log.debug("Registering to listen on topic :" + topic);
-                recvr.registerTopic(topic); //server-endpoint is obtained from partition
+                log.debug("Registering to listen on topic :" + topic + "for endpoint " + serverEndPoint);
+                recvr.registerTopic(serverEndPoint, topic); //server-endpoint is obtained from partition
             }
         }
         catch (JMSException e) {
@@ -254,11 +237,13 @@ public class PartitionDependencyManagerS
         catch (URISyntaxException e) {
             throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         addMissingPartition(partition, actionId);
+        _registerMessageReceiver(partition, uri.getServerEndPoint());
     }
 
+
     public void addMissingPartitions(String[] hcatURIs, String actionId) throws MetadataServiceException {
         for (String uri : hcatURIs) {
             if (uri != null && uri.length() > 0) {
@@ -267,6 +252,32 @@ public class PartitionDependencyManagerS
         }
     }
 
+    /** Remove the given partitions from the list stored in availMap for an action.
+     * When that list becomes empty, the action's entry is dropped from availMap as well.
+     * @param partitions partitions to remove from the action's list
+     * @param actionId key of the availMap entry to update
+     * @return false if availMap has no entry for actionId or removeAll changed nothing, else true
+     * @throws MetadataServiceException declared in the signature; no statement in this body throws it
+     */
+    public boolean removeAvailablePartitions(List<PartitionWrapper> partitions, String actionId)
+            throws MetadataServiceException {
+        List<PartitionWrapper> availList = null;
+        if (!availMap.containsKey(actionId)) {
+            return false;
+        }
+        else {
+            availList = availMap.get(actionId);
+        }
+        if (!availList.removeAll(partitions)) {
+            return false;
+        }
+        if (availList.isEmpty()) {
+            availMap.remove(actionId);
+        }
+        return true;
+    }
+
+
     /**
      * Remove partition entry specified by PartitionWrapper object and cascading
      * delete indicator
@@ -293,7 +304,8 @@ public class PartitionDependencyManagerS
                             if (tableMap.size() == 0) {
                                 hcatInstanceMap.remove(prefix);
                             }
-                            _deregisterMessageReceiver(partition);
+                            // TODO - do unregistering in a synchronized way
+                            //_deregisterMessageReceiver(partition);
                         }
                     }
                     return true;
@@ -329,7 +341,7 @@ public class PartitionDependencyManagerS
         catch (URISyntaxException e) {
             throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         return removePartition(partition, cascade);
     }
@@ -350,7 +362,7 @@ public class PartitionDependencyManagerS
         catch (URISyntaxException e) {
             throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         return removePartition(partition, true);
     }
@@ -408,7 +420,7 @@ public class PartitionDependencyManagerS
         catch (URISyntaxException e) {
             throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         return partitionAvailable(partition);
     }
@@ -426,13 +438,14 @@ public class PartitionDependencyManagerS
             uri = new HCatURI(hcatURI);
         }
         catch (URISyntaxException e) {
-            throw new MetadataServiceException(ErrorCode.E1503, e.getMessage());
+            throw new MetadataServiceException(ErrorCode.E1025, e.getMessage());
         }
-        PartitionWrapper partition = new PartitionWrapper(uri.getServerEndPoint(), uri.getDb(), uri.getTable(),
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
                 uri.getPartitionMap());
         return containsPartition(partition);
     }
 
+
     /**
      * Determine if a partition entry exists in cache
      *

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java?rev=1425174&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/MappingRule.java Fri Dec 21 22:47:37 2012
@@ -0,0 +1,74 @@
+package org.apache.oozie.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Maps an input string to a result, by exact match or by a pattern containing ${digit} placeholders
+ */
+public class MappingRule {
+
+    private static Pattern variableNamePattern = Pattern.compile("\\$\\{[0-9]\\}");
+    private Pattern fromPattern;
+    private String fromString;
+    private String toString;
+    private boolean patternMatch;
+
+    /**
+     * Builds a rule; a fromRule containing "$" is compiled to a regex ("." escaped, ${digit} -> "(.*)")
+     * @param fromRule - literal string to compare with the input, or a pattern with ${digit} placeholders
+     * @param toRule - value returned on a match; may contain ${n} placeholders filled from the input
+     */
+    public MappingRule(String fromRule, String toRule) {
+        if (fromRule.contains("$")) {
+            patternMatch = true;
+            fromRule = fromRule.replaceAll("\\.", "\\\\.");
+            Matcher match = variableNamePattern.matcher(fromRule);
+            fromRule = match.replaceAll("(.*)");
+            fromPattern = Pattern.compile(fromRule);
+        }
+        else {
+            fromString = fromRule;
+        }
+        toString = toRule;
+    }
+
+    /**
+     * Gets the literal from rule
+     * @return the from rule string; null for a pattern rule (one whose fromRule contained "$")
+     */
+    public String getFromRule() {
+        return fromString;
+    }
+
+    /**
+     * Gets the to rule
+     * @return the to rule exactly as passed to the constructor
+     */
+    public String getToRule() {
+        return toString;
+    }
+
+    /**
+     * Applies the rule; ${i} in the to rule receives the i-th capture group, so placeholders are positional
+     * @param input - string to match against the from rule; null is not handled here
+     * @return the to rule with ${i} replaced by group i, the to rule itself on a literal match, else null
+     */
+    public String applyRule(String input) {
+        if (patternMatch) {
+            Matcher match = fromPattern.matcher(input);
+            if (match.matches()) {
+                String result = toString;
+                int count = match.groupCount();
+                for (int i = 1; i <= count; i++) {
+                    result = result.replace("${" + (i) + "}", match.group(i));
+                }
+                return result;
+            }
+        }
+        else if (input.equals(fromString)) {
+            return toString;
+        }
+        return null;
+    }
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java Fri Dec 21 22:47:37 2012
@@ -45,7 +45,7 @@ public class PartitionWrapper {
     }
 
     public PartitionWrapper(HCatURI hcatUri) {
-        this(hcatUri.getServerEndPoint(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
+        this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
     }
 
     public PartitionWrapper(String partURI) throws URISyntaxException {

Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Fri Dec 21 22:47:37 2012
@@ -143,6 +143,8 @@
         identifies the HCatalog server URL. "default" is used if no endpoint is mentioned 
         in the query. If some JMS property is not defined, the system will use the property 
         defined jndi.properties. jndi.properties files is retrieved from the application classpath.
+        Mapping rules can also be provided to map HCatalog server names to JMS servers, for example:
+        hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616
         </description>
     </property>
 

Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java Fri Dec 21 22:47:37 2012
@@ -23,7 +23,9 @@ import javax.jms.Session;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.test.XTestCase;
+import org.junit.Test;
 
 public class TestJMSAccessorService extends XTestCase {
     private Services services;
@@ -42,17 +44,28 @@ public class TestJMSAccessorService exte
         super.tearDown();
     }
 
+    @Test
     public void testService() {
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
         Assert.assertNotNull(jmsService);
     }
 
+    @Test
+    public void testConnection() throws Exception {
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        // both calls are expected to return true (presumably both resolve to the default JMS server)
+        assertTrue(jmsService.getOrCreateConnection("blahblah"));
+        assertTrue(jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT));
+    }
+
+    @Test
     public void testConsumer() throws Exception {
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
         MessageConsumer consumer = null;
         try {
             consumer = jmsService.getMessageConsumer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
-            assert (consumer != null);
+            assertTrue(consumer != null);
         }
         finally {
             if (consumer != null) {
@@ -62,12 +75,14 @@ public class TestJMSAccessorService exte
         }
     }
 
+    @Test
     public void testProducer() throws Exception {
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
         MessageProducer producer = null;
         try {
             producer = jmsService.getMessageProducer(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
-            assert (producer != null);
+            assertTrue(producer != null);
         }
         finally {
             if (producer != null) {
@@ -76,12 +91,14 @@ public class TestJMSAccessorService exte
         }
     }
 
+    @Test
     public void testSession() throws Exception {
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
         Session sess = null;
         try {
             sess = jmsService.getSession(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, "test-topic");
-            assert (sess != null);
+            assertTrue(sess != null);
         }
         finally {
             if (sess != null) {
@@ -90,16 +107,13 @@ public class TestJMSAccessorService exte
         }
     }
 
-    public void testConnection() {
-        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-        JMSAccessorService.ConnectionContext conCtx = jmsService
-                .getConnectionContext(JMSAccessorService.DEFAULT_SERVER_ENDPOINT);
-        assert (conCtx.getConnection() != null);
-    }
-
+    @Test
     public void testSingleConsumerPerTopic() {
 
         try {
+            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+            String endPoint = "hcat://hcat.server.com:5080";
+            jmsService.getOrCreateConnection(endPoint);
             // Add sample missing partitions belonging to same table
             String partition1 = "hcat://hcat.server.com:5080/mydb/mytable/mypart=10";
             String partition2 = "hcat://hcat.server.com:5080/mydb/mytable/mypart=20";
@@ -113,8 +127,7 @@ public class TestJMSAccessorService exte
             pdms.addMissingPartition(partition2, "action-2");
             //this registers the message receiver
 
-            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-            String endPoint = JMSAccessorService.DEFAULT_SERVER_ENDPOINT;
+
             assertNotNull(jmsService.getConnectionContext(endPoint));
             assertNotNull(jmsService.getSession(endPoint, topic));
 
@@ -129,8 +142,54 @@ public class TestJMSAccessorService exte
 
         }
         catch (Exception e) {
+            e.printStackTrace();
             fail("Exception encountered : " + e);
         }
 
     }
+
+    @Test
+    public void testGetJMSServerMappingNoDefault() throws ServiceException {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        Configuration conf = services.getConf();
+        String server2 = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616";
+        String server3 = "hcat://xyz.corp.dummy.com=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616";
+
+        String jmsConnectionURL = server2 + "," + server3;
+        conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsConnectionURL);
+        services.init();
+
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        // The configured rules contain no "default" entry, so a server matching no rule must map to null
+        String jmsServerMapping = jmsService.getJMSServerMapping("UNKNOWN_SERVER");
+        assertNull(jmsServerMapping);
+    }
+
+    @Test
+    public void testGetJMSServerMapping() throws ServiceException{
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        Configuration conf = services.getConf();
+        String server1 = "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false";
+        String server2 = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.${2}:61616";
+        String server3 = "hcat://xyz.corp.dummy.com=java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616";
+
+        String jmsConnectionURL = server1+","+server2+","+server3;
+        conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsConnectionURL);
+        services.init();
+
+
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        String jmsServerMapping = jmsService.getJMSServerMapping("hcat://axoniteblue-1.blue.server.com:8020");
+        // matches the server2 pattern rule: ${2} captures "blue" and is substituted into the broker URL
+        assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp://broker.blue:61616", jmsServerMapping);
+
+        jmsServerMapping = jmsService.getJMSServerMapping("UNKNOWN_SERVER");
+        // no rule matches this name, so the properties of the "default" entry (server1) are expected
+        assertEquals("java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false", jmsServerMapping);
+
+        jmsServerMapping = jmsService.getJMSServerMapping("hcat://xyz.corp.dummy.com");
+        assertEquals("java.naming.factory.initial#Dummy.Factory;java.naming.provider.url#tcp:localhost:61616", jmsServerMapping);
+    }
 }

Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java Fri Dec 21 22:47:37 2012
@@ -18,9 +18,11 @@
 package org.apache.oozie.service;
 
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
@@ -38,17 +40,18 @@ import org.junit.Test;
  */
 public class TestPartitionDependencyManagerService extends XDataTestCase {
 
+    private Services services;
     @Before
     protected void setUp() throws Exception {
         super.setUp();
         setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
-        Services services = super.setupServicesForHCatalog();
+        services = super.setupServicesForHCatalog();
         services.init();
     }
 
     @After
     protected void tearDown() throws Exception {
-        Services.get().destroy();
+        services.destroy();
         super.tearDown();
     }
 
@@ -73,14 +76,15 @@ public class TestPartitionDependencyMana
      */
     @Test
     public void testAddMissingPartition() throws MetadataServiceException, URISyntaxException {
-        Services services = Services.get();
         PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
         HCatURI hcatUri = new HCatURI(newHCatDependency);
-        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
                                                                             hcatUri.getDb()); // clicks
         assertNotNull(tablePartitionsMap);
         assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -105,11 +109,13 @@ public class TestPartitionDependencyMana
         Services services = Services.get();
         PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
         HCatURI hcatUri = new HCatURI(newHCatDependency);
-        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
                                                                             hcatUri.getDb()); // clicks
         assertNotNull(tablePartitionsMap);
         assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -139,11 +145,13 @@ public class TestPartitionDependencyMana
         Services services = Services.get();
         PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         String newHCatDependency = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
         HCatURI hcatUri = new HCatURI(newHCatDependency);
-        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServerEndPoint() + "#" +
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(hcatUri.getServer() + "#" +
                                                                             hcatUri.getDb()); // clicks
         assertNotNull(tablePartitionsMap);
         assertTrue(tablePartitionsMap.containsKey("clicks"));
@@ -171,6 +179,8 @@ public class TestPartitionDependencyMana
         PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         String newHCatDependency1 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
         String newHCatDependency2 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
         String actionId1 = "1";
         String actionId2 = "2";
         pdms.addMissingPartition(newHCatDependency1, actionId1);
@@ -179,7 +189,7 @@ public class TestPartitionDependencyMana
         pdms.removeActionFromMissingPartitions(newHCatDependency2, actionId2);
 
         HCatURI hcatUri = new HCatURI(newHCatDependency1);
-        String prefix = PartitionWrapper.makePrefix(hcatUri.getServerEndPoint(), hcatUri.getDb());
+        String prefix = PartitionWrapper.makePrefix(hcatUri.getServer(), hcatUri.getDb());
         Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(prefix);
         PartitionsGroup missingPartitions = tablePartitionsMap.get(hcatUri.getTable());
         assertNotNull(missingPartitions);
@@ -190,4 +200,54 @@ public class TestPartitionDependencyMana
         assertFalse(actions.getActions().contains(actionId2));
     }
 
+
+
+    /**
+     * Removing two of an action's three available partitions must leave exactly one in its list
+     */
+    @Test
+    public void testRemovePartitionsFromAvailMap() {
+        try {
+            Services services = Services.get();
+            PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+            String newHCatDependency1 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
+            String newHCatDependency2 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=12&region=us";
+            String newHCatDependency3 = "hcat://hcat.server.com:5080/mydb/clicks/datastamp=13&region=us";
+            String actionId1 = "1";
+
+            HCatURI uri = new HCatURI(newHCatDependency1);
+            PartitionWrapper partition1 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+                    uri.getPartitionMap());
+            uri = new HCatURI(newHCatDependency2);
+            PartitionWrapper partition2 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+                    uri.getPartitionMap());
+            uri = new HCatURI(newHCatDependency3);
+            PartitionWrapper partition3 = new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(),
+                    uri.getPartitionMap());
+
+            List<PartitionWrapper> partitionList = new ArrayList<PartitionWrapper>();
+            partitionList.add(partition1);
+            partitionList.add(partition2);
+            partitionList.add(partition3);
+
+            Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+            // Seed the available map directly with a 3-partition list for the action
+            availMap.put(actionId1, partitionList);
+
+            List<PartitionWrapper> availPartitionList = new ArrayList<PartitionWrapper>();
+            availPartitionList.add(partition1);
+            availPartitionList.add(partition2);
+            // removing two of the three partitions must report success
+            assertTrue(pdms.removeAvailablePartitions(availPartitionList, actionId1));
+            // the action's list in the available map should now hold only one partition
+            assertEquals(1, pdms.getAvailableMap().get(actionId1).size());
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail("Unexpected exception " + e);
+        }
+    }
+
+
 }

Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1425174&r1=1425173&r2=1425174&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Fri Dec 21 22:47:37 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1138 Provide rule based mechanism to allow multiple hcatalog servers to connect to JMS server (virag)
 OOZIE-1124 Split pig unit tests to a separate module (rohini via virag)
 OOZIE-1111 change HCatURI to specify partitions in path instead of query parameter (rohini,ryota via virag)
 OOZIE-1108 Fix JMS message consumer to maintain single session per topic registration (mona)