You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/07 21:10:57 UTC
svn commit: r1443694 [2/3] - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/client/rest/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/co...
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandler.java Thu Feb 7 20:10:55 2013
@@ -18,57 +18,83 @@
package org.apache.oozie.dependency;
import java.net.URI;
-import java.util.Collection;
+import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.service.URIAccessorException;
-import org.jdom.Element;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
-public abstract class URIHandler {
+public interface URIHandler {
- public abstract void init(Configuration conf, boolean isFrontEnd);
+ /**
+ * Type of the dependency. PULL dependencies are those whose availability is determined by
+ * polling and PUSH dependencies are those whose availability is known from notifications
+ */
+ public enum DependencyType {
+ PULL,
+ PUSH;
+ }
+
+ /**
+ * Initialize the URIHandler
+ *
+ * @param conf Configuration for initialization
+ */
+ public void init(Configuration conf);
/**
* Get the list of uri schemes supported by this URIHandler
*
* @return supported list of uri schemes
*/
- public abstract Set<String> getSupportedSchemes();
+ public Set<String> getSupportedSchemes();
- /** Get the list of dependent classes to ship to the hadoop launcher job for prepare actions
- * @return dependent classes to ship to the hadoop job
+ /**
+ * Get the URIHandler that will be used to handle the supported schemes in launcher
+ *
+ * @return LauncherURIHandler that handles URI in the launcher
+ */
+ public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass();
+
+ /**
+ * Get list of classes to ship to launcher for LauncherURIHandler
+ *
+ * @return list of classes to ship to launcher
*/
- public abstract Collection<Class<?>> getClassesToShip();
+ public List<Class<?>> getClassesForLauncher();
/**
- * Get the type of dependency type of the URI. When the availability of the
+ * Get the dependency type of the URI. When the availability of the
* URI is to be determined by polling the type is DependencyType.PULL, and
* when the availability is received through notifications from a external
* entity like a JMS server the type is DependencyType.PUSH
*
* @return dependency type of URI
*/
- public abstract DependencyType getDependencyType(URI uri) throws URIAccessorException;
+ public DependencyType getDependencyType(URI uri) throws URIHandlerException;
/**
* Register for notifications in case of a push dependency
*
- * @param uri The URI to check for availability
- * @param actionID The id of action which depends on the availability of the
- * uri.
+ * @param uri The URI to check for availability
+ * @param conf Configuration to access the URI
+ * @param user name of the user the URI should be accessed as
+ * @param actionID The id of action which depends on the availability of the uri
+ *
+ * @throws URIHandlerException
*/
- public abstract void registerForNotification(URI uri, Configuration conf, String user, String actionID)
- throws URIAccessorException;
+ public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+ throws URIHandlerException;
/**
* Unregister from notifications in case of a push dependency
+ *
* @param uri The URI to be removed from missing dependency
* @param actionID The id of action which was dependent on the uri.
*
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract boolean unregisterFromNotification(URI uri, String actionID);
+ public boolean unregisterFromNotification(URI uri, String actionID);
/**
* Get the URIContext which can be used to access URI of the same scheme and
@@ -80,24 +106,9 @@ public abstract class URIHandler {
*
* @return Context to access URIs with same scheme and host
*
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract URIContext getURIContext(URI uri, Configuration conf, String user) throws URIAccessorException;
-
- /**
- * Create the resource identified by the URI
- *
- * @param uri URI of the dependency
- * @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as. If null the
- * logged in user is used.
- *
- * @return <code>true</code> if the URI did not exist and was successfully
- * created; <code>false</code> if the URI already existed
- *
- * @throws URIAccessorException
- */
- public abstract boolean create(URI uri, Configuration conf, String user) throws URIAccessorException;
+ public URIContext getURIContext(URI uri, Configuration conf, String user) throws URIHandlerException;
/**
* Check if the dependency identified by the URI is available
@@ -108,9 +119,9 @@ public abstract class URIHandler {
* @return <code>true</code> if the URI exists; <code>false</code> if the
* URI does not exist
*
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract boolean exists(URI uri, URIContext uriContext) throws URIAccessorException;
+ public boolean exists(URI uri, URIContext uriContext) throws URIHandlerException;
/**
* Check if the dependency identified by the URI is available
@@ -123,23 +134,9 @@ public abstract class URIHandler {
* @return <code>true</code> if the URI exists; <code>false</code> if the
* URI does not exist
*
- * @throws URIAccessorException
- */
- public abstract boolean exists(URI uri, Configuration conf, String user) throws URIAccessorException;
-
- /**
- * Delete the resource identified by the URI
- *
- * @param uri URI of the dependency
- * @param conf Configuration to access the URI
- * @param user name of the user the URI should be accessed as. If null the
- * logged in user is used.
- *
- * @return <code>true</code> if the URI exists and was successfully deleted;
- * <code>false</code> if the URI does not exist
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract boolean delete(URI uri, Configuration conf, String user) throws URIAccessorException;
+ public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException;
/**
* Get the URI based on the done flag
@@ -149,30 +146,21 @@ public abstract class URIHandler {
*
* @return the final URI with the doneFlag incorporated
*
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract String getURIWithDoneFlag(String uri, Element doneFlagElement) throws URIAccessorException;
-
- /**
- * Get the URI based on the done flag
- *
- * @param uri URI of the dependency
- * @param doneFlag flag that determines URI availability
- *
- * @return the final URI with the doneFlag incorporated
- *
- * @throws URIAccessorException
- */
- public abstract String getURIWithDoneFlag(String uri, String doneFlag) throws URIAccessorException;
+ public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException;
/**
* Check whether the URI is valid or not
* @param uri
* @return
- * @throws URIAccessorException
+ * @throws URIHandlerException
*/
- public abstract void validate(String uri) throws URIAccessorException;
+ public void validate(String uri) throws URIHandlerException;
- public abstract void destroy();
+ /**
+ * Destroy the URIHandler
+ */
+ public void destroy();
}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+
+public class URIHandlerException extends XException {
+
+ /**
+ * Create an URIHandlerException Exception exception from a XException.
+ *
+ * @param cause the XException cause.
+ */
+ public URIHandlerException(XException cause) {
+ super(cause);
+ }
+
+ /**
+ * Create a URIHandlerException exception.
+ *
+ * @param errorCode error code.
+ * @param params parameters for the error code message template.
+ */
+ public URIHandlerException(ErrorCode errorCode, Object... params) {
+ super(errorCode, params);
+ }
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/dependency/cache/SimpleHCatDependencyCache.java Thu Feb 7 20:10:55 2013
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.service.HCatAccessorService;
+import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
@@ -42,7 +44,7 @@ public class SimpleHCatDependencyCache i
* value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
* string).
*/
- private Map<String, Map<String, Map<String, Collection<WaitingAction>>>> missingDeps;
+ private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
/**
* Map of actionIDs and collection of available URIs
@@ -50,12 +52,11 @@ public class SimpleHCatDependencyCache i
private ConcurrentMap<String, Collection<String>> availableDeps;
// TODO:
- // Synchronization for missingDeps. Currently if removed, depend on CoordPushDependencyCheck
// Gather and print stats on cache hits and misses.
@Override
public void init(Configuration conf) {
- missingDeps = new ConcurrentHashMap<String, Map<String, Map<String, Collection<WaitingAction>>>>();
+ missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
}
@@ -67,22 +68,29 @@ public class SimpleHCatDependencyCache i
String partKey = sortedPKV.getPartKeys();
// Partition values seperated by ;. For eg: 20120101;US;CA
String partVal = sortedPKV.getPartVals();
- Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
+ ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
- partKeyPatterns = new HashMap<String, Map<String, Collection<WaitingAction>>>();
- missingDeps.put(tableKey, partKeyPatterns);
- }
- Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
- if (partValues == null) {
- partValues = new HashMap<String, Collection<WaitingAction>>();
- partKeyPatterns.put(partKey, partValues);
+ partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
+ ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
+ tableKey, partKeyPatterns);
+ if (existingMap != null) {
+ partKeyPatterns = existingMap;
+ }
}
- Collection<WaitingAction> waitingActions = partValues.get(partVal);
- if (waitingActions == null) {
- waitingActions = new ArrayList<WaitingAction>();
- partValues.put(partVal, waitingActions);
+ synchronized (partKeyPatterns) {
+ missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
+ Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+ if (partValues == null) {
+ partValues = new HashMap<String, Collection<WaitingAction>>();
+ partKeyPatterns.put(partKey, partValues);
+ }
+ Collection<WaitingAction> waitingActions = partValues.get(partVal);
+ if (waitingActions == null) {
+ waitingActions = new ArrayList<WaitingAction>();
+ partValues.put(partVal, waitingActions);
+ }
+ waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
}
- waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
}
@Override
@@ -97,39 +105,44 @@ public class SimpleHCatDependencyCache i
hcatURI.toURIString(), actionID);
return false;
}
- Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
- if (partValues == null) {
- LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
- hcatURI.toURIString(), actionID);
- return false;
- }
- Collection<WaitingAction> waitingActions = partValues.get(partVal);
- if (waitingActions == null) {
- LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
- hcatURI.toURIString(), actionID);
- return false;
- }
- WaitingAction wAction = null;
- for (WaitingAction action : waitingActions) {
- if (action.getActionID().equals(actionID)) {
- wAction = action;
+ synchronized(partKeyPatterns) {
+ Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
+ if (partValues == null) {
+ LOG.debug("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
+ return false;
}
- }
- boolean removed = waitingActions.remove(wAction);
- if (!removed) {
- LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
- hcatURI.toURIString(), actionID);
- }
- if (waitingActions.isEmpty()) {
- partValues.remove(partVal);
- if (partValues.isEmpty()) {
- partKeyPatterns.remove(partKey);
+ Collection<WaitingAction> waitingActions = partValues.get(partVal);
+ if (waitingActions == null) {
+ LOG.debug("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
+ return false;
}
- if (partKeyPatterns.isEmpty()) {
- missingDeps.remove(tableKey);
+ WaitingAction wAction = null;
+ for (WaitingAction action : waitingActions) {
+ if (action.getActionID().equals(actionID)) {
+ wAction = action;
+ }
+ }
+ boolean removed = waitingActions.remove(wAction);
+ if (!removed) {
+ LOG.debug("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
+ hcatURI.toURIString(), actionID);
}
+ if (waitingActions.isEmpty()) {
+ partValues.remove(partVal);
+ if (partValues.isEmpty()) {
+ partKeyPatterns.remove(partKey);
+ }
+ if (partKeyPatterns.isEmpty()) {
+ missingDeps.remove(tableKey);
+ // Close JMS session. Stop listening on topic
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ hcatService.unregisterFromNotification(hcatURI);
+ }
+ }
+ return removed;
}
- return removed;
}
@Override
@@ -173,50 +186,54 @@ public class SimpleHCatDependencyCache i
Collection<String> actionsWithAvailDep = new HashSet<String>();
List<String> partKeysToRemove = new ArrayList<String>();
StringBuilder partValSB = new StringBuilder();
- // If partition patterns are date, date;country and date;country;state,
- // construct the partition values for each pattern from the available partitions map and for
- // the matching value in the dependency map, get the waiting actions.
- for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
- String[] partKeys = entry.getKey().split(DELIMITER);
- partValSB.setLength(0);
- for (String key : partKeys) {
- partValSB.append(partitions.get(key)).append(DELIMITER);
- }
- partValSB.setLength(partValSB.length() - 1);
-
- Map<String, Collection<WaitingAction>> partValues = entry.getValue();
- String partVal = partValSB.toString();
- Collection<WaitingAction> wActions = entry.getValue().get(partVal);
- if (wActions == null)
- continue;
- for (WaitingAction wAction : wActions) {
- String actionID = wAction.getActionID();
- actionsWithAvailDep.add(actionID);
- Collection<String> depURIs = availableDeps.get(actionID);
- if (depURIs == null) {
- depURIs = new ArrayList<String>();
- Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
- if (existing != null) {
- depURIs = existing;
+ synchronized (partKeyPatterns) {
+ // If partition patterns are date, date;country and date;country;state,
+ // construct the partition values for each pattern from the available partitions map and
+ // for the matching value in the dependency map, get the waiting actions.
+ for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
+ String[] partKeys = entry.getKey().split(DELIMITER);
+ partValSB.setLength(0);
+ for (String key : partKeys) {
+ partValSB.append(partitions.get(key)).append(DELIMITER);
+ }
+ partValSB.setLength(partValSB.length() - 1);
+
+ Map<String, Collection<WaitingAction>> partValues = entry.getValue();
+ String partVal = partValSB.toString();
+ Collection<WaitingAction> wActions = entry.getValue().get(partVal);
+ if (wActions == null)
+ continue;
+ for (WaitingAction wAction : wActions) {
+ String actionID = wAction.getActionID();
+ actionsWithAvailDep.add(actionID);
+ Collection<String> depURIs = availableDeps.get(actionID);
+ if (depURIs == null) {
+ depURIs = new ArrayList<String>();
+ Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
+ if (existing != null) {
+ depURIs = existing;
+ }
+ }
+ synchronized (depURIs) {
+ depURIs.add(wAction.getDependencyURI());
+ availableDeps.put(actionID, depURIs);
}
}
- synchronized (depURIs) {
- depURIs.add(wAction.getDependencyURI());
- availableDeps.put(actionID, depURIs);
+ partValues.remove(partVal);
+ if (partValues.isEmpty()) {
+ partKeysToRemove.add(entry.getKey());
}
}
- partValues.remove(partVal);
- if (partValues.isEmpty()) {
- partKeysToRemove.add(entry.getKey());
+ for (String partKey : partKeysToRemove) {
+ partKeyPatterns.remove(partKey);
+ }
+ if (partKeyPatterns.isEmpty()) {
+ missingDeps.remove(tableKey);
+ // Close JMS session. Stop listening on topic
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ hcatService.unregisterFromNotification(server, db, table);
}
}
- for (String partKey : partKeysToRemove) {
- partKeyPatterns.remove(partKey);
- }
- if (partKeyPatterns.isEmpty()) {
- missingDeps.remove(tableKey);
- // TODO: Close JMS session. Stop listening on topic
- }
return actionsWithAvailDep;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java Thu Feb 7 20:10:55 2013
@@ -19,8 +19,6 @@ package org.apache.oozie.jms;
import java.util.Properties;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -34,19 +32,11 @@ import javax.naming.NamingException;
public interface ConnectionContext {
/**
- * Create connection factory using properties
+ * Create connection using properties
* @param props the properties used for creating jndi context
- * @return
- * @throws NamingException
- */
- public ConnectionFactory createConnectionFactory(Properties props) throws NamingException;
-
- /**
- * Create connection using connection Factory
- * @param connFactory
* @throws JMSException
*/
- public void createConnection(ConnectionFactory connFactory) throws JMSException;
+ public void createConnection(Properties props) throws NamingException, JMSException;
/**
* Set the exception Listener
@@ -87,18 +77,6 @@ public interface ConnectionContext {
public MessageProducer createProducer(Session session, String topicName) throws JMSException;
/**
- * Retrieves the connection for this connection context
- * @return
- */
- public Connection getConnection();
-
- /**
- * Retrieves the conneciton factory name for this context
- * @return
- */
- public String getConnectionFactoryName();
-
- /**
* Closes the connection
*/
public void close();
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java Thu Feb 7 20:10:55 2013
@@ -40,7 +40,7 @@ public class DefaultConnectionContext im
private static XLog LOG = XLog.getLog(ConnectionContext.class);
@Override
- public ConnectionFactory createConnectionFactory(Properties props) throws NamingException {
+ public void createConnection(Properties props) throws NamingException, JMSException {
Context jndiContext = new InitialContext(props);
connectionFactoryName = (String) jndiContext.getEnvironment().get("connectionFactoryNames");
if (connectionFactoryName == null || connectionFactoryName.trim().length() == 0) {
@@ -48,14 +48,8 @@ public class DefaultConnectionContext im
}
ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(connectionFactoryName);
LOG.info("Connecting with the following properties \n" + jndiContext.getEnvironment().toString());
- return connectionFactory;
-
- }
-
- @Override
- public void createConnection(ConnectionFactory connFactory) throws JMSException {
try {
- connection = connFactory.createConnection();
+ connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@Override
@@ -79,9 +73,10 @@ public class DefaultConnectionContext im
}
}
+
@Override
public boolean isConnectionInitialized() {
- return (connection != null) ? true : false;
+ return connection != null;
}
@Override
@@ -109,16 +104,6 @@ public class DefaultConnectionContext im
}
@Override
- public Connection getConnection() {
- return connection;
- }
-
- @Override
- public String getConnectionFactoryName() {
- return connectionFactoryName;
- }
-
- @Override
public void close() {
try {
connection.close();
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Properties;
+
+import org.apache.oozie.util.XLog;
+
+public class JMSConnectionInfo {
+
+ private String jndiPropertiesString;
+ private Properties props;
+
+ /**
+ * Create a JMSConnectionInfo
+ * @param jndiPropertiesString JNDI properties with # as key value delimiter and ; as properties delimiter
+ */
+ public JMSConnectionInfo(String jndiPropertiesString) {
+ this.jndiPropertiesString = jndiPropertiesString;
+ initializeProps();
+ }
+
+ private void initializeProps() {
+ this.props = new Properties();
+ String[] propArr = jndiPropertiesString.split(";");
+ for (String pair : propArr) {
+ String[] kV = pair.split("#");
+ if (kV.length > 1) {
+ props.put(kV[0].trim(), kV[1].trim());
+ }
+ else {
+ XLog.getLog(getClass()).warn("Unformatted properties. Expected key#value : " + pair);
+ props = null;
+ }
+ }
+ if (props.isEmpty()) {
+ props = null;
+ }
+ }
+
+ /**
+ * Get JNDI properties to establish a JMS connection
+ * @return JNDI properties
+ */
+ public Properties getJNDIProperties() {
+ return props;
+ }
+
+ /**
+ * Return JNDI properties string
+ * @return JNDI properties string
+ */
+ public String getJNDIPropertiesString() {
+ return jndiPropertiesString;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 + ((jndiPropertiesString == null) ? 0 : jndiPropertiesString.hashCode());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ JMSConnectionInfo other = (JMSConnectionInfo) obj;
+ if (jndiPropertiesString == null) {
+ if (other.jndiPropertiesString != null)
+ return false;
+ }
+ else if (!jndiPropertiesString.equals(other.jndiPropertiesString))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "JMSConnectionInfo [jndiProperties=" + jndiPropertiesString + "]";
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java Thu Feb 7 20:10:55 2013
@@ -27,7 +27,7 @@ import org.apache.oozie.util.XLog;
public class JMSExceptionListener implements ExceptionListener {
private static XLog LOG = XLog.getLog(JMSExceptionListener.class);
- private String jmsConnectString;
+ private JMSConnectionInfo connInfo;
private ConnectionContext connCtxt;
/**
@@ -36,17 +36,17 @@ public class JMSExceptionListener implem
* @param jmsConnectString The connect string specifiying parameters for JMS connection
* @param connCtxt The actual connection on which this listener will be registered
*/
- public JMSExceptionListener(String jmsConnectString, ConnectionContext connCtxt) {
- this.jmsConnectString = jmsConnectString;
+ public JMSExceptionListener(JMSConnectionInfo connInfo, ConnectionContext connCtxt) {
+ this.connInfo = connInfo;
this.connCtxt = connCtxt;
}
@Override
public void onException(JMSException exception) {
- LOG.warn("Received JMSException for connection [{0}]. Reestablishing connection", jmsConnectString, exception);
+ LOG.warn("Received JMSException for [{0}]. Reestablishing connection", connInfo, exception);
connCtxt.close();
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- jmsService.reestablishConnection(jmsConnectString);
+ jmsService.reestablishConnection(connInfo);
}
}
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java?rev=1443694&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java Thu Feb 7 20:10:55 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.jms.HCatMessageHandler;
+import org.apache.oozie.jms.JMSConnectionInfo;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.MappingRule;
+import org.apache.oozie.util.XLog;
+
+public class HCatAccessorService implements Service {
+
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "HCatAccessorService.";
+ public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "jmsconnections";
+
+ private static XLog LOG;
+ private static String DELIMITER = "#";
+ private Configuration conf;
+ private JMSAccessorService jmsService;
+ private List<MappingRule> mappingRules;
+ private JMSConnectionInfo defaultJMSConnInfo;
+ /**
+ * Map of publisher(host:port) to JMS connection info
+ */
+ private Map<String, JMSConnectionInfo> publisherJMSConnInfoMap;
+ /**
+ * Mapping of table to the topic name for the table
+ */
+ private Map<String, String> registeredTopicsMap;
+
+ @Override
+ public void init(Services services) throws ServiceException {
+ LOG = XLog.getLog(getClass());
+ conf = services.getConf();
+ this.jmsService = services.get(JMSAccessorService.class);
+ initializeMappingRules();
+ this.publisherJMSConnInfoMap = new HashMap<String, JMSConnectionInfo>();
+ this.registeredTopicsMap = new HashMap<String, String>();
+ }
+
+ private void initializeMappingRules() {
+ String[] connections = conf.getStrings(JMS_CONNECTIONS_PROPERTIES);
+ if (connections != null) {
+ mappingRules = new ArrayList<MappingRule>(connections.length);
+ for (String connection : connections) {
+ String[] values = connection.split("=", 2);
+ String key = values[0].trim();
+ String value = values[1].trim();
+ if (key.equals("default")) {
+ defaultJMSConnInfo = new JMSConnectionInfo(value);
+ }
+ else {
+ mappingRules.add(new MappingRule(key, value));
+ }
+ }
+ }
+ else {
+ LOG.warn("No JMS connection defined");
+ }
+ }
+
+ /**
+ * Determine whether a given source URI publishes JMS messages
+ *
+ * @param sourceURI URI of the publisher
+ * @return true if we have JMS connection information for the source URI, else false
+ */
+ public boolean isKnownPublisher(URI sourceURI) {
+ if (publisherJMSConnInfoMap.containsKey(sourceURI.getAuthority())) {
+ return true;
+ }
+ else {
+ return getJMSConnectionInfo(sourceURI) != null;
+ }
+ }
+
+ /**
+ * Given a publisher host:port return the connection details of JMS server that the publisher
+ * publishes to
+ *
+ * @param publisherURI URI of the publisher
+ * @return JMSConnectionInfo to connect to the JMS server that the publisher publishes to
+ */
+ public JMSConnectionInfo getJMSConnectionInfo(URI publisherURI) {
+ String publisherAuthority = publisherURI.getAuthority();
+ if (publisherJMSConnInfoMap.containsKey(publisherAuthority)) {
+ return publisherJMSConnInfoMap.get(publisherAuthority);
+ }
+ else {
+ String schemeWithAuthority = publisherURI.getScheme() + "://" + publisherAuthority;
+ for (MappingRule mr : mappingRules) {
+ String jndiPropertiesString = mr.applyRule(schemeWithAuthority);
+ if (jndiPropertiesString != null) {
+ JMSConnectionInfo connInfo = new JMSConnectionInfo(jndiPropertiesString);
+ publisherJMSConnInfoMap.put(publisherAuthority, connInfo);
+ return connInfo;
+ }
+ }
+ publisherJMSConnInfoMap.put(publisherAuthority, defaultJMSConnInfo);
+ return defaultJMSConnInfo;
+ }
+ }
+
+ /**
+ * Check if we are already listening to the JMS topic for the table in the given hcatURI
+ *
+ * @param hcatURI hcatalog partition URI
+ * @return true if registered to a JMS topic for the table in the given hcatURI
+ */
+ public boolean isRegisteredForNotification(HCatURI hcatURI) {
+ return registeredTopicsMap.containsKey(hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
+ + DELIMITER + hcatURI.getTable());
+ }
+
+ /**
+ * Register for notifications on a JMS topic for the specified hcatalog table.
+ *
+ * @param hcatURI hcatalog partition URI
+ * @param topic JMS topic to register to
+ * @param msgHandler Handler which will process the messages received on the topic
+ */
+ public void registerForNotification(HCatURI hcatURI, String topic, HCatMessageHandler msgHandler) {
+ JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
+ jmsService.registerForNotification(connInfo, topic, msgHandler);
+ registeredTopicsMap.put(
+ hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb() + DELIMITER + hcatURI.getTable(), topic);
+ }
+
+ public void unregisterFromNotification(HCatURI hcatURI) {
+ String topic = registeredTopicsMap.remove(hcatURI.getURI().getAuthority() + DELIMITER + hcatURI.getDb()
+ + DELIMITER + hcatURI.getTable());
+ if (topic != null) {
+ JMSConnectionInfo connInfo = getJMSConnectionInfo(hcatURI.getURI());
+ jmsService.unregisterFromNotification(connInfo, topic);
+ }
+ }
+
+ public void unregisterFromNotification(String server, String database, String table) {
+ String key = server + DELIMITER + database + DELIMITER + table;
+ String topic = registeredTopicsMap.remove(key);
+ if (topic != null) {
+ try {
+ JMSConnectionInfo connInfo = getJMSConnectionInfo(new URI("hcat://" + server));
+ jmsService.unregisterFromNotification(connInfo, topic);
+ }
+ catch (URISyntaxException e) {
+ LOG.warn("Error unregistering from notification for topic [{0}]. Hcat table=[{1}]", topic, key, e);
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ publisherJMSConnInfoMap.clear();
+ }
+
+ @Override
+ public Class<? extends Service> getInterface() {
+ return HCatAccessorService.class;
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java Thu Feb 7 20:10:55 2013
@@ -19,8 +19,9 @@ package org.apache.oozie.service;
import org.apache.oozie.XException;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.dependency.URIHandlerException;
-public class HadoopAccessorException extends URIAccessorException {
+public class HadoopAccessorException extends URIHandlerException {
/**
* Create an HadoopAccessor exception from a XException.
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java Thu Feb 7 20:10:55 2013
@@ -45,7 +45,6 @@ import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
/**
* The HadoopAccessorService returns HadoopAccessor instances configured to work on behalf of a user-group. <p/> The
@@ -80,7 +79,7 @@ public class HadoopAccessorService imple
private Map<String, File> actionConfigDirs = new HashMap<String, File>();
private Map<String, Map<String, XConfiguration>> actionConfigs = new HashMap<String, Map<String, XConfiguration>>();
- private ConcurrentMap<String, UserGroupInformation> userUgiMap;
+ private UserGroupInformationService ugiService;
/**
* Supported filesystem schemes for namespace federation
@@ -91,6 +90,7 @@ public class HadoopAccessorService imple
private boolean allSchemesSupported;
public void init(Services services) throws ServiceException {
+ this.ugiService = services.get(UserGroupInformationService.class);
init(services.getConf());
}
@@ -128,7 +128,9 @@ public class HadoopAccessorService imple
UserGroupInformation.setConfiguration(ugiConf);
}
- userUgiMap = new ConcurrentHashMap<String, UserGroupInformation>();
+ if (ugiService == null) { //for testing purposes, see XFsTestCase
+ this.ugiService = new UserGroupInformationService();
+ }
loadHadoopConfigs(conf);
preLoadActionConfigs(conf);
@@ -264,13 +266,7 @@ public class HadoopAccessorService imple
}
private UserGroupInformation getUGI(String user) throws IOException {
- UserGroupInformation ugi = userUgiMap.get(user);
- if (ugi == null) {
- // taking care of a race condition, the latest UGI will be discarded
- ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
- userUgiMap.putIfAbsent(user, ugi);
- }
- return ugi;
+ return ugiService.getProxyUser(user);
}
/**
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java Thu Feb 7 20:10:55 2013
@@ -17,13 +17,11 @@
*/
package org.apache.oozie.service;
-import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.JMSException;
@@ -33,10 +31,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
+import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSExceptionListener;
import org.apache.oozie.jms.MessageHandler;
import org.apache.oozie.jms.MessageReceiver;
-import org.apache.oozie.util.MappingRule;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
@@ -51,132 +49,75 @@ import com.google.common.annotations.Vis
*/
public class JMSAccessorService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "JMSAccessorService.";
- public static final String JMS_CONNECTIONS_PROPERTIES = CONF_PREFIX + "connections";
public static final String JMS_CONNECTION_CONTEXT_IMPL = CONF_PREFIX + "connectioncontext.impl";
public static final String SESSION_OPTS = CONF_PREFIX + "jms.sessionOpts";
public static final String CONF_RETRY_INITIAL_DELAY = CONF_PREFIX + "retry.initial.delay";
public static final String CONF_RETRY_MULTIPLIER = CONF_PREFIX + "retry.multiplier";
public static final String CONF_RETRY_MAX_ATTEMPTS = CONF_PREFIX + "retry.max.attempts";
- public static final String DEFAULT_SERVER_ENDPOINT = "default";
private static XLog LOG;
- private String defaultConnectString = null;
private Configuration conf;
private int sessionOpts;
private int retryInitialDelay;
private int retryMultiplier;
private int retryMaxAttempts;
- private List<MappingRule> mappingRules = null;
/**
- * Map of publisher(host:port) to JMS connect string
+ * Map of JMS connection info to established JMS Connection
*/
- private Map<String, String> publisherConnectStringMap = new HashMap<String, String>();
+ private ConcurrentMap<JMSConnectionInfo, ConnectionContext> connectionMap = new ConcurrentHashMap<JMSConnectionInfo, ConnectionContext>();
/**
- * Map of JMS connect string to established JMS Connection
+ * Map of JMS connection info to topic names to MessageReceiver
*/
- private ConcurrentMap<String, ConnectionContext> connectionMap = new ConcurrentHashMap<String, ConnectionContext>();
- /**
- * Map of publisher(host:port) to topic names to MessageReceiver
- */
- private ConcurrentMap<String, Map<String, MessageReceiver>> receiversMap = new ConcurrentHashMap<String, Map<String, MessageReceiver>>();
-
- /**
- * Map of JMS connect string to last connection attempt time
- */
- private Map<String, ConnectionAttempt> retryConnectionsMap = new HashMap<String, ConnectionAttempt>();
+ private ConcurrentMap<JMSConnectionInfo, Map<String, MessageReceiver>> receiversMap = new ConcurrentHashMap<JMSConnectionInfo, Map<String, MessageReceiver>>();
/**
- * Map of publisher(host:port) to topic names that need to be registered for listening to the
- * MessageHandler that process message of the topic.
+ * Map of JMS connection info to connection retry information
*/
- private Map<String, Map<String, MessageHandler>> retryTopicsMap = new HashMap<String, Map<String, MessageHandler>>();
+ private Map<JMSConnectionInfo, ConnectionRetryInfo> retryConnectionsMap = new HashMap<JMSConnectionInfo, ConnectionRetryInfo>();
@Override
public void init(Services services) throws ServiceException {
LOG = XLog.getLog(getClass());
conf = services.getConf();
- initializeMappingRules();
sessionOpts = conf.getInt(SESSION_OPTS, Session.AUTO_ACKNOWLEDGE);
retryInitialDelay = conf.getInt(CONF_RETRY_INITIAL_DELAY, 60); // initial delay in seconds
retryMultiplier = conf.getInt(CONF_RETRY_MULTIPLIER, 2);
retryMaxAttempts = conf.getInt(CONF_RETRY_MAX_ATTEMPTS, 10);
}
- protected void initializeMappingRules() {
- String connectionString = conf.get(JMS_CONNECTIONS_PROPERTIES);
- if (connectionString != null) {
- String[] connections = connectionString.split("\\s*,\\s*");
- mappingRules = new ArrayList<MappingRule>(connections.length);
- for (String connection : connections) {
- String[] values = connection.split("=", 2);
- String key = values[0].replaceAll("^\\s+|\\s+$", "");
- String value = values[1].replaceAll("^\\s+|\\s+$", "");
- if (key.equals("default")) {
- defaultConnectString = value;
- }
- else {
- mappingRules.add(new MappingRule(key, value));
- }
- }
- }
- else {
- LOG.warn("No JMS connection defined");
- }
- }
-
/**
- * Determine whether a given source URI publishes JMS messages
+ * Register for notifications on a JMS topic.
*
- * @param sourceURI URI of the publisher
- * @return true if we have JMS mapping for the source URI, else false
- */
- public boolean isKnownPublisher(URI sourceURI) {
- if (publisherConnectStringMap.containsKey(sourceURI.getAuthority())) {
- return true;
- }
- else {
- return getJMSServerConnectString(sourceURI.getAuthority()) != null;
- }
- }
-
- /**
- * Register for notifications on a JMS topic from the specified publisher.
- *
- * @param publisherURI URI of the publisher of JMS messages. Used to determine the JMS
- * connection end point.
+ * @param connInfo Information to connect to a JMS compliant messaging service.
* @param topic Topic in which the JMS messages are published
* @param msgHandler Handler which will process the messages received on the topic
*/
- public void registerForNotification(URI publisherURI, String topic, MessageHandler msgHandler) {
- String publisherAuthority = publisherURI.getAuthority();
- if (isTopicInRetryList(publisherAuthority, topic)) {
+ public void registerForNotification(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
+ if (isTopicInRetryList(connInfo, topic)) {
return;
}
- if (isConnectionInRetryList(publisherAuthority)) {
- queueTopicForRetry(publisherAuthority, topic, msgHandler);
+ if (isConnectionInRetryList(connInfo)) {
+ queueTopicForRetry(connInfo, topic, msgHandler);
return;
}
- Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(publisherAuthority);
+ Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(connInfo);
if (topicsMap.containsKey(topic)) {
return;
}
synchronized (topicsMap) {
if (!topicsMap.containsKey(topic)) {
- String jmsConnectString = getJMSServerConnectString(publisherAuthority);
- ConnectionContext connCtxt = createConnectionContext(jmsConnectString);
+ ConnectionContext connCtxt = createConnectionContext(connInfo);
if (connCtxt == null) {
- queueTopicForRetry(publisherAuthority, topic, msgHandler);
- queueConnectionForRetry(jmsConnectString);
+ queueTopicForRetry(connInfo, topic, msgHandler);
return;
}
- MessageReceiver receiver = registerForTopic(connCtxt, publisherAuthority, topic, msgHandler);
+ MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, msgHandler);
if (receiver == null) {
- queueTopicForRetry(publisherAuthority, topic, msgHandler);
- queueConnectionForRetry(jmsConnectString);
+ queueTopicForRetry(connInfo, topic, msgHandler);
}
else {
- LOG.info("Registered a listener for topic {0} from publisher {1}", topic, publisherAuthority);
+ LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
topicsMap.put(topic, receiver);
}
}
@@ -186,18 +127,17 @@ public class JMSAccessorService implemen
/**
* Unregister from listening to JMS messages on a topic.
*
- * @param publisherAuthority host:port of the publisher of JMS messages. Used to determine the
- * JMS connection end point.
+ * @param connInfo Information to connect to the JMS compliant messaging service.
* @param topic Topic in which the JMS messages are published
*/
- public void unregisterFromNotification(String publisherAuthority, String topic) {
- LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", publisherAuthority, topic);
+ public void unregisterFromNotification(JMSConnectionInfo connInfo, String topic) {
+ LOG.info("Unregistering JMS listener. Clossing session for {0} and topic {1}", connInfo, topic);
- if (isTopicInRetryList(publisherAuthority, topic)) {
- removeTopicFromRetryList(publisherAuthority, topic);
+ if (isTopicInRetryList(connInfo, topic)) {
+ removeTopicFromRetryList(connInfo, topic);
}
else {
- Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+ Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
MessageReceiver receiver = topicsMap.remove(topic);
if (receiver != null) {
try {
@@ -209,20 +149,20 @@ public class JMSAccessorService implemen
}
else {
LOG.warn(
- "Received request to unregister from topic [{0}] from publisher [{1}], but no matching session.",
- topic, publisherAuthority);
+ "Received request to unregister from topic [{0}] on [{1}], but no matching session.",
+ topic, connInfo);
}
if (topicsMap.isEmpty()) {
- receiversMap.remove(publisherAuthority);
+ receiversMap.remove(connInfo);
}
}
}
- private Map<String, MessageReceiver> getReceiversTopicsMap(String publisherAuthority) {
- Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+ private Map<String, MessageReceiver> getReceiversTopicsMap(JMSConnectionInfo connInfo) {
+ Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
if (topicsMap == null) {
topicsMap = new HashMap<String, MessageReceiver>();
- Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(publisherAuthority, topicsMap);
+ Map<String, MessageReceiver> exists = receiversMap.putIfAbsent(connInfo, topicsMap);
if (exists != null) {
topicsMap = exists;
}
@@ -233,66 +173,68 @@ public class JMSAccessorService implemen
/**
* Determine if currently listening to JMS messages on a topic.
*
- * @param publisherAuthority host:port of the publisher of JMS messages. Used to determine the
- * JMS connection end point.
+ * @param connInfo Information to connect to the JMS compliant messaging service.
* @param topic Topic in which the JMS messages are published
* @return true if listening to the topic, else false
*/
@VisibleForTesting
- boolean isListeningToTopic(String publisherAuthority, String topic) {
- Map<String, MessageReceiver> topicsMap = receiversMap.get(publisherAuthority);
+ boolean isListeningToTopic(JMSConnectionInfo connInfo, String topic) {
+ Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
return (topicsMap != null && topicsMap.containsKey(topic));
}
@VisibleForTesting
- boolean isConnectionInRetryList(String publisherAuthority) {
- String jmsConnectString = getJMSServerConnectString(publisherAuthority);
- return retryConnectionsMap.containsKey(jmsConnectString);
+ boolean isConnectionInRetryList(JMSConnectionInfo connInfo) {
+ return retryConnectionsMap.containsKey(connInfo);
}
@VisibleForTesting
- boolean isTopicInRetryList(String publisherAuthority, String topic) {
- Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
- return topicsMap != null && topicsMap.containsKey(topic);
+ boolean isTopicInRetryList(JMSConnectionInfo connInfo, String topic) {
+ ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+ if (connRetryInfo == null) {
+ return false;
+ }
+ else {
+ Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
+ return topicsMap.containsKey(topic);
+ }
}
// For unit testing
@VisibleForTesting
- int getNumConnectionAttempts(String publisherAuthority) {
- String jmsConnectString = getJMSServerConnectString(publisherAuthority);
- return retryConnectionsMap.get(jmsConnectString).getNumAttempt();
+ int getNumConnectionAttempts(JMSConnectionInfo connInfo) {
+ return retryConnectionsMap.get(connInfo).getNumAttempt();
}
- private void queueConnectionForRetry(String jmsConnectString) {
- if (!retryConnectionsMap.containsKey(jmsConnectString)) {
- LOG.info("Queueing connection {0} for retry", jmsConnectString);
- retryConnectionsMap.put(jmsConnectString, new ConnectionAttempt(0, retryInitialDelay));
- scheduleRetry(jmsConnectString, retryInitialDelay);
+ private ConnectionRetryInfo queueConnectionForRetry(JMSConnectionInfo connInfo) {
+ ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+ if (connRetryInfo == null) {
+ LOG.info("Queueing connection {0} for retry", connInfo);
+ connRetryInfo = new ConnectionRetryInfo(0, retryInitialDelay);
+ retryConnectionsMap.put(connInfo, connRetryInfo);
+ scheduleRetry(connInfo, retryInitialDelay);
}
+ return connRetryInfo;
}
- private void queueTopicForRetry(String publisherAuthority, String topic, MessageHandler msgHandler) {
- LOG.info("Queueing topic {0} from publisher {1} for retry", topic, publisherAuthority);
- Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
- if (topicsMap == null) {
- topicsMap = new HashMap<String, MessageHandler>();
- retryTopicsMap.put(publisherAuthority, topicsMap);
- }
+ private ConnectionRetryInfo queueTopicForRetry(JMSConnectionInfo connInfo, String topic, MessageHandler msgHandler) {
+ LOG.info("Queueing topic {0} for {1} for retry", topic, connInfo);
+ ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
+ Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
topicsMap.put(topic, msgHandler);
+ return connRetryInfo;
}
- private void removeTopicFromRetryList(String publisherAuthority, String topic) {
- LOG.info("Removing topic {0} from publisher {1} from retry list", topic, publisherAuthority);
- Map<String, MessageHandler> topicsMap = retryTopicsMap.get(publisherAuthority);
- if (topicsMap != null) {
+ private void removeTopicFromRetryList(JMSConnectionInfo connInfo, String topic) {
+ LOG.info("Removing topic {0} from {1} from retry list", topic, connInfo);
+ ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+ if (connRetryInfo != null) {
+ Map<String, MessageHandler> topicsMap = connRetryInfo.getTopicsToRetry();
topicsMap.remove(topic);
- if (topicsMap.isEmpty()) {
- retryTopicsMap.remove(publisherAuthority);
- }
}
}
- private MessageReceiver registerForTopic(ConnectionContext connCtxt, String publisherAuthority, String topic,
+ private MessageReceiver registerForTopic(JMSConnectionInfo connInfo, ConnectionContext connCtxt, String topic,
MessageHandler msgHandler) {
try {
Session session = connCtxt.createSession(sessionOpts);
@@ -302,69 +244,27 @@ public class JMSAccessorService implemen
return receiver;
}
catch (JMSException e) {
- LOG.warn("Error while registering to listen to topic {0} from publisher {1}", topic, publisherAuthority, e);
+ LOG.warn("Error while registering to listen to topic {0} from {1}", topic, connInfo, e);
return null;
}
}
- protected ConnectionContext createConnectionContext(String jmsConnectString) {
- ConnectionContext connCtxt = connectionMap.get(jmsConnectString);
+ protected ConnectionContext createConnectionContext(JMSConnectionInfo connInfo) {
+ ConnectionContext connCtxt = connectionMap.get(connInfo);
if (connCtxt == null) {
- Properties props = getJMSPropsFromConf(jmsConnectString);
- if (props != null) {
- try {
- connCtxt = getConnectionContextImpl();
- connCtxt.createConnection(connCtxt.createConnectionFactory(props));
- connCtxt.setExceptionListener(new JMSExceptionListener(jmsConnectString, connCtxt));
- connectionMap.put(jmsConnectString, connCtxt);
- LOG.info("Connection established to JMS Server for [{0}]", jmsConnectString);
- }
- catch (Exception e) {
- LOG.warn("Exception while establishing connection to JMS Server for [{0}]", jmsConnectString, e);
- return null;
- }
- }
- else {
- LOG.warn("JMS connection string is not configured properly - [{0}]", jmsConnectString);
+ try {
+ connCtxt = getConnectionContextImpl();
+ connCtxt.createConnection(connInfo.getJNDIProperties());
+ connCtxt.setExceptionListener(new JMSExceptionListener(connInfo, connCtxt));
+ connectionMap.put(connInfo, connCtxt);
+ LOG.info("Connection established to JMS Server for [{0}]", connInfo);
}
- }
- return connCtxt;
- }
-
- protected String getJMSServerConnectString(String publisherAuthority) {
- if (publisherConnectStringMap.containsKey(publisherAuthority)) {
- return publisherConnectStringMap.get(publisherAuthority);
- }
- else {
- for (MappingRule mr : mappingRules) {
- String jmsConnectString = mr.applyRule(publisherAuthority);
- if (jmsConnectString != null) {
- publisherConnectStringMap.put(publisherAuthority, jmsConnectString);
- return jmsConnectString;
- }
- }
- publisherConnectStringMap.put(publisherAuthority, defaultConnectString);
- return defaultConnectString;
- }
- }
-
- protected Properties getJMSPropsFromConf(String jmsConnectString) {
- Properties props = new Properties();
- String[] propArr = jmsConnectString.split(";");
- for (String pair : propArr) {
- String[] kV = pair.split("#");
- if (kV.length > 1) {
- props.put(kV[0].trim(), kV[1].trim());
- }
- else {
- LOG.warn("Unformatted properties. Expected key#value : " + pair);
+ catch (Exception e) {
+ LOG.warn("Exception while establishing connection to JMS Server for [{0}]", connInfo, e);
return null;
}
}
- if (props.isEmpty()) {
- return null;
- }
- return props;
+ return connCtxt;
}
private ConnectionContext getConnectionContextImpl() {
@@ -380,8 +280,8 @@ public class JMSAccessorService implemen
}
@VisibleForTesting
- MessageReceiver getMessageReceiver(String publisher, String topic) {
- Map<String, MessageReceiver> topicsMap = receiversMap.get(publisher);
+ MessageReceiver getMessageReceiver(JMSConnectionInfo connInfo, String topic) {
+ Map<String, MessageReceiver> topicsMap = receiversMap.get(connInfo);
if (topicsMap != null) {
return topicsMap.get(topic);
}
@@ -405,8 +305,8 @@ public class JMSAccessorService implemen
receiversMap.clear();
LOG.info("Closing JMS connections");
- for (Entry<String, ConnectionContext> entry : connectionMap.entrySet()) {
- entry.getValue().close();
+ for (ConnectionContext conn : connectionMap.values()) {
+ conn.close();
}
connectionMap.clear();
}
@@ -417,117 +317,102 @@ public class JMSAccessorService implemen
}
/**
- * Reestablish connection for the given JMS connect string
- * @param jmsConnectString JMS connect string
+ * Reestablish connection for the given JMS connect information
+ * @param connInfo JMS connection info
*/
- public void reestablishConnection(String jmsConnectString) {
- connectionMap.remove(jmsConnectString);
- queueConnectionForRetry(jmsConnectString);
- for (Entry<String, String> entry : publisherConnectStringMap.entrySet()) {
- if (entry.getValue().equals(jmsConnectString)) {
- String publisherAuthority = entry.getKey();
- Map<String, MessageReceiver> topicsMap = receiversMap.remove(publisherAuthority);
- if (topicsMap != null) {
- Map<String, MessageHandler> retryTopics = retryTopicsMap.get(publisherAuthority);
- if (retryTopics == null) {
- retryTopics = new HashMap<String, MessageHandler>();
- retryTopicsMap.put(publisherAuthority, retryTopics);
- }
- for (Entry<String, MessageReceiver> topicEntry : topicsMap.entrySet()) {
- MessageReceiver receiver = topicEntry.getValue();
- retryTopics.put(topicEntry.getKey(), receiver.getMessageHandler());
- try {
- receiver.getSession().close();
- }
- catch (JMSException e) {
- LOG.warn("Unable to close session " + receiver.getSession(), e);
- }
- }
+ public void reestablishConnection(JMSConnectionInfo connInfo) {
+ connectionMap.remove(connInfo);
+ // Queue the connection and topics for retry
+ ConnectionRetryInfo connRetryInfo = queueConnectionForRetry(connInfo);
+ Map<String, MessageReceiver> listeningTopicsMap = receiversMap.remove(connInfo);
+ if (listeningTopicsMap != null) {
+ Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
+ for (Entry<String, MessageReceiver> topicEntry : listeningTopicsMap.entrySet()) {
+ MessageReceiver receiver = topicEntry.getValue();
+ retryTopicsMap.put(topicEntry.getKey(), receiver.getMessageHandler());
+ try {
+ receiver.getSession().close();
+ }
+ catch (JMSException e) {
+ LOG.warn("Unable to close session " + receiver.getSession(), e);
}
}
}
}
- private void scheduleRetry(String jmsConnectString, long delay) {
- LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", jmsConnectString, delay);
- JMSRetryRunnable runnable = new JMSRetryRunnable(jmsConnectString);
+ private void scheduleRetry(JMSConnectionInfo connInfo, long delay) {
+ LOG.info("Scheduling retry of connection [{0}] in [{1}] seconds", connInfo, delay);
+ JMSRetryRunnable runnable = new JMSRetryRunnable(connInfo);
SchedulerService scheduler = Services.get().get(SchedulerService.class);
scheduler.schedule(runnable, delay, SchedulerService.Unit.SEC);
}
@VisibleForTesting
- boolean retryConnection(String jmsConnectString) {
- ConnectionAttempt attempt = retryConnectionsMap.get(jmsConnectString);
- if (attempt.getNumAttempt() >= retryMaxAttempts) {
- LOG.info("Not attempting jms connection [{0}] again. Reached max attempts [{1}]", jmsConnectString,
- retryMaxAttempts);
+ boolean retryConnection(JMSConnectionInfo connInfo) {
+ ConnectionRetryInfo connRetryInfo = retryConnectionsMap.get(connInfo);
+ if (connRetryInfo.getNumAttempt() >= retryMaxAttempts) {
+ LOG.info("Not attempting connection [{0}] again. Reached max attempts [{1}]", connInfo, retryMaxAttempts);
return false;
}
- LOG.info("Attempting retry of connection [{0}]", jmsConnectString);
- attempt.setNumAttempt(attempt.getNumAttempt() + 1);
- attempt.setNextDelay(attempt.getNextDelay() * retryMultiplier);
- ConnectionContext connCtxt = createConnectionContext(jmsConnectString);
- boolean removeRetryConnection = true;
+ LOG.info("Attempting retry of connection [{0}]", connInfo);
+ connRetryInfo.setNumAttempt(connRetryInfo.getNumAttempt() + 1);
+ connRetryInfo.setNextDelay(connRetryInfo.getNextDelay() * retryMultiplier);
+ ConnectionContext connCtxt = createConnectionContext(connInfo);
+ boolean shouldRetry = false;
if (connCtxt == null) {
- removeRetryConnection = false;
+ shouldRetry = true;
}
else {
- for (Entry<String, String> entry : publisherConnectStringMap.entrySet()) {
- if (entry.getValue().equals(jmsConnectString)) {
- String publisherAuthority = entry.getKey();
- Map<String, MessageHandler> retryTopics = retryTopicsMap.get(publisherAuthority);
- if (retryTopics != null) {
- List<String> topicsToRemoveList = new ArrayList<String>();
- // For each topic in the retry list, try to register for the topic
- for (Entry<String, MessageHandler> topicEntry : retryTopics.entrySet()) {
- String topic = topicEntry.getKey();
- Map<String, MessageReceiver> topicsMap = getReceiversTopicsMap(publisherAuthority);
- if (topicsMap.containsKey(topic)) {
- continue;
- }
- synchronized (topicsMap) {
- if (!topicsMap.containsKey(topic)) {
- MessageReceiver receiver = registerForTopic(connCtxt, publisherAuthority, topic,
- topicEntry.getValue());
- if (receiver == null) {
- queueTopicForRetry(publisherAuthority, topic, topicEntry.getValue());
- removeRetryConnection = false;
- }
- else {
- topicsMap.put(topic, receiver);
- topicsToRemoveList.add(topic);
- LOG.info("Registered a listener for topic {0} from publisher {1}", topic,
- publisherAuthority);
- }
- }
- }
+ Map<String, MessageHandler> retryTopicsMap = connRetryInfo.getTopicsToRetry();
+ Map<String, MessageReceiver> listeningTopicsMap = getReceiversTopicsMap(connInfo);
+ List<String> topicsToRemoveList = new ArrayList<String>();
+ // For each topic in the retry list, try to register the MessageHandler for that topic
+ for (Entry<String, MessageHandler> topicEntry : retryTopicsMap.entrySet()) {
+ String topic = topicEntry.getKey();
+ if (listeningTopicsMap.containsKey(topic)) {
+ continue;
+ }
+ synchronized (listeningTopicsMap) {
+ if (!listeningTopicsMap.containsKey(topic)) {
+ MessageReceiver receiver = registerForTopic(connInfo, connCtxt, topic, topicEntry.getValue());
+ if (receiver == null) {
+ LOG.warn("Failed to register a listener for topic {0} on {1}", topic, connInfo);
}
- for (String topic : topicsToRemoveList) {
- retryTopics.remove(topic);
+ else {
+ listeningTopicsMap.put(topic, receiver);
+ topicsToRemoveList.add(topic);
+ LOG.info("Registered a listener for topic {0} on {1}", topic, connInfo);
}
}
- if (retryTopics.isEmpty()) {
- retryTopicsMap.remove(publisherAuthority);
- }
}
}
+ for (String topic : topicsToRemoveList) {
+ retryTopicsMap.remove(topic);
+ }
+ if (retryTopicsMap.isEmpty()) {
+ shouldRetry = false;
+ }
}
- if (removeRetryConnection) {
- retryConnectionsMap.remove(jmsConnectString);
+
+ if (shouldRetry) {
+ scheduleRetry(connInfo, connRetryInfo.getNextDelay());
}
else {
- scheduleRetry(jmsConnectString, attempt.getNextDelay());
+ retryConnectionsMap.remove(connInfo);
}
return true;
}
- private static class ConnectionAttempt {
+ private static class ConnectionRetryInfo {
private int numAttempt;
private int nextDelay;
+ private boolean retryConnection;
+ private Map<String, MessageHandler> retryTopicsMap;
- public ConnectionAttempt(int numAttempt, int nextDelay) {
+ public ConnectionRetryInfo(int numAttempt, int nextDelay) {
this.numAttempt = numAttempt;
this.nextDelay = nextDelay;
+ this.retryTopicsMap = new HashMap<String, MessageHandler>();
}
public int getNumAttempt() {
@@ -546,23 +431,35 @@ public class JMSAccessorService implemen
this.nextDelay = nextDelay;
}
+ public Map<String, MessageHandler> getTopicsToRetry() {
+ return retryTopicsMap;
+ }
+
+ public boolean shouldRetryConnection() {
+ return retryConnection;
+ }
+
+ public void setRetryConnection(boolean retryConnection) {
+ this.retryConnection = retryConnection;
+ }
+
}
public class JMSRetryRunnable implements Runnable {
- private String jmsConnectString;
+ private JMSConnectionInfo connInfo;
- public JMSRetryRunnable(String jmsConnectString) {
- this.jmsConnectString = jmsConnectString;
+ public JMSRetryRunnable(JMSConnectionInfo connInfo) {
+ this.connInfo = connInfo;
}
- public String getJmsConnectString() {
- return jmsConnectString;
+ public JMSConnectionInfo getJMSConnectionInfo() {
+ return connInfo;
}
@Override
public void run() {
- retryConnection(jmsConnectString);
+ retryConnection(connInfo);
}
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetaDataAccessorException.java Thu Feb 7 20:10:55 2013
@@ -19,8 +19,9 @@ package org.apache.oozie.service;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
+import org.apache.oozie.dependency.URIHandlerException;
-public class MetaDataAccessorException extends URIAccessorException {
+public class MetaDataAccessorException extends URIHandlerException {
/**
* Create an MetaDataAccessor Exception exception from a XException.
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Thu Feb 7 20:10:55 2013
@@ -26,7 +26,6 @@ import org.apache.oozie.command.coord.Co
import org.apache.oozie.dependency.cache.HCatDependencyCache;
import org.apache.oozie.dependency.cache.SimpleHCatDependencyCache;
import org.apache.oozie.util.HCatURI;
-import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
/**
@@ -110,7 +109,14 @@ public class PartitionDependencyManagerS
partitions);
if (actionsWithAvailableDep != null) {
for (String actionID : actionsWithAvailableDep) {
- queueCallable(new CoordActionUpdatePushMissingDependency(actionID), 100);
+ boolean ret = Services.get().get(CallableQueueService.class)
+ .queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
+ if (ret == false) {
+ XLog.getLog(getClass()).warn(
+ "Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
+ + actionID + ".Most possibly command queue is full. Queue size is :"
+ + Services.get().get(CallableQueueService.class).queueSize());
+ }
}
}
}
@@ -136,14 +142,4 @@ public class PartitionDependencyManagerS
dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
}
- private void queueCallable(XCallable<?> callable, int delay) {
- boolean ret = Services.get().get(CallableQueueService.class).queue(callable, delay);
- if (ret == false) {
- XLog.getLog(getClass()).warn(
- "Unable to queue the callable commands for PartitionDependencyManagerService. "
- + "Most possibly command queue is full. Queue size is :"
- + Services.get().get(CallableQueueService.class).queueSize());
- }
- }
-
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1443694&r1=1443693&r2=1443694&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java Thu Feb 7 20:10:55 2013
@@ -119,10 +119,6 @@ public class RecoveryService implements
private List<XCallable<?>> delayedCallables;
private StringBuilder msg = null;
private JPAService jpaService = null;
- URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
- PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
-
public RecoveryRunnable(long olderThan, long coordOlderThan,long bundleOlderThan) {
this.olderThan = olderThan;