You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/07 21:10:57 UTC

svn commit: r1443694 [2/3] - in /oozie/branches/hcat-intre: ./ core/src/main/java/org/apache/oozie/action/hadoop/ core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/co...

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Thu Feb  7 20:10:55 2013
@@ -18,57 +18,83 @@
 package org.apache.oozie.dependency;
 
 import java.net.URI;
-import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.service.URIAccessorException;
-import org.jdom.Element;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
 
-public abstract class URIHandler {
+public interface URIHandler {
 
-    public abstract void init(Configuration conf, boolean isFrontEnd);
+    /**
+     * Type of the dependency. PULL dependencies are those whose availability is determined by
+     * polling and PUSH dependencies are those whose availability is known from notifications
+     */
+    public enum DependencyType {
+        PULL,
+        PUSH;
+    }
+
+    /**
+     * Initialize the URIHandler
+     *
+     * @param conf Configuration for initialization
+     */
+    public void init(Configuration conf);
 
     /**
      * Get the list of uri schemes supported by this URIHandler
      *
      * @return supported list of uri schemes
      */
-    public abstract Set<String> getSupportedSchemes();
+    public Set<String> getSupportedSchemes();
 
-    /** Get the list of dependent classes to ship to the hadoop launcher job for prepare actions
-     * @return dependent classes to ship to the hadoop job
+    /**
+     * Get the URIHandler that will be used to handle the supported schemes in launcher
+     *
+     * @return LauncherURIHandler that handles URI in the launcher
+     */
+    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass();
+
+    /**
+     * Get list of classes to ship to launcher for LauncherURIHandler
+     *
+     * @return list of classes to ship to launcher
      */
-    public abstract Collection<Class<?>> getClassesToShip();
+    public List<Class<?>> getClassesForLauncher();
 
     /**
-     * Get the type of dependency type of the URI. When the availability of the
+     * Get the dependency type of the URI. When the availability of the
      * URI is to be determined by polling the type is DependencyType.PULL, and
      * when the availability is received through notifications from a external
      * entity like a JMS server the type is DependencyType.PUSH
      *
      * @return dependency type of URI
      */
-    public abstract DependencyType getDependencyType(URI uri) throws URIAccessorException;
+    public DependencyType getDependencyType(URI uri) throws URIHandlerException;
 
     /**
      * Register for notifications in case of a push dependency
      *
-     * @param uri The URI to check for availability
-     * @param actionID The id of action which depends on the availability of the
-     *        uri.
+     * @param uri  The URI to check for availability
+     * @param conf Configuration to access the URI
+     * @param user name of the user the URI should be accessed as
+     * @param actionID The id of action which depends on the availability of the uri
+     *
+     * @throws URIHandlerException
      */
-    public abstract void registerForNotification(URI uri, Configuration conf, String user, String actionID)
-            throws URIAccessorException;
+    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+            throws URIHandlerException;
 
     /**
      * Unregister from notifications in case of a push dependency
+     *
      * @param uri The URI to be removed from missing dependency
      * @param actionID The id of action which was dependent on the uri.
      *
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract boolean unregisterFromNotification(URI uri, String actionID);
+    public boolean unregisterFromNotification(URI uri, String actionID);
 
     /**
      * Get the URIContext which can be used to access URI of the same scheme and
@@ -80,24 +106,9 @@ public abstract class URIHandler {
      *
      * @return Context to access URIs with same scheme and host
      *
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException;
-
-    /**
-     * Create the resource identified by the URI
-     *
-     * @param uri URI of the dependency
-     * @param conf Configuration to access the URI
-     * @param user name of the user the URI should be accessed as. If null the
-     *        logged in user is used.
-     *
-     * @return <code>true</code> if the URI did not exist and was successfully
-     *         created; <code>false</code> if the URI already existed
-     *
-     * @throws URIAccessorException
-     */
-    public abstract boolean create(URI uri, Configuration conf, String user) throws URIAccessorException;
+    public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException;
 
     /**
      * Check if the dependency identified by the URI is available
@@ -108,9 +119,9 @@ public abstract class URIHandler {
      * @return <code>true</code> if the URI exists; <code>false</code> if the
      *         URI does not exist
      *
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract boolean exists(URI uri, URIContext uriContext) throws URIAccessorException;
+    public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException;
 
     /**
      * Check if the dependency identified by the URI is available
@@ -123,23 +134,9 @@ public abstract class URIHandler {
      * @return <code>true</code> if the URI exists; <code>false</code> if the
      *         URI does not exist
      *
-     * @throws URIAccessorException
-     */
-    public abstract boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException;
-
-    /**
-     * Delete the resource identified by the URI
-     *
-     * @param uri URI of the dependency
-     * @param conf Configuration to access the URI
-     * @param user name of the user the URI should be accessed as. If null the
-     *        logged in user is used.
-     *
-     * @return <code>true</code> if the URI exists and was successfully deleted;
-     *         <code>false</code> if the URI does not exist
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException;
+    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException;
 
     /**
      * Get the URI based on the done flag
@@ -149,30 +146,21 @@ public abstract class URIHandler {
      *
      * @return the final URI with the doneFlag incorporated
      *
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException;
-
-    /**
-     * Get the URI based on the done flag
-     *
-     * @param uri URI of the dependency
-     * @param doneFlag flag that determines URI availability
-     *
-     * @return the final URI with the doneFlag incorporated
-     *
-     * @throws URIAccessorException
-     */
-    public abstract String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException;
+    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException;
 
     /**
      * Check whether the URI is valid or not
      * @param uri
      * @return
-     * @throws URIAccessorException
+     * @throws URIHandlerException
      */
-    public abstract void validate(String uri) throws URIAccessorException;
+    public void validate(String uri) throws URIHandlerException;
 
-    public abstract void destroy();
+    /**
+     * Destroy the URIHandler
+     */
+    public void destroy();
 
 }

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+
+/** Exception thrown by URIHandler operations, built from an XException cause or an ErrorCode. */
+public class URIHandlerException extends XException {
+
+    /**
+     * Create a URIHandlerException from an existing XException.
+     * @param cause the XException cause.
+     */
+    public URIHandlerException(XException cause) {
+        super(cause);
+    }
+
+    /**
+     * Create a URIHandlerException from an error code and its message parameters.
+     *
+     * @param errorCode error code.
+     * @param params parameters for the error code message template.
+     */
+    public URIHandlerException(ErrorCode errorCode, Object... params) {
+        super(errorCode, params);
+    }
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java Thu Feb  7 20:10:55 2013
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.HCatAccessorService;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XLog;
 
@@ -42,7 +44,7 @@ public class SimpleHCatDependencyCache i
      * value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
      * string).
      */
-    private Map<String, Map<String, Map<String, Collection<WaitingAction>>>> missingDeps;
+    private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
 
     /**
      * Map of actionIDs and collection of available URIs
@@ -50,12 +52,11 @@ public class SimpleHCatDependencyCache i
     private ConcurrentMap<String, Collection<String>> availableDeps;
 
     // TODO:
-    // Synchronization for missingDeps. Currently if removed, depend on CoordPushDependencyCheck
     // Gather and print stats on cache hits and misses.
 
     @Override
     public void init(Configuration conf) {
-        missingDeps = new ConcurrentHashMap<String, Map<String, Map<String, Collection<WaitingAction>>>>();
+        missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
         availableDeps = new ConcurrentHashMap<String, Collection<String>>();
     }
 
@@ -67,22 +68,29 @@ public class SimpleHCatDependencyCache i
         String partKey = sortedPKV.getPartKeys();
         // Partition values seperated by ;. For eg: 20120101;US;CA
         String partVal = sortedPKV.getPartVals();
-        Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
+        ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
         if (partKeyPatterns == null) {
-            partKeyPatterns = new HashMap<String, Map<String, Collection<WaitingAction>>>();
-            missingDeps.put(tableKey, partKeyPatterns);
-        }
-        Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
-        if (partValues == null) {
-            partValues = new HashMap<String, Collection<WaitingAction>>();
-            partKeyPatterns.put(partKey, partValues);
+            partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
+            ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
+                    tableKey, partKeyPatterns);
+            if (existingMap != null) {
+                partKeyPatterns = existingMap;
+            }
         }
-        Collection<WaitingAction> waitingActions = partValues.get(partVal);
-        if (waitingActions == null) {
-            waitingActions = new ArrayList<WaitingAction>();
-            partValues.put(partVal, waitingActions);
+        synchronized (partKeyPatterns) {
+            missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
+            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+            if (partValues == null) {
+                partValues = new HashMap<String, Collection<WaitingAction>>();
+                partKeyPatterns.put(partKey, partValues);
+            }
+            Collection<WaitingAction> waitingActions = partValues.get(partVal);
+            if (waitingActions == null) {
+                waitingActions = new ArrayList<WaitingAction>();
+                partValues.put(partVal, waitingActions);
+            }
+            waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
         }
-        waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
     }
 
     @Override
@@ -97,39 +105,44 @@ public class SimpleHCatDependencyCache i
                     hcatURI.toURIString(), actionID);
             return false;
         }
-        Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
-        if (partValues == null) {
-            LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
-                    hcatURI.toURIString(), actionID);
-            return false;
-        }
-        Collection<WaitingAction> waitingActions = partValues.get(partVal);
-        if (waitingActions == null) {
-            LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
-                    hcatURI.toURIString(), actionID);
-            return false;
-        }
-        WaitingAction wAction = null;
-        for (WaitingAction action : waitingActions) {
-            if (action.getActionID().equals(actionID)) {
-                wAction = action;
+        synchronized(partKeyPatterns) {
+            Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+            if (partValues == null) {
+                LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
+                        hcatURI.toURIString(), actionID);
+                return false;
             }
-        }
-        boolean removed = waitingActions.remove(wAction);
-        if (!removed) {
-            LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
-                    hcatURI.toURIString(), actionID);
-        }
-        if (waitingActions.isEmpty()) {
-            partValues.remove(partVal);
-            if (partValues.isEmpty()) {
-                partKeyPatterns.remove(partKey);
+            Collection<WaitingAction> waitingActions = partValues.get(partVal);
+            if (waitingActions == null) {
+                LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
+                        hcatURI.toURIString(), actionID);
+                return false;
             }
-            if (partKeyPatterns.isEmpty()) {
-                missingDeps.remove(tableKey);
+            WaitingAction wAction = null;
+            for (WaitingAction action : waitingActions) {
+                if (action.getActionID().equals(actionID)) {
+                    wAction = action;
+                }
+            }
+            boolean removed = waitingActions.remove(wAction);
+            if (!removed) {
+                LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+                        hcatURI.toURIString(), actionID);
             }
+            if (waitingActions.isEmpty()) {
+                partValues.remove(partVal);
+                if (partValues.isEmpty()) {
+                    partKeyPatterns.remove(partKey);
+                }
+                if (partKeyPatterns.isEmpty()) {
+                    missingDeps.remove(tableKey);
+                    // Close JMS session. Stop listening on topic
+                    HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+                    hcatService.unregisterFromNotification(hcatURI);
+                }
+            }
+            return removed;
         }
-        return removed;
     }
 
     @Override
@@ -173,50 +186,54 @@ public class SimpleHCatDependencyCache i
         Collection<String> actionsWithAvailDep = new HashSet<String>();
         List<String> partKeysToRemove = new ArrayList<String>();
         StringBuilder partValSB = new StringBuilder();
-        // If partition patterns are date, date;country and date;country;state,
-        // construct the partition values for each pattern from the available partitions map and for
-        // the matching value in the dependency map, get the waiting actions.
-        for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
-            String[] partKeys = entry.getKey().split(DELIMITER);
-            partValSB.setLength(0);
-            for (String key : partKeys) {
-                partValSB.append(partitions.get(key)).append(DELIMITER);
-            }
-            partValSB.setLength(partValSB.length() - 1);
-
-            Map<String, Collection<WaitingAction>> partValues = entry.getValue();
-            String partVal = partValSB.toString();
-            Collection<WaitingAction> wActions = entry.getValue().get(partVal);
-            if (wActions == null)
-                continue;
-            for (WaitingAction wAction : wActions) {
-                String actionID = wAction.getActionID();
-                actionsWithAvailDep.add(actionID);
-                Collection<String> depURIs = availableDeps.get(actionID);
-                if (depURIs == null) {
-                    depURIs = new ArrayList<String>();
-                    Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
-                    if (existing != null) {
-                        depURIs = existing;
+        synchronized (partKeyPatterns) {
+            // If partition patterns are date, date;country and date;country;state,
+            // construct the partition values for each pattern from the available partitions map and
+            // for the matching value in the dependency map, get the waiting actions.
+            for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
+                String[] partKeys = entry.getKey().split(DELIMITER);
+                partValSB.setLength(0);
+                for (String key : partKeys) {
+                    partValSB.append(partitions.get(key)).append(DELIMITER);
+                }
+                partValSB.setLength(partValSB.length() - 1);
+
+                Map<String, Collection<WaitingAction>> partValues = entry.getValue();
+                String partVal = partValSB.toString();
+                Collection<WaitingAction> wActions = entry.getValue().get(partVal);
+                if (wActions == null)
+                    continue;
+                for (WaitingAction wAction : wActions) {
+                    String actionID = wAction.getActionID();
+                    actionsWithAvailDep.add(actionID);
+                    Collection<String> depURIs = availableDeps.get(actionID);
+                    if (depURIs == null) {
+                        depURIs = new ArrayList<String>();
+                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
+                        if (existing != null) {
+                            depURIs = existing;
+                        }
+                    }
+                    synchronized (depURIs) {
+                        depURIs.add(wAction.getDependencyURI());
+                        availableDeps.put(actionID, depURIs);
                     }
                 }
-                synchronized (depURIs) {
-                    depURIs.add(wAction.getDependencyURI());
-                    availableDeps.put(actionID, depURIs);
+                partValues.remove(partVal);
+                if (partValues.isEmpty()) {
+                    partKeysToRemove.add(entry.getKey());
                 }
             }
-            partValues.remove(partVal);
-            if (partValues.isEmpty()) {
-                partKeysToRemove.add(entry.getKey());
+            for (String partKey : partKeysToRemove) {
+                partKeyPatterns.remove(partKey);
+            }
+            if (partKeyPatterns.isEmpty()) {
+                missingDeps.remove(tableKey);
+                // Close JMS session. Stop listening on topic
+                HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+                hcatService.unregisterFromNotification(server, db, table);
             }
         }
-        for (String partKey : partKeysToRemove) {
-            partKeyPatterns.remove(partKey);
-        }
-        if (partKeyPatterns.isEmpty()) {
-            missingDeps.remove(tableKey);
-            // TODO: Close JMS session. Stop listening on topic
-        }
         return actionsWithAvailDep;
     }
 

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java Thu Feb  7 20:10:55 2013
@@ -19,8 +19,6 @@ package org.apache.oozie.jms;
 
 import java.util.Properties;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
@@ -34,19 +32,11 @@ import javax.naming.NamingException;
 public interface ConnectionContext {
 
     /**
-     * Create connection factory using properties
+     * Create connection using properties
      * @param props the properties used for creating jndi context
-     * @return
-     * @throws NamingException
-     */
-    public ConnectionFactory createConnectionFactory(Properties props) throws NamingException;
-
-    /**
-     * Create connection using connection Factory
-     * @param connFactory
      * @throws JMSException
      */
-    public void createConnection(ConnectionFactory connFactory) throws JMSException;
+    public void createConnection(Properties props) throws NamingException, JMSException;
 
     /**
     * Set the exception Listener
@@ -87,18 +77,6 @@ public interface ConnectionContext {
     public MessageProducer createProducer(Session session, String topicName) throws JMSException;
 
     /**
-     * Retrieves the connection for this connection context
-     * @return
-     */
-    public Connection getConnection();
-
-    /**
-     * Retrieves the conneciton factory name for this context
-     * @return
-     */
-    public String getConnectionFactoryName();
-
-    /**
      * Closes the connection
      */
     public void close();

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java Thu Feb  7 20:10:55 2013
@@ -40,7 +40,7 @@ public class DefaultConnectionContext im
     private static XLog LOG = XLog.getLog(ConnectionContext.class);
 
     @Override
-    public ConnectionFactory createConnectionFactory(Properties props) throws NamingException {
+    public void createConnection(Properties props) throws NamingException, JMSException {
         Context jndiContext = new InitialContext(props);
         connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
         if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
@@ -48,14 +48,8 @@ public class DefaultConnectionContext im
         }
         ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
         LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
-        return connectionFactory;
-
-    }
-
-    @Override
-    public void createConnection(ConnectionFactory connFactory) throws JMSException {
         try {
-            connection = connFactory.createConnection();
+            connection = connectionFactory.createConnection();
             connection.start();
             connection.setExceptionListener(new ExceptionListener() {
                 @Override
@@ -79,9 +73,10 @@ public class DefaultConnectionContext im
         }
     }
 
+
     @Override
     public boolean isConnectionInitialized() {
-        return (connection != null) ? true : false;
+        return connection != null;
     }
 
     @Override
@@ -109,16 +104,6 @@ public class DefaultConnectionContext im
     }
 
     @Override
-    public Connection getConnection() {
-        return connection;
-    }
-
-    @Override
-    public String getConnectionFactoryName() {
-        return connectionFactoryName;
-    }
-
-    @Override
     public void close() {
         try {
             connection.close();

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import org.apache.oozie.util.XLog;
+
+/**
+ * Describes a JMS connection by its JNDI properties string and the Properties parsed from it.
+ * Equality and hashing are based on the raw properties string only.
+ */
+public class JMSConnectionInfo {
+
+    private String jndiPropertiesString;
+    private Properties props;
+
+    /**
+     * Create a JMSConnectionInfo
+     * @param jndiPropertiesString JNDI properties with # as key value delimiter and ; as properties delimiter
+     */
+    public JMSConnectionInfo(String jndiPropertiesString) {
+        this.jndiPropertiesString = jndiPropertiesString;
+        initializeProps();
+    }
+
+    /**
+     * Parse jndiPropertiesString into props. props ends up null when the string is null, yields
+     * no pairs, or holds a pair that is not in key#value form.
+     */
+    private void initializeProps() {
+        Properties parsed = new Properties();
+        String[] propArr = (jndiPropertiesString == null) ? new String[0] : jndiPropertiesString.split(";");
+        for (String pair : propArr) {
+            String[] kV = pair.split("#");
+            if (kV.length < 2) {
+                // Give up at once: the old code set props to null and carried on, ending in an NPE
+                XLog.getLog(getClass()).warn("Unformatted properties. Expected key#value : " + pair);
+                parsed.clear();
+                break;
+            }
+            parsed.put(kV[0].trim(), kV[1].trim());
+        }
+        props = parsed.isEmpty() ? null : parsed;
+    }
+
+    /**
+     * Get JNDI properties to establish a JMS connection
+     * @return JNDI properties, or null if the properties string could not be parsed
+     */
+    public Properties getJNDIProperties() {
+        return props;
+    }
+
+    /**
+     * Return JNDI properties string
+     * @return JNDI properties string
+     */
+    public String getJNDIPropertiesString() {
+        return jndiPropertiesString;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + ((jndiPropertiesString == null) ? 0 : jndiPropertiesString.hashCode());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        String otherString = ((JMSConnectionInfo) obj).jndiPropertiesString;
+        return jndiPropertiesString == null ? otherString == null : jndiPropertiesString.equals(otherString);
+    }
+
+    @Override
+    public String toString() {
+        return "JMSConnectionInfo [jndiProperties=" + jndiPropertiesString + "]";
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java Thu Feb  7 20:10:55 2013
@@ -27,7 +27,7 @@ import org.apache.oozie.util.XLog;
 public class JMSExceptionListener implements ExceptionListener {
 
     private static XLog LOG = XLog.getLog(JMSExceptionListener.class);
-    private String jmsConnectString;
+    private JMSConnectionInfo connInfo;
     private ConnectionContext connCtxt;
 
     /**
@@ -36,17 +36,17 @@ public class JMSExceptionListener implem
      * @param jmsConnectString  The connect string specifiying parameters for JMS connection
      * @param connCtxt  The actual connection on which this listener will be registered
      */
-    public JMSExceptionListener(String jmsConnectString, ConnectionContext connCtxt) {
-        this.jmsConnectString = jmsConnectString;
+    public JMSExceptionListener(JMSConnectionInfo connInfo, ConnectionContext connCtxt) {
+        this.connInfo = connInfo;
         this.connCtxt = connCtxt;
     }
 
     @Override
     public void onException(JMSException exception) {
-        LOG.warn("Received JMSException for connection [{0}]. Reestablishing connection", jmsConnectString, exception);
+        LOG.warn("Received JMSException for [{0}]. Reestablishing connection", connInfo, exception);
         connCtxt.close();
         JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-        jmsService.reestablishConnection(jmsConnectString);
+        jmsService.reestablishConnection(connInfo);
     }
 
 }

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java Thu Feb  7 20:10:55 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.jms.HCatMessageHandler;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.MappingRule;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Service that resolves which JMS server a HCatalog publisher sends its notifications to and
+ * keeps track of the HCatalog tables for which a JMS topic listener has been registered.
+ * <p/>
+ * The publisher to JMS connection mapping is read from the {@link #JMS_CONNECTIONS_PROPERTIES}
+ * property as a list of <code>key=value</code> entries. The entry whose key is
+ * <code>default</code> supplies the fallback connection; every other entry becomes a
+ * {@link MappingRule} that is matched against <code>scheme://authority</code> of the publisher.
+ */
+public class HCatAccessorService implements Service {
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
+    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
+
+    private static XLog LOG;
+    private static final String DELIMITER = "#";
+    private Configuration conf;
+    private JMSAccessorService jmsService;
+    /**
+     * Rules mapping a publisher to a JNDI properties string. Allocated by
+     * {@link #initializeMappingRules()} even when nothing is configured.
+     */
+    private List<MappingRule> mappingRules;
+    /**
+     * Connection info of the "default" entry, null when no such entry is configured
+     */
+    private JMSConnectionInfo defaultJMSConnInfo;
+    /**
+     * Map of publisher(host:port) to JMS connection info. A null value records that neither a
+     * mapping rule nor a default connection exists for that publisher.
+     */
+    private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
+    /**
+     * Mapping of table to the topic name for the table
+     */
+    private Map<String, String> registeredTopicsMap;
+
+    /**
+     * Initialize the service: read the configuration, look up the {@link JMSAccessorService}
+     * and build the publisher mapping rules.
+     *
+     * @param services services singleton that provides the configuration and sibling services
+     * @throws ServiceException declared by the {@link Service} contract
+     */
+    @Override
+    public void init(Services services) throws ServiceException {
+        LOG = XLog.getLog(getClass());
+        conf = services.getConf();
+        this.jmsService = services.get(JMSAccessorService.class);
+        initializeMappingRules();
+        this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
+        this.registeredTopicsMap = new HashMap<String, String>();
+    }
+
+    /**
+     * Parse {@link #JMS_CONNECTIONS_PROPERTIES} into the default connection info and the list of
+     * mapping rules. Entries that do not have the form <code>key=value</code> are logged and
+     * skipped.
+     */
+    private void initializeMappingRules() {
+        // Always allocate the list so that lookups work even when nothing is configured
+        mappingRules = new ArrayList<MappingRule>();
+        String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
+        if (connections == null) {
+            LOG.warn("No JMS connection defined");
+            return;
+        }
+        for (String connection : connections) {
+            // Split on the first '=' only, the value itself may contain further '=' characters
+            String[] values = connection.split("=", 2);
+            if (values.length < 2) {
+                LOG.warn("Ignoring malformed JMS connection entry [{0}]. Expected key=value", connection);
+                continue;
+            }
+            String key = values[0].trim();
+            String value = values[1].trim();
+            if (key.equals("default")) {
+                defaultJMSConnInfo = new JMSConnectionInfo(value);
+            }
+            else {
+                mappingRules.add(new MappingRule(key, value));
+            }
+        }
+    }
+
+    /**
+     * Build the key under which the topic of a table is stored in the registered topics map.
+     */
+    private static String getTopicKey(String server, String database, String table) {
+        return server + DELIMITER + database + DELIMITER + table;
+    }
+
+    /**
+     * Build the registered topics map key for the table referenced by the given hcatURI.
+     */
+    private static String getTopicKey(HCatURI hcatURI) {
+        return getTopicKey(hcatURI.getURI().getAuthority(), hcatURI.getDb(), hcatURI.getTable());
+    }
+
+    /**
+     * Determine whether a given source URI publishes JMS messages
+     *
+     * @param sourceURI URI of the publisher
+     * @return true if we have JMS connection information for the source URI, else false
+     */
+    public boolean isKnownPublisher(URI sourceURI) {
+        // Decide on the resolved value, not on key presence: publishers without any mapping are
+        // cached with a null value and must keep being reported as unknown.
+        return getJMSConnectionInfo(sourceURI) != null;
+    }
+
+    /**
+     * Given a publisher host:port return the connection details of JMS server that the publisher
+     * publishes to. The result is cached per publisher authority.
+     *
+     * @param publisherURI URI of the publisher
+     * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to, or
+     *         null if no mapping rule matches and no default connection is configured
+     */
+    public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
+        String publisherAuthority = publisherURI.getAuthority();
+        if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
+            return publisherJMSConnInfoMap.get(publisherAuthority);
+        }
+        String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
+        for (MappingRule mr : mappingRules) {
+            String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
+            if (jndiPropertiesString != null) {
+                JMSConnectionInfo connInfo = new JMSConnectionInfo(jndiPropertiesString);
+                publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
+                return connInfo;
+            }
+        }
+        // No rule matched: fall back to the default, which may be null
+        publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
+        return defaultJMSConnInfo;
+    }
+
+    /**
+     * Check if we are already listening to the JMS topic for the table in the given hcatURI
+     *
+     * @param hcatURI hcatalog partition URI
+     * @return true if registered to a JMS topic for the table in the given hcatURI
+     */
+    public boolean isRegisteredForNotification(HCatURI hcatURI) {
+        return registeredTopicsMap.containsKey(getTopicKey(hcatURI));
+    }
+
+    /**
+     * Register for notifications on a JMS topic for the specified hcatalog table.
+     *
+     * @param hcatURI hcatalog partition URI
+     * @param topic JMS topic to register to
+     * @param msgHandler Handler which will process the messages received on the topic
+     */
+    public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
+        JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
+        jmsService.registerForNotification(connInfo, topic, msgHandler);
+        registeredTopicsMap.put(getTopicKey(hcatURI), topic);
+    }
+
+    /**
+     * Unregister from the JMS topic of the table in the given hcatURI. Does nothing if no topic
+     * is registered for that table.
+     *
+     * @param hcatURI hcatalog partition URI
+     */
+    public void unregisterFromNotification(HCatURI hcatURI) {
+        String topic = registeredTopicsMap.remove(getTopicKey(hcatURI));
+        if (topic != null) {
+            JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
+            jmsService.unregisterFromNotification(connInfo, topic);
+        }
+    }
+
+    /**
+     * Unregister from the JMS topic of the given table. Does nothing if no topic is registered
+     * for that table.
+     *
+     * @param server publisher authority (host:port) of the hcatalog server
+     * @param database name of the database
+     * @param table name of the table
+     */
+    public void unregisterFromNotification(String server, String database, String table) {
+        String key = getTopicKey(server, database, table);
+        String topic = registeredTopicsMap.remove(key);
+        if (topic != null) {
+            try {
+                JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
+                jmsService.unregisterFromNotification(connInfo, topic);
+            }
+            catch (URISyntaxException e) {
+                LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
+            }
+        }
+    }
+
+    /**
+     * Release the cached publisher connection information and the registered topics.
+     */
+    @Override
+    public void destroy() {
+        publisherJMSConnInfoMap.clear();
+        registeredTopicsMap.clear();
+    }
+
+    /**
+     * Return the interface under which this service is registered.
+     *
+     * @return {@link HCatAccessorService} class
+     */
+    @Override
+    public Class<? extends Service> getInterface() {
+        return HCatAccessorService.class;
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java Thu Feb  7 20:10:55 2013
@@ -19,8 +19,9 @@ package org.apache.oozie.service;
 
 import org.apache.oozie.XException;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.dependency.URIHandlerException;
 
-public class HadoopAccessorException extends URIAccessorException {
+public class HadoopAccessorException extends URIHandlerException {
 
     /**
      * Create an HadoopAccessor exception from a XException.

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java Thu Feb  7 20:10:55 2013
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
@@ -80,7 +79,7 @@ public class HadoopAccessorService imple
     private Map<String, File> actionConfigDirs = new HashMap<String, File>();
     private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();
 
-    private ConcurrentMap<String, UserGroupInformation> userUgiMap;
+    private UserGroupInformationService ugiService;
 
     /**
      * Supported filesystem schemes for namespace federation
@@ -91,6 +90,7 @@ public class HadoopAccessorService imple
     private boolean allSchemesSupported;
 
     public void init(Services services) throws ServiceException {
+        this.ugiService = services.get(UserGroupInformationService.class);
         init(services.getConf());
     }
 
@@ -128,7 +128,9 @@ public class HadoopAccessorService imple
             UserGroupInformation.setConfiguration(ugiConf);
         }
 
-        userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
+        if (ugiService == null) { //for testing purposes, see XFsTestCase
+            this.ugiService = new UserGroupInformationService();
+        }
 
         loadHadoopConfigs(conf);
         preLoadActionConfigs(conf);
@@ -264,13 +266,7 @@ public class HadoopAccessorService imple
     }
 
     private UserGroupInformation getUGI(String user) throws IOException {
-        UserGroupInformation ugi = userUgiMap.get(user);
-        if (ugi == null) {
-            // taking care of a race condition, the latest UGI will be discarded
-            ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
-            userUgiMap.putIfAbsent(user, ugi);
-        }
-        return ugi;
+        return ugiService.getProxyUser(user);
     }
 
     /**

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Thu Feb  7 20:10:55 2013
@@ -17,13 +17,11 @@
  */
 package org.apache.oozie.service;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.jms.JMSException;
@@ -33,10 +31,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.jms.ConnectionContext;
 import org.apache.oozie.jms.DefaultConnectionContext;
+import org.apache.oozie.jms.JMSConnectionInfo;
 import org.apache.oozie.jms.JMSExceptionListener;
 import org.apache.oozie.jms.MessageHandler;
 import org.apache.oozie.jms.MessageReceiver;
-import org.apache.oozie.util.MappingRule;
 import org.apache.oozie.util.XLog;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -51,132 +49,75 @@ import com.google.common.annotations.Vis
  */
 public class JMSAccessorService implements Service {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
-    public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
     public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
     public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
     public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
     public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
     public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
-    public static final String DEFAULT_SERVER_ENDPOINT = "default";
     private static XLog LOG;
 
-    private String defaultConnectString = null;
     private Configuration conf;
     private int sessionOpts;
     private int retryInitialDelay;
     private int retryMultiplier;
     private int retryMaxAttempts;
-    private List<MappingRule> mappingRules = null;
 
     /**
-     * Map of publisher(host:port) to JMS connect string
+     * Map of JMS connection info to established JMS Connection
      */
-    private Map<String, String> publisherConnectStringMap = new HashMap<String, String>();
+    private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap = new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
     /**
-     * Map of JMS connect string to established JMS Connection
+     * Map of JMS connection info to topic names to MessageReceiver
      */
-    private ConcurrentMap<String, ConnectionContext> connectionMap = new ConcurrentHashMap<String, ConnectionContext>();
-    /**
-     * Map of publisher(host:port) to topic names to MessageReceiver
-     */
-    private ConcurrentMap<String, Map<String, MessageReceiver>> receiversMap = new ConcurrentHashMap<String, Map<String, MessageReceiver>>();
-
-    /**
-     * Map of JMS connect string to last connection attempt time
-     */
-    private Map<String, ConnectionAttempt> retryConnectionsMap = new HashMap<String, ConnectionAttempt>();
+    private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap = new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
 
     /**
-     * Map of publisher(host:port) to topic names that need to be registered for listening to the
-     * MessageHandler that process message of the topic.
+     * Map of JMS connection info to connection retry information
      */
-    private Map<String, Map<String, MessageHandler>> retryTopicsMap = new HashMap<String, Map<String, MessageHandler>>();
+    private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
 
     @Override
     public void init(Services services) throws ServiceException {
         LOG = XLog.getLog(getClass());
         conf = services.getConf();
-        initializeMappingRules();
         sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
         retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
         retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
         retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
     }
 
-    protected void initializeMappingRules() {
-        String connectionString = conf.get(JMS_CONNECTIONS_PROPERTIES);
-        if (connectionString != null) {
-            String[] connections = connectionString.split("\\s*,\\s*");
-            mappingRules = new ArrayList<MappingRule>(connections.length);
-            for (String connection : connections) {
-                String[] values = connection.split("=", 2);
-                String key = values[0].replaceAll("^\\s+|\\s+$", "");
-                String value = values[1].replaceAll("^\\s+|\\s+$", "");
-                if (key.equals("default")) {
-                    defaultConnectString = value;
-                }
-                else {
-                    mappingRules.add(new MappingRule(key, value));
-                }
-            }
-        }
-        else {
-            LOG.warn("No JMS connection defined");
-        }
-    }
-
     /**
-     * Determine whether a given source URI publishes JMS messages
+     * Register for notifications on a JMS topic.
      *
-     * @param sourceURI URI of the publisher
-     * @return true if we have JMS mapping for the source URI, else false
-     */
-    public boolean isKnownPublisher(URI sourceURI) {
-        if (publisherConnectStringMap.containsKey(sourceURI.getAuthority())) {
-            return true;
-        }
-        else {
-            return getJMSServerConnectString(sourceURI.getAuthority()) != null;
-        }
-    }
-
-    /**
-     * Register for notifications on a JMS topic from the specified publisher.
-     *
-     * @param publisherURI URI of the publisher of JMS messages. Used to determine the JMS
-     *        connection end point.
+     * @param connInfo Information to connect to a JMS compliant messaging service.
      * @param topic Topic in which the JMS messages are published
      * @param msgHandler Handler which will process the messages received on the topic
      */
-    public void registerForNotification(URI publisherURI, String topic, MessageHandler msgHandler) {
-        String publisherAuthority = publisherURI.getAuthority();
-        if (isTopicInRetryList(publisherAuthority, topic)) {
+    public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
+        if (isTopicInRetryList(connInfo, topic)) {
             return;
         }
-        if (isConnectionInRetryList(publisherAuthority)) {
-            queueTopicForRetry(publisherAuthority, topic, msgHandler);
+        if (isConnectionInRetryList(connInfo)) {
+            queueTopicForRetry(connInfo, topic, msgHandler);
             return;
         }
-        Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(publisherAuthority);
+        Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
         if (topicsMap.containsKey(topic)) {
             return;
         }
         synchronized (topicsMap) {
             if (!topicsMap.containsKey(topic)) {
-                String jmsConnectString = getJMSServerConnectString(publisherAuthority);
-                ConnectionContext connCtxt = createConnectionContext(jmsConnectString);
+                ConnectionContext connCtxt = createConnectionContext(connInfo);
                 if (connCtxt == null) {
-                    queueTopicForRetry(publisherAuthority, topic, msgHandler);
-                    queueConnectionForRetry(jmsConnectString);
+                    queueTopicForRetry(connInfo, topic, msgHandler);
                     return;
                 }
-                MessageReceiver receiver = registerForTopic(connCtxt, publisherAuthority, topic, msgHandler);
+                MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
                 if (receiver == null) {
-                    queueTopicForRetry(publisherAuthority, topic, msgHandler);
-                    queueConnectionForRetry(jmsConnectString);
+                    queueTopicForRetry(connInfo, topic, msgHandler);
                 }
                 else {
-                    LOG.info("Registered a listener for topic {0} from publisher {1}", topic, publisherAuthority);
+                    LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                     topicsMap.put(topic, receiver);
                 }
             }
@@ -186,18 +127,17 @@ public class JMSAccessorService implemen
     /**
      * Unregister from listening to JMS messages on a topic.
      *
-     * @param publisherAuthority host:port of the publisher of JMS messages. Used to determine the
-     *        JMS connection end point.
+     * @param connInfo Information to connect to the JMS compliant messaging service.
      * @param topic Topic in which the JMS messages are published
      */
-    public void unregisterFromNotification(String publisherAuthority, String topic) {
-        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", publisherAuthority, topic);
+    public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
+        LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
 
-        if (isTopicInRetryList(publisherAuthority, topic)) {
-            removeTopicFromRetryList(publisherAuthority, topic);
+        if (isTopicInRetryList(connInfo, topic)) {
+            removeTopicFromRetryList(connInfo, topic);
         }
         else {
-            Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+            Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
             MessageReceiver receiver = topicsMap.remove(topic);
             if (receiver != null) {
                 try {
@@ -209,20 +149,20 @@ public class JMSAccessorService implemen
             }
             else {
                 LOG.warn(
-                        "Received request to unregister from topic [{0}] from publisher [{1}], but no matching session.",
-                        topic, publisherAuthority);
+                        "Received request to unregister from topic [{0}] on [{1}], but no matching session.",
+                        topic, connInfo);
             }
             if (topicsMap.isEmpty()) {
-                receiversMap.remove(publisherAuthority);
+                receiversMap.remove(connInfo);
             }
         }
     }
 
-    private Map<String, MessageReceiver> getReceiversTopicsMap(String publisherAuthority) {
-        Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+    private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
+        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
         if (topicsMap == null) {
             topicsMap = new HashMap<String, MessageReceiver>();
-            Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(publisherAuthority, topicsMap);
+            Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
             if (exists != null) {
                 topicsMap = exists;
             }
@@ -233,66 +173,68 @@ public class JMSAccessorService implemen
     /**
      * Determine if currently listening to JMS messages on a topic.
      *
-     * @param publisherAuthority host:port of the publisher of JMS messages. Used to determine the
-     *        JMS connection end point.
+     * @param connInfo Information to connect to the JMS compliant messaging service.
      * @param topic Topic in which the JMS messages are published
      * @return true if listening to the topic, else false
      */
     @VisibleForTesting
-    boolean isListeningToTopic(String publisherAuthority, String topic) {
-        Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+    boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
+        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
         return (topicsMap != null && topicsMap.containsKey(topic));
     }
 
     @VisibleForTesting
-    boolean isConnectionInRetryList(String publisherAuthority) {
-        String jmsConnectString = getJMSServerConnectString(publisherAuthority);
-        return retryConnectionsMap.containsKey(jmsConnectString);
+    boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
+        return retryConnectionsMap.containsKey(connInfo);
     }
 
     @VisibleForTesting
-    boolean isTopicInRetryList(String publisherAuthority, String topic) {
-        Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
-        return topicsMap != null && topicsMap.containsKey(topic);
+    boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
+        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+        if (connRetryInfo == null) {
+            return false;
+        }
+        else {
+            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
+            return topicsMap.containsKey(topic);
+        }
     }
 
     // For unit testing
     @VisibleForTesting
-    int getNumConnectionAttempts(String publisherAuthority) {
-        String jmsConnectString = getJMSServerConnectString(publisherAuthority);
-        return retryConnectionsMap.get(jmsConnectString).getNumAttempt();
+    int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
+        return retryConnectionsMap.get(connInfo).getNumAttempt();
     }
 
-    private void queueConnectionForRetry(String jmsConnectString) {
-        if (!retryConnectionsMap.containsKey(jmsConnectString)) {
-            LOG.info("Queueing connection {0} for retry", jmsConnectString);
-            retryConnectionsMap.put(jmsConnectString, new ConnectionAttempt(0, retryInitialDelay));
-            scheduleRetry(jmsConnectString, retryInitialDelay);
+    private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
+        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+        if (connRetryInfo == null) {
+            LOG.info("Queueing connection {0} for retry", connInfo);
+            connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
+            retryConnectionsMap.put(connInfo, connRetryInfo);
+            scheduleRetry(connInfo, retryInitialDelay);
         }
+        return connRetryInfo;
     }
 
-    private void queueTopicForRetry(String publisherAuthority, String topic, MessageHandler msgHandler) {
-        LOG.info("Queueing topic {0} from publisher {1} for retry", topic, publisherAuthority);
-        Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
-        if (topicsMap == null) {
-            topicsMap = new HashMap<String, MessageHandler>();
-            retryTopicsMap.put(publisherAuthority, topicsMap);
-        }
+    private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
+        LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
+        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
+        Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
         topicsMap.put(topic, msgHandler);
+        return connRetryInfo;
     }
 
-    private void removeTopicFromRetryList(String publisherAuthority, String topic) {
-        LOG.info("Removing topic {0} from publisher {1} from retry list", topic, publisherAuthority);
-        Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
-        if (topicsMap != null) {
+    private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
+        LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
+        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+        if (connRetryInfo != null) {
+            Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
             topicsMap.remove(topic);
-            if (topicsMap.isEmpty()) {
-                retryTopicsMap.remove(publisherAuthority);
-            }
         }
     }
 
-    private MessageReceiver registerForTopic(ConnectionContext connCtxt, String publisherAuthority, String topic,
+    private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
             MessageHandler msgHandler) {
         try {
             Session session = connCtxt.createSession(sessionOpts);
@@ -302,69 +244,27 @@ public class JMSAccessorService implemen
             return receiver;
         }
         catch (JMSException e) {
-            LOG.warn("Error while registering to listen to topic {0} from publisher {1}", topic, publisherAuthority, e);
+            LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
             return null;
         }
     }
 
-    protected ConnectionContext createConnectionContext(String jmsConnectString) {
-        ConnectionContext connCtxt = connectionMap.get(jmsConnectString);
+    protected ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
+        ConnectionContext connCtxt = connectionMap.get(connInfo);
         if (connCtxt == null) {
-            Properties props = getJMSPropsFromConf(jmsConnectString);
-            if (props != null) {
-                try {
-                    connCtxt = getConnectionContextImpl();
-                    connCtxt.createConnection(connCtxt.createConnectionFactory(props));
-                    connCtxt.setExceptionListener(new JMSExceptionListener(jmsConnectString, connCtxt));
-                    connectionMap.put(jmsConnectString, connCtxt);
-                    LOG.info("Connection established to JMS Server for [{0}]", jmsConnectString);
-                }
-                catch (Exception e) {
-                    LOG.warn("Exception while establishing connection to JMS Server for [{0}]", jmsConnectString, e);
-                    return null;
-                }
-            }
-            else {
-                LOG.warn("JMS connection string is not configured properly - [{0}]", jmsConnectString);
+            try {
+                connCtxt = getConnectionContextImpl();
+                connCtxt.createConnection(connInfo.getJNDIProperties());
+                connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt));
+                connectionMap.put(connInfo, connCtxt);
+                LOG.info("Connection established to JMS Server for [{0}]", connInfo);
             }
-        }
-        return connCtxt;
-    }
-
-    protected String getJMSServerConnectString(String publisherAuthority) {
-        if (publisherConnectStringMap.containsKey(publisherAuthority)) {
-            return publisherConnectStringMap.get(publisherAuthority);
-        }
-        else {
-            for (MappingRule mr : mappingRules) {
-                String jmsConnectString = mr.applyRule(publisherAuthority);
-                if (jmsConnectString != null) {
-                    publisherConnectStringMap.put(publisherAuthority, jmsConnectString);
-                    return jmsConnectString;
-                }
-            }
-            publisherConnectStringMap.put(publisherAuthority, defaultConnectString);
-            return defaultConnectString;
-        }
-    }
-
-    protected Properties getJMSPropsFromConf(String jmsConnectString) {
-        Properties props = new Properties();
-        String[] propArr = jmsConnectString.split(";");
-        for (String pair : propArr) {
-            String[] kV = pair.split("#");
-            if (kV.length > 1) {
-                props.put(kV[0].trim(), kV[1].trim());
-            }
-            else {
-                LOG.warn("Unformatted properties. Expected key#value : " + pair);
+            catch (Exception e) {
+                LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
                 return null;
             }
         }
-        if (props.isEmpty()) {
-            return null;
-        }
-        return props;
+        return connCtxt;
     }
 
     private ConnectionContext getConnectionContextImpl() {
@@ -380,8 +280,8 @@ public class JMSAccessorService implemen
     }
 
     @VisibleForTesting
-    MessageReceiver getMessageReceiver(String publisher, String topic) {
-        Map<String, MessageReceiver> topicsMap = receiversMap.get(publisher);
+    MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
+        Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
         if (topicsMap != null) {
             return topicsMap.get(topic);
         }
@@ -405,8 +305,8 @@ public class JMSAccessorService implemen
         receiversMap.clear();
 
         LOG.info("Closing JMS connections");
-        for (Entry<String, ConnectionContext> entry : connectionMap.entrySet()) {
-            entry.getValue().close();
+        for (ConnectionContext conn : connectionMap.values()) {
+            conn.close();
         }
         connectionMap.clear();
     }
@@ -417,117 +317,102 @@ public class JMSAccessorService implemen
     }
 
     /**
-     * Reestablish connection for the given JMS connect string
-     * @param jmsConnectString JMS connect string
+     * Reestablish connection for the given JMS connect information
+     * @param connInfo JMS connection info
      */
-    public void reestablishConnection(String jmsConnectString) {
-        connectionMap.remove(jmsConnectString);
-        queueConnectionForRetry(jmsConnectString);
-        for (Entry<String, String> entry : publisherConnectStringMap.entrySet()) {
-            if (entry.getValue().equals(jmsConnectString)) {
-                String publisherAuthority = entry.getKey();
-                Map<String, MessageReceiver> topicsMap = receiversMap.remove(publisherAuthority);
-                if (topicsMap != null) {
-                    Map<String, MessageHandler> retryTopics = retryTopicsMap.get(publisherAuthority);
-                    if (retryTopics == null) {
-                        retryTopics = new HashMap<String, MessageHandler>();
-                        retryTopicsMap.put(publisherAuthority, retryTopics);
-                    }
-                    for (Entry<String, MessageReceiver> topicEntry : topicsMap.entrySet()) {
-                        MessageReceiver receiver = topicEntry.getValue();
-                        retryTopics.put(topicEntry.getKey(), receiver.getMessageHandler());
-                        try {
-                            receiver.getSession().close();
-                        }
-                        catch (JMSException e) {
-                            LOG.warn("Unable to close session " + receiver.getSession(), e);
-                        }
-                    }
+    public void reestablishConnection(JMSConnectionInfo connInfo) {
+        connectionMap.remove(connInfo);
+        // Queue the connection and topics for retry
+        ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
+        Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
+        if (listeningTopicsMap != null) {
+            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
+            for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
+                MessageReceiver receiver = topicEntry.getValue();
+                retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
+                try {
+                    receiver.getSession().close();
+                }
+                catch (JMSException e) {
+                    LOG.warn("Unable to close session " + receiver.getSession(), e);
                 }
             }
         }
     }
 
-    private void scheduleRetry(String jmsConnectString, long delay) {
-        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", jmsConnectString, delay);
-        JMSRetryRunnable runnable = new JMSRetryRunnable(jmsConnectString);
+    private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
+        LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
+        JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
         SchedulerService scheduler = Services.get().get(SchedulerService.class);
         scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
     }
 
     @VisibleForTesting
-    boolean retryConnection(String jmsConnectString) {
-        ConnectionAttempt attempt = retryConnectionsMap.get(jmsConnectString);
-        if (attempt.getNumAttempt() >= retryMaxAttempts) {
-            LOG.info("Not attempting jms connection [{0}] again. Reached max attempts [{1}]", jmsConnectString,
-                    retryMaxAttempts);
+    boolean retryConnection(JMSConnectionInfo connInfo) {
+        ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+        if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
+            LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
             return false;
         }
-        LOG.info("Attempting retry of connection [{0}]", jmsConnectString);
-        attempt.setNumAttempt(attempt.getNumAttempt() + 1);
-        attempt.setNextDelay(attempt.getNextDelay() * retryMultiplier);
-        ConnectionContext connCtxt = createConnectionContext(jmsConnectString);
-        boolean removeRetryConnection = true;
+        LOG.info("Attempting retry of connection [{0}]", connInfo);
+        connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
+        connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
+        ConnectionContext connCtxt = createConnectionContext(connInfo);
+        boolean shouldRetry = false;
         if (connCtxt == null) {
-            removeRetryConnection = false;
+            shouldRetry = true;
         }
         else {
-            for (Entry<String, String> entry : publisherConnectStringMap.entrySet()) {
-                if (entry.getValue().equals(jmsConnectString)) {
-                    String publisherAuthority = entry.getKey();
-                    Map<String, MessageHandler> retryTopics = retryTopicsMap.get(publisherAuthority);
-                    if (retryTopics != null) {
-                        List<String> topicsToRemoveList = new ArrayList<String>();
-                        // For each topic in the retry list, try to register for the topic
-                        for (Entry<String, MessageHandler> topicEntry : retryTopics.entrySet()) {
-                            String topic = topicEntry.getKey();
-                            Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(publisherAuthority);
-                            if (topicsMap.containsKey(topic)) {
-                                continue;
-                            }
-                            synchronized (topicsMap) {
-                                if (!topicsMap.containsKey(topic)) {
-                                    MessageReceiver receiver = registerForTopic(connCtxt, publisherAuthority, topic,
-                                            topicEntry.getValue());
-                                    if (receiver == null) {
-                                        queueTopicForRetry(publisherAuthority, topic, topicEntry.getValue());
-                                        removeRetryConnection = false;
-                                    }
-                                    else {
-                                        topicsMap.put(topic, receiver);
-                                        topicsToRemoveList.add(topic);
-                                        LOG.info("Registered a listener for topic {0} from publisher {1}", topic,
-                                                publisherAuthority);
-                                    }
-                                }
-                            }
+            Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
+            Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
+            List<String> topicsToRemoveList = new ArrayList<String>();
+            // For each topic in the retry list, try to register the MessageHandler for that topic
+            for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
+                String topic = topicEntry.getKey();
+                if (listeningTopicsMap.containsKey(topic)) {
+                    continue;
+                }
+                synchronized (listeningTopicsMap) {
+                    if (!listeningTopicsMap.containsKey(topic)) {
+                        MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
+                        if (receiver == null) {
+                            LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
                         }
-                        for (String topic : topicsToRemoveList) {
-                            retryTopics.remove(topic);
+                        else {
+                            listeningTopicsMap.put(topic, receiver);
+                            topicsToRemoveList.add(topic);
+                            LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
                         }
                     }
-                    if (retryTopics.isEmpty()) {
-                        retryTopicsMap.remove(publisherAuthority);
-                    }
                 }
             }
+            for (String topic : topicsToRemoveList) {
+                retryTopicsMap.remove(topic);
+            }
+            if (retryTopicsMap.isEmpty()) {
+                shouldRetry = false;
+            }
         }
-        if (removeRetryConnection) {
-            retryConnectionsMap.remove(jmsConnectString);
+
+        if (shouldRetry) {
+            scheduleRetry(connInfo, connRetryInfo.getNextDelay());
         }
         else {
-            scheduleRetry(jmsConnectString, attempt.getNextDelay());
+            retryConnectionsMap.remove(connInfo);
         }
         return true;
     }
 
-    private static class ConnectionAttempt {
+    private static class ConnectionRetryInfo {
         private int numAttempt;
         private int nextDelay;
+        private boolean retryConnection;
+        private Map<String, MessageHandler> retryTopicsMap;
 
-        public ConnectionAttempt(int numAttempt, int nextDelay) {
+        public ConnectionRetryInfo(int numAttempt, int nextDelay) {
             this.numAttempt = numAttempt;
             this.nextDelay = nextDelay;
+            this.retryTopicsMap = new HashMap<String, MessageHandler>();
         }
 
         public int getNumAttempt() {
@@ -546,23 +431,35 @@ public class JMSAccessorService implemen
             this.nextDelay = nextDelay;
         }
 
+        public Map<String, MessageHandler> getTopicsToRetry() {
+            return retryTopicsMap;
+        }
+
+        public boolean shouldRetryConnection() {
+            return retryConnection;
+        }
+
+        public void setRetryConnection(boolean retryConnection) {
+            this.retryConnection = retryConnection;
+        }
+
     }
 
     public class JMSRetryRunnable implements Runnable {
 
-        private String jmsConnectString;
+        private JMSConnectionInfo connInfo;
 
-        public JMSRetryRunnable(String jmsConnectString) {
-            this.jmsConnectString = jmsConnectString;
+        public JMSRetryRunnable(JMSConnectionInfo connInfo) {
+            this.connInfo = connInfo;
         }
 
-        public String getJmsConnectString() {
-            return jmsConnectString;
+        public JMSConnectionInfo getJMSConnectionInfo() {
+            return connInfo;
         }
 
         @Override
         public void run() {
-            retryConnection(jmsConnectString);
+            retryConnection(connInfo);
         }
 
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java Thu Feb  7 20:10:55 2013
@@ -19,8 +19,9 @@ package org.apache.oozie.service;
 
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
+import org.apache.oozie.dependency.URIHandlerException;
 
-public class MetaDataAccessorException extends URIAccessorException {
+public class MetaDataAccessorException extends URIHandlerException {
 
     /**
      * Create an MetaDataAccessor Exception exception from a XException.

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Thu Feb  7 20:10:55 2013
@@ -26,7 +26,6 @@ import org.apache.oozie.command.coord.Co
 import org.apache.oozie.dependency.cache.HCatDependencyCache;
 import org.apache.oozie.dependency.cache.SimpleHCatDependencyCache;
 import org.apache.oozie.util.HCatURI;
-import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 
 /**
@@ -110,7 +109,14 @@ public class PartitionDependencyManagerS
                 partitions);
         if (actionsWithAvailableDep != null) {
             for (String actionID : actionsWithAvailableDep) {
-                queueCallable(new CoordActionUpdatePushMissingDependency(actionID), 100);
+                boolean ret = Services.get().get(CallableQueueService.class)
+                        .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
+                if (ret == false) {
+                    XLog.getLog(getClass()).warn(
+                            "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
+                                    + actionID + ".Most possibly command queue is full. Queue size is :"
+                                    + Services.get().get(CallableQueueService.class).queueSize());
+                }
             }
         }
     }
@@ -136,14 +142,4 @@ public class PartitionDependencyManagerS
         dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
     }
 
-    private void queueCallable(XCallable<?> callable, int delay) {
-        boolean ret = Services.get().get(CallableQueueService.class).queue(callable, delay);
-        if (ret == false) {
-            XLog.getLog(getClass()).warn(
-                    "Unable to queue the callable commands for PartitionDependencyManagerService. "
-                            + "Most possibly command queue is full. Queue size is :"
-                            + Services.get().get(CallableQueueService.class).queueSize());
-        }
-    }
-
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java Thu Feb  7 20:10:55 2013
@@ -119,10 +119,6 @@ public class RecoveryService implements 
         private List<XCallable<?>> delayedCallables;
         private StringBuilder msg = null;
         private JPAService jpaService = null;
-        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
-
 
         public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
             this.olderThan = olderThan;