You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/25 22:42:09 UTC

svn commit: r1449911 [2/8] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/rest/ client/src/test/java/org/apache/oozie/client/rest/ core/ core/s...

Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+
+public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyCheckXCommand {
+    // Updates an action's push missing dependencies using the URIs reported available by the partition manager.
+    public CoordActionUpdatePushMissingDependency(String actionId) {
+        super("coord_action_push_md", actionId);
+    }
+    // Removes the available URIs from the missing list, persists the action and clears them from the service.
+    @Override
+    protected Void execute() throws CommandException {
+        LOG.info("STARTED for Action id [{0}]", actionId);
+        String pushMissingDeps = coordAction.getPushMissingDependencies();
+        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+            LOG.info("Nothing to check. Empty push missing dependency");
+        }
+        else {
+            PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+            Collection<String> availDepList = pdms.getAvailableDependencyURIs(actionId);
+            if (availDepList == null || availDepList.size() == 0) {
+                LOG.info("There are no available dependencies for action ID: [{0}]", actionId);
+                if (isTimeout()) { // Timed out: queue a push dependency check as one last try
+                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+                }
+            }
+            else {
+                LOG.debug("Updating action ID [{0}] with available uris=[{1}] where missing uris=[{2}]", actionId,
+                        availDepList.toString(), pushMissingDeps);
+                // Whatever is missing and not reported as available is still missing.
+                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+                List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
+                stillMissingDepsList.removeAll(availDepList);
+                boolean isChangeInDependency = true;
+                if (stillMissingDepsList.size() == 0) {
+                    // All push-based dependencies are available
+                    onAllPushDependenciesAvailable();
+                }
+                else {
+                    if (stillMissingDepsList.size() == missingDepsArray.length) {
+                        isChangeInDependency = false; // nothing was removed from the missing list
+                    }
+                    else {
+                        String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
+                        coordAction.setPushMissingDependencies(stillMissingDeps);
+                    }
+                    if (isTimeout()) { // Timed out: queue a push dependency check as one last try
+                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
+                    }
+                }
+                updateCoordAction(coordAction, isChangeInDependency);
+                removeAvailableDependencies(pdms, availDepList); // drop the processed URIs from the service
+                LOG.info("ENDED for Action id [{0}]", actionId);
+            }
+        }
+        return null;
+    }
+    // Removes the given URIs from the service's available list for this action and logs the outcome.
+    private void removeAvailableDependencies(PartitionDependencyManagerService pdms, Collection<String> availDepList) {
+        if (pdms.removeAvailableDependencyURIs(actionId, availDepList)) {
+            LOG.debug("Successfully removed uris [{0}] for actionId: [{1}] from available list",
+                    availDepList.toString(), actionId);
+        }
+        else {
+            LOG.warn("Failed to remove uris [{0}] for actionId: [{1}] from available list", availDepList.toString(),
+                    actionId);
+        }
+    }
+
+    @Override
+    public String getEntityKey() {
+        return actionId;
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Mon Feb 25 21:42:07 2013
@@ -18,9 +18,12 @@
 package org.apache.oozie.command.coord;
 
 import java.io.StringReader;
+import java.net.URI;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
@@ -33,7 +36,11 @@ import org.apache.oozie.coord.CoordUtils
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandler.DependencyType;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
@@ -47,11 +54,13 @@ public class CoordCommandUtils {
     public static int FUTURE = 2;
     public static int OFFSET = 3;
     public static int UNEXPECTED = -1;
-    public static final String RESOLVED_UNRESOLVED_SEPARATOR = ";";
+    public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
+    public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
 
     /**
      * parse a function like coord:latest(n)/future() and return the 'n'.
      * <p/>
+     *
      * @param function
      * @param event
      * @param appInst
@@ -197,8 +206,8 @@ public class CoordCommandUtils {
                 if (startCal != null && endCal != null) {
                     List<Integer> expandedFreqs = CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
                     for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
-                        String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
-                                                                    appInst, conf, eval);
+                        String matInstance = materializeInstance(event, "${coord:offset(" + expandedFreqs.get(i)
+                                + ", \"MINUTE\")}", appInst, conf, eval);
                         if (matInstance == null || matInstance.length() == 0) {
                             // Earlier than dataset's initial instance
                             break;
@@ -219,7 +228,8 @@ public class CoordCommandUtils {
                 if (funcType == CURRENT) {
                     // Everything could be resolved NOW. no latest() ELs
                     for (int i = endIndex; i >= startIndex; i--) {
-                        String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf, eval);
+                        String matInstance = materializeInstance(event, "${coord:current(" + i + ")}", appInst, conf,
+                                eval);
                         if (matInstance == null || matInstance.length() == 0) {
                             // Earlier than dataset's initial instance
                             break;
@@ -273,28 +283,29 @@ public class CoordCommandUtils {
      *
      * @param event
      * @param instances
-     * @param dependencyList
      * @throws Exception
      */
-    public static void separateResolvedAndUnresolved(Element event, StringBuilder instances, StringBuffer dependencyList)
+    private static String separateResolvedAndUnresolved(Element event, StringBuilder instances)
             throws Exception {
         StringBuilder unresolvedInstances = new StringBuilder();
         StringBuilder urisWithDoneFlag = new StringBuilder();
+        StringBuilder depList = new StringBuilder();
         String uris = createEarlyURIs(event, instances.toString(), unresolvedInstances, urisWithDoneFlag);
         if (uris.length() > 0) {
             Element uriInstance = new Element("uris", event.getNamespace());
             uriInstance.addContent(uris);
             event.getContent().add(1, uriInstance);
-            if (dependencyList.length() > 0) {
-                dependencyList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+            if (depList.length() > 0) {
+                depList.append(CoordELFunctions.INSTANCE_SEPARATOR);
             }
-            dependencyList.append(urisWithDoneFlag);
+            depList.append(urisWithDoneFlag);
         }
         if (unresolvedInstances.length() > 0) {
-            Element elemInstance = new Element("unresolved-instances", event.getNamespace());
+            Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
             elemInstance.addContent(unresolvedInstances.toString());
             event.getContent().add(1, elemInstance);
         }
+        return depList.toString();
     }
 
     /**
@@ -318,10 +329,10 @@ public class CoordCommandUtils {
 
         Element doneFlagElement = event.getChild("dataset", event.getNamespace()).getChild("done-flag",
                 event.getNamespace());
-        String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
 
         for (int i = 0; i < instanceList.length; i++) {
-            if(instanceList[i].trim().length() == 0) {
+            if (instanceList[i].trim().length() == 0) {
                 continue;
             }
             int funcType = getFuncType(instanceList[i]);
@@ -340,11 +351,10 @@ public class CoordCommandUtils {
 
             String uriPath = CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace())
                     .getChild("uri-template", event.getNamespace()).getTextTrim());
+            URIHandler uriHandler = uriService.getURIHandler(uriPath);
+            uriHandler.validate(uriPath);
             uris.append(uriPath);
-            if (doneFlag.length() > 0) {
-                uriPath += "/" + doneFlag;
-            }
-            urisWithDoneFlag.append(uriPath);
+            urisWithDoneFlag.append(uriHandler.getURIWithDoneFlag(uriPath, CoordUtils.getDoneFlag(doneFlagElement)));
         }
         return uris.toString();
     }
@@ -415,22 +425,20 @@ public class CoordCommandUtils {
         appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
         appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
 
-        StringBuffer dependencyList = new StringBuffer();
+        Map<String, StringBuilder> dependencyMap = null;
 
         Element inputList = eAction.getChild("input-events", eAction.getNamespace());
         List<Element> dataInList = null;
         if (inputList != null) {
             dataInList = inputList.getChildren("data-in", eAction.getNamespace());
-            materializeDataEvents(dataInList, appInst, conf, dependencyList);
+            dependencyMap = materializeDataEvents(dataInList, appInst, conf);
         }
 
         Element outputList = eAction.getChild("output-events", eAction.getNamespace());
         List<Element> dataOutList = null;
         if (outputList != null) {
             dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
-            StringBuffer tmp = new StringBuffer();
-            // no dependency checks
-            materializeDataEvents(dataOutList, appInst, conf, tmp);
+            materializeDataEvents(dataOutList, appInst, conf);
         }
 
         eAction.removeAttribute("start");
@@ -439,8 +447,9 @@ public class CoordCommandUtils {
         eAction.setAttribute("action-nominal-time", DateUtils.formatDateOozieTZ(nominalTime));
         eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(actualTime));
 
-        boolean isSla = CoordCommandUtils.materializeSLA(eAction.getChild("action", eAction.getNamespace()).getChild(
-                "info", eAction.getNamespace("sla")), nominalTime, conf);
+        boolean isSla = CoordCommandUtils.materializeSLA(
+                eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")),
+                nominalTime, conf);
 
         // Setting up action bean
         actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -451,7 +460,16 @@ public class CoordCommandUtils {
         actionBean.setLastModifiedTime(new Date());
         actionBean.setStatus(CoordinatorAction.Status.WAITING);
         actionBean.setActionNumber(instanceCount);
-        actionBean.setMissingDependencies(dependencyList.toString());
+        if (dependencyMap != null) {
+            StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name());
+            if (sbPull != null) {
+                actionBean.setMissingDependencies(sbPull.toString());
+            }
+            StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name());
+            if (sbPush != null) {
+                actionBean.setPushMissingDependencies(sbPush.toString());
+            }
+        }
         actionBean.setNominalTime(nominalTime);
         if (isSla == true) {
             actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -469,14 +487,20 @@ public class CoordCommandUtils {
         }
         else {
             String action = XmlUtils.prettyPrint(eAction).toString();
-            CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
             StringBuilder actionXml = new StringBuilder(action);
-            StringBuilder existList = new StringBuilder();
-            StringBuilder nonExistList = new StringBuilder();
-            StringBuilder nonResolvedList = new StringBuilder();
-            getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
             Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
-            coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+            if (actionBean.getPushMissingDependencies() != null) {
+                DependencyChecker.checkForAvailability(actionBean.getPushMissingDependencies(), actionConf, true);
+            }
+            if (actionBean.getMissingDependencies() != null) {
+                CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
+                        actionBean.getJobId());
+                StringBuilder existList = new StringBuilder();
+                StringBuilder nonExistList = new StringBuilder();
+                StringBuilder nonResolvedList = new StringBuilder();
+                getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
+                coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+            }
             return actionXml.toString();
         }
     }
@@ -491,13 +515,18 @@ public class CoordCommandUtils {
      * @param conf
      * @throws Exception
      */
-    public static void materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
-            StringBuffer dependencyList) throws Exception {
+    public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf
+            ) throws Exception {
 
         if (events == null) {
-            return;
+            return null;
         }
-        StringBuffer unresolvedList = new StringBuffer();
+        StringBuilder unresolvedList = new StringBuilder();
+        Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>();
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        StringBuilder pullMissingDep = null;
+        StringBuilder pushMissingDep = null;
+
         for (Element event : events) {
             StringBuilder instances = new StringBuilder();
             ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
@@ -506,8 +535,24 @@ public class CoordCommandUtils {
             // Handle start-instance and end-instance
             resolveInstanceRange(event, instances, appInst, conf, eval);
             // Separate out the unresolved instances
-            separateResolvedAndUnresolved(event, instances, dependencyList);
-            String tmpUnresolved = event.getChildTextTrim("unresolved-instances", event.getNamespace());
+            String resolvedList = separateResolvedAndUnresolved(event, instances);
+            if (!resolvedList.isEmpty()) {
+                Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
+                        event.getNamespace());
+                String uriTemplate = uri.getText();
+                URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
+                URIHandler handler = uriService.getURIHandler(baseURI);
+                if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
+                    pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append(
+                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+                }
+                else {
+                    pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append(
+                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+                }
+            }
+
+            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
             if (tmpUnresolved != null) {
                 if (unresolvedList.length() > 0) {
                     unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
@@ -516,10 +561,14 @@ public class CoordCommandUtils {
             }
         }
         if (unresolvedList.length() > 0) {
-            dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
-            dependencyList.append(unresolvedList);
+            if (pullMissingDep == null) {
+                pullMissingDep = new StringBuilder();
+            }
+            pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
         }
-        return;
+        dependencyMap.put(DependencyType.PULL.name(), pullMissingDep);
+        dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep);
+        return dependencyMap;
     }
 
     /**
@@ -538,7 +587,7 @@ public class CoordCommandUtils {
             }
             else {
                 resolved.append(missDepList.substring(0, index));
-                unresolved.append(missDepList.substring(index + 1));
+                unresolved.append(missDepList.substring(index + RESOLVED_UNRESOLVED_SEPARATOR.length()));
             }
         }
         return resolved.toString();

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Mon Feb 25 21:42:07 2013
@@ -32,6 +32,7 @@ import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
+import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.MaterializeTransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
@@ -106,6 +107,16 @@ public class CoordMaterializeTransitionX
     public void performWrites() throws CommandException {
         try {
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+            // register the partition related dependencies of actions
+            for (JsonBean actionBean : insertList) {
+                if (actionBean instanceof CoordinatorActionBean) {
+                    CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
+                    if (coordAction.getPushMissingDependencies() != null) {
+                        // TODO: Delay in catchup mode?
+                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
+                    }
+                }
+            }
         }
         catch (JPAExecutorException jex) {
             throw new CommandException(jex);
@@ -329,6 +340,7 @@ public class CoordMaterializeTransitionX
 
             if (!dryrun) {
                 storeToDB(actionBean, action); // Storing to table
+
             }
             else {
                 actionStrings.append("action for new instance");

Added: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.StatusUtils;
+import org.apache.oozie.util.XConfiguration;
+
+public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
+    protected String actionId;
+    protected JPAService jpaService = null;
+    protected CoordinatorActionBean coordAction = null;
+    protected CoordinatorJobBean coordJob = null;
+
+    /**
+     * Property name of command re-queue interval for coordinator push check in
+     * milliseconds.
+     */
+    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = Service.CONF_PREFIX
+            + "coord.push.check.requeue.interval";
+    /**
+     * Default re-queue interval in ms. It is applied when no value is defined in
+     * the Oozie configuration.
+     */
+    private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
+    private boolean registerForNotification;
+
+    public CoordPushDependencyCheckXCommand(String actionId) {
+        this(actionId, false); // registerForNotification = false
+    }
+
+    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
+        super("coord_push_dep_check", "coord_push_dep_check", 0); // presumably (name, type, priority) - TODO confirm
+        this.actionId = actionId;
+        this.registerForNotification = registerForNotification; // true: execute() registers still-missing deps
+    }
+
+    protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
+        super(actionName, actionName, 0); // actionName is passed for both leading arguments
+        this.actionId = actionId; // registerForNotification keeps its default (false)
+    }
+
+    @Override
+    protected Void execute() throws CommandException { // checks push deps, persists the result, then re-queues or times out
+        String pushMissingDeps = coordAction.getPushMissingDependencies();
+        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+            LOG.info("Nothing to check. Empty push missing dependency");
+        }
+        else {
+            LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+
+            try {
+                Configuration actionConf = null;
+                try {
+                    actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
+                }
+                catch (IOException e) {
+                    // NOTE(review): this E1307 is caught by the outer catch (Exception) below and rethrown as E1021
+                    throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
+                }
+
+                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+                // Check all dependencies during materialization to avoid registering in the cache.
+                // But check only first missing one afterwards similar to
+                // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
+                ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
+                        !registerForNotification); // presumably "stop at first missing" - TODO confirm
+
+                boolean isChangeInDependency = true;
+                boolean timeout = false;
+                if (actionDep.getMissingDependencies().size() == 0) {
+                    // All push-based dependencies are available
+                    onAllPushDependenciesAvailable();
+                }
+                else {
+                    if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
+                        isChangeInDependency = false; // same number still missing: only the modified time is updated
+                    }
+                    else {
+                        String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
+                                .getMissingDependencies());
+                        coordAction.setPushMissingDependencies(stillMissingDeps);
+                    }
+                    // Checking for timeout
+                    timeout = isTimeout();
+                    if (timeout) {
+                        queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                    }
+                    else {
+                        queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
+                                getCoordPushCheckRequeueInterval()); // check again after the configured interval
+                    }
+                }
+
+                updateCoordAction(coordAction, isChangeInDependency);
+                if (registerForNotification) {
+                    registerForNotification(actionDep.getMissingDependencies(), actionConf);
+                }
+                else {
+                    unregisterAvailableDependencies(actionDep);
+                }
+                if (timeout) {
+                    unregisterMissingDependencies(actionDep.getMissingDependencies());
+                }
+            }
+            catch (Exception e) {
+                if (isTimeout()) { // the check failed, but the action is still timed out when due
+                    queue(new CoordActionTimeOutXCommand(coordAction), 100);
+                }
+                throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Return the re-queue interval for the coord push dependency check.
+     * @return the configured interval in milliseconds, or DEFAULT_COMMAND_REQUEUE_INTERVAL if none is set
+     */
+    public long getCoordPushCheckRequeueInterval() {
+        long requeueInterval = Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
+                DEFAULT_COMMAND_REQUEUE_INTERVAL);
+        return requeueInterval;
+    }
+
+    /**
+     * Returns true if the timeout period has been reached. Waiting time is counted in whole minutes from the
+     * later of the action's nominal time and created time.
+     * @return true if the timeout is non-negative and the waiting time exceeds it, else false
+     */
+    protected boolean isTimeout() {
+        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000); // milliseconds to minutes
+        int timeOut = coordAction.getTimeOut();
+        return (timeOut >= 0) && (waitingTime > timeOut);
+    }
+
+    protected void onAllPushDependenciesAvailable() { // clears push deps; READY only if no other deps remain
+        coordAction.setPushMissingDependencies("");
+        if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
+            Date nominalTime = coordAction.getNominalTime();
+            Date currentTime = new Date();
+            // The action should become READY only once the nominal time is not in the future;
+            // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
+            if (nominalTime.compareTo(currentTime) > 0) {
+                LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
+                        + currentTime + ", nominal=" + nominalTime);
+            }
+            else {
+                coordAction.setStatus(CoordinatorAction.Status.READY);
+                // pass jobID to the CoordActionReadyXCommand
+                queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+            }
+        }
+    }
+
+    /**
+     * Sets the action's last modified time to now and runs a JPA executor for it.
+     * <p/>
+     * CoordActionUpdatePushInputCheckJPAExecutor is used when the dependencies changed, otherwise
+     * CoordActionUpdateForModifiedTimeJPAExecutor. No executor runs when there is no JPA service.
+     *
+     * @param coordAction the action bean to update
+     * @param isChangeInDependency whether the action's dependency information changed
+     * @throws CommandException with E1021 if the executor fails with a JPAExecutorException
+     */
+    protected void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
+            throws CommandException {
+        coordAction.setLastModifiedTime(new Date());
+        if (jpaService == null) {
+            return;
+        }
+        try {
+            if (!isChangeInDependency) {
+                jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
+            }
+            else {
+                jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+            }
+        }
+        catch (JPAExecutorException persistFailure) {
+            throw new CommandException(ErrorCode.E1021, persistFailure.getMessage(), persistFailure);
+        }
+    }
+
+    /**
+     * Registers this action with the URI handler of every missing dependency.
+     * <p/>
+     * Registration is best effort: a failure for one dependency is logged and the remaining
+     * dependencies are still processed.
+     *
+     * @param missingDeps URIs of the dependencies that are still missing
+     * @param actionConf configuration of the action; also supplies the user name
+     */
+    private void registerForNotification(List<String> missingDeps, Configuration actionConf) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        // NOTE(review): the default passed here is the property key itself, so a configuration
+        // without a user resolves to the key string rather than null - confirm this is intended.
+        String user = actionConf.get(OozieClient.USER_NAME, OozieClient.USER_NAME);
+        for (String missingDep : missingDeps) {
+            try {
+                URI missingURI = new URI(missingDep);
+                URIHandler handler = uriService.getURIHandler(missingURI);
+                handler.registerForNotification(missingURI, actionConf, user, actionId);
+                LOG.debug("Registered uri [{0}] for actionId: [{1}] for notifications", missingURI, actionId);
+            }
+            catch (Exception e) {
+                // name the failing dependency so the warning can be acted on
+                LOG.warn("Exception while registering uri [{0}] for actionId: [{1}] for notifications",
+                        missingDep, actionId, e);
+            }
+        }
+    }
+
+    /**
+     * Unregisters this action from notifications for every dependency that is now available.
+     * <p/>
+     * Unregistering is best effort: a failure for one dependency is logged and the remaining
+     * dependencies are still processed.
+     *
+     * @param actionDependency dependency check result holding the available dependency URIs
+     */
+    private void unregisterAvailableDependencies(ActionDependency actionDependency) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        for (String availableDep : actionDependency.getAvailableDependencies()) {
+            try {
+                URI availableURI = new URI(availableDep);
+                URIHandler handler = uriService.getURIHandler(availableURI);
+                if (handler.unregisterFromNotification(availableURI, actionId)) {
+                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications",
+                            availableURI, actionId);
+                }
+                else {
+                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", availableURI,
+                            actionId);
+                }
+            }
+            catch (Exception e) {
+                // name the failing dependency so the warning can be acted on
+                LOG.warn("Exception while unregistering uri [{0}] for actionId: [{1}] from notifications",
+                        availableDep, actionId, e);
+            }
+        }
+    }
+
+    /**
+     * Unregisters this action from notifications for the given, still missing, dependencies.
+     * <p/>
+     * Unregistering is best effort: a failure for one dependency is logged and the remaining
+     * dependencies are still processed.
+     *
+     * @param missingDeps URIs of the missing dependencies to unregister from
+     */
+    private void unregisterMissingDependencies(List<String> missingDeps) {
+        URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+        for (String missingDep : missingDeps) {
+            try {
+                URI missingURI = new URI(missingDep);
+                URIHandler handler = uriService.getURIHandler(missingURI);
+                if (handler.unregisterFromNotification(missingURI, actionId)) {
+                    LOG.debug("Successfully unregistered uri [{0}] for actionId: [{1}] from notifications", missingURI,
+                            actionId);
+                }
+                else {
+                    LOG.warn("Unable to unregister uri [{0}] for actionId: [{1}] from notifications", missingURI,
+                            actionId);
+                }
+            }
+            catch (Exception e) {
+                // this path unregisters; the message used to say "registering" and omitted the uri
+                LOG.warn("Exception while unregistering uri [{0}] for actionId: [{1}] from notifications",
+                        missingDep, actionId, e);
+            }
+        }
+    }
+
+    /**
+     * @return the id of the coordinator job that the action belongs to
+     */
+    @Override
+    public String getEntityKey() {
+        final String jobId = coordAction.getJobId();
+        return jobId;
+    }
+
+    /**
+     * @return the command name and the action id joined by an underscore
+     */
+    @Override
+    public String getKey(){
+        final StringBuilder key = new StringBuilder();
+        key.append(getName()).append("_").append(actionId);
+        return key.toString();
+    }
+
+    /**
+     * @return always true
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    /**
+     * Loads the coordinator action and its job through the JPA service and sets the log info
+     * from the action.
+     *
+     * @throws CommandException if the JPA service is missing (E0610) or any XException occurs
+     *         while loading
+     */
+    @Override
+    protected void eagerLoadState() throws CommandException {
+        try {
+            jpaService = Services.get().get(JPAService.class);
+            if (jpaService == null) {
+                // Kept inside the try on purpose: if CommandException is an XException it is
+                // wrapped by the catch below, exactly as before.
+                throw new CommandException(ErrorCode.E0610);
+            }
+            coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
+            LogUtils.setLogInfo(coordAction, logInfo);
+        }
+        catch (XException loadFailure) {
+            throw new CommandException(loadFailure);
+        }
+    }
+
+    @Override
+    protected void eagerVerifyPrecondition() throws CommandException, PreconditionException {
+        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action. Should be in WAITING state, but state="
+                    + coordAction.getStatus());
+        }
+
+        // if eligible to do action input check when running with backward
+        // support is true
+        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+            return;
+        }
+
+        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() != Job.Status.RUNNINGWITHERROR
+                && coordJob.getStatus() != Job.Status.PAUSED && coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action."
+                    + " Coordinator job is not in RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+                    + coordJob.getStatus());
+        }
+    }
+
+    /**
+     * Loads the state by delegating to {@link #eagerLoadState()}.
+     *
+     * @throws CommandException if eagerLoadState fails
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        eagerLoadState();
+    }
+
+    /**
+     * Verifies the preconditions by delegating to {@link #eagerVerifyPrecondition()}.
+     *
+     * @throws CommandException if eagerVerifyPrecondition throws it
+     * @throws PreconditionException if eagerVerifyPrecondition rejects the action or job state
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        eagerVerifyPrecondition();
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java Mon Feb 25 21:42:07 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.command.coord.CoordCommandUtils;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
@@ -163,7 +164,7 @@ public class CoordELEvaluator {
                 }
                 else {
                 }
-                if (data.getChild("unresolved-instances", data.getNamespace()) != null) {
+                if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
                     e.setVariable(".datain." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
                     // check
                     // null
@@ -180,7 +181,7 @@ public class CoordELEvaluator {
                 }
                 else {
                 }// TODO
-                if (data.getChild("unresolved-instances", data.getNamespace()) != null) {
+                if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
                     e.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
                     // check
                     // null

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Mon Feb 25 21:42:07 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.oozie.coord;
 
-import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
@@ -25,16 +25,15 @@ import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.dependency.URIHandler.Context;
+import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.URIHandlerService;
 
 /**
  * This class implements the EL function related to coordinator
@@ -300,37 +299,50 @@ public class CoordELFunctions {
             String user = ParamChecker
                     .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
             String doneFlag = ds.getDoneFlag();
-            while (instance >= checkedInstance) {
-                ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
-                String uriPath = uriEval.evaluate(uriTemplate, String.class);
-                String pathWithDoneFlag = uriPath;
-                if (doneFlag.length() > 0) {
-                    pathWithDoneFlag += "/" + doneFlag;
-                }
-                if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
-                    LOG.debug("Found future(" + available + "): " + pathWithDoneFlag);
-                    if (available == endOffset) {
-                        LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
-                        resolved = true;
-                        resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
-                        resolvedURIPaths.append(uriPath);
-                        retVal = resolvedInstances.toString();
-                        eval.setVariable("resolved_path", resolvedURIPaths.toString());
-                        break;
-                    } else if (available >= startOffset) {
-                        LOG.debug("Matched future(" + available + "): " + pathWithDoneFlag);
-                        resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
-                        resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+            URIHandler uriHandler = null;
+            Context uriContext = null;
+            try {
+                while (instance >= checkedInstance) {
+                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
+                    if (uriHandler == null) {
+                        URI uri = new URI(uriPath);
+                        uriHandler = uriService.getURIHandler(uri);
+                        uriContext = uriHandler.getContext(uri, conf, user);
                     }
-                    available++;
+                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+                        if (available == endOffset) {
+                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+                            resolved = true;
+                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+                            resolvedURIPaths.append(uriPath);
+                            retVal = resolvedInstances.toString();
+                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
+                            break;
+                        }
+                        else if (available >= startOffset) {
+                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
+                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+                                    INSTANCE_SEPARATOR);
+                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+                        }
+                        available++;
+                    }
+                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+                    // -datasetFrequency);
+                    nominalInstanceCal = (Calendar) initInstance.clone();
+                    instCount[0]++;
+                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+                    checkedInstance++;
+                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
+                }
+            }
+            finally {
+                if (uriContext != null) {
+                    uriContext.destroy();
                 }
-                // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
-                // -datasetFrequency);
-                nominalInstanceCal = (Calendar) initInstance.clone();
-                instCount[0]++;
-                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-                checkedInstance++;
-                // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
             }
             if (!resolved) {
                 // return unchanged future function with variable 'is_resolved'
@@ -973,37 +985,52 @@ public class CoordELFunctions {
             String user = ParamChecker
                     .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
             String doneFlag = ds.getDoneFlag();
-            while (nominalInstanceCal.compareTo(initInstance) >= 0) {
-                ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
-                String uriPath = uriEval.evaluate(uriTemplate, String.class);
-                String pathWithDoneFlag = uriPath;
-                if (doneFlag.length() > 0) {
-                    pathWithDoneFlag += "/" + doneFlag;
-                }
-                if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
-                    LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag);
-                    if (available == startOffset) {
-                        LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
-                        resolved = true;
-                        resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
-                        resolvedURIPaths.append(uriPath);
-                        retVal = resolvedInstances.toString();
-                        eval.setVariable("resolved_path", resolvedURIPaths.toString());
-                        break;
-                    } else if (available <= endOffset) {
-                        LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
-                        resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
-                        resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+            URIHandler uriHandler = null;
+            Context uriContext = null;
+            try {
+                while (nominalInstanceCal.compareTo(initInstance) >= 0) {
+                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
+                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
+                    if (uriHandler == null) {
+                        URI uri = new URI(uriPath);
+                        uriHandler = uriService.getURIHandler(uri);
+                        uriContext = uriHandler.getContext(uri, conf, user);
                     }
+                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
+                        XLog.getLog(CoordELFunctions.class)
+                                .debug("Found latest(" + available + "): " + uriWithDoneFlag);
+                        if (available == startOffset) {
+                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+                            resolved = true;
+                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
+                            resolvedURIPaths.append(uriPath);
+                            retVal = resolvedInstances.toString();
+                            eval.setVariable("resolved_path", resolvedURIPaths.toString());
+                            break;
+                        }
+                        else if (available <= endOffset) {
+                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
+                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
+                                    INSTANCE_SEPARATOR);
+                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+                        }
 
-                    available--;
+                        available--;
+                    }
+                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
+                    // -datasetFrequency);
+                    nominalInstanceCal = (Calendar) initInstance.clone();
+                    instCount[0]--;
+                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
+                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
+                }
+            }
+            finally {
+                if (uriContext != null) {
+                    uriContext.destroy();
                 }
-                // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(),
-                // -datasetFrequency);
-                nominalInstanceCal = (Calendar) initInstance.clone();
-                instCount[0]--;
-                nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-                // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
             }
             if (!resolved) {
                 // return unchanged latest function with variable 'is_resolved'
@@ -1026,26 +1053,6 @@ public class CoordELFunctions {
         return retVal;
     }
 
-    // TODO : Not an efficient way. In a loop environment, we could do something
-    // outside the loop
-    /**
-     * Check whether a URI path exists
-     *
-     * @param sPath
-     * @param conf
-     * @return
-     * @throws IOException
-     */
-
-    private static boolean isPathAvailable(String sPath, String user, String group, Configuration conf)
-            throws IOException, HadoopAccessorException {
-        // sPath += "/" + END_OF_OPERATION_INDICATOR_FILE;
-        Path path = new Path(sPath);
-        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-        Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
-        return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
-    }
-
     /**
      * @param tm
      * @return a new Evaluator to be used for URI-template evaluation

Added: oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.coord;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.DagELFunctions;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+
+/**
+ * This class implements the EL function for HCat datasets in coordinator
+ */
+
+public class HCatELFunctions {
+    // Class-wide logger; final because it is a constant and is never meant to be reassigned.
+    private static final XLog LOG = XLog.getLog(HCatELFunctions.class);
+    // Configuration passed to the URI handler by hcat_exists.
+    private static final Configuration EMPTY_CONF = new Configuration(true);
+
+    /**
+     * Selects whether a lookup refers to an input or an output data event.
+     */
+    enum EventType {
+        input,
+        output
+    }
+
+    /* Workflow Parameterization EL functions */
+
+    /**
+     * Checks through the matching URI handler whether the given uri exists, acting as the user
+     * of the current workflow.
+     *
+     * @param uri hcatalog partition uri.
+     * @return <code>true</code> if the uri exists, <code>false</code> if it does not.
+     * @throws Exception if the uri cannot be parsed or the handler lookup or existence check fails
+     */
+    public static boolean hcat_exists(String uri) throws Exception {
+        final URI partitionUri = new URI(uri);
+        final URIHandler uriHandler = Services.get().get(URIHandlerService.class).getURIHandler(partitionUri);
+        final String workflowUser = DagELFunctions.getWorkflow().getUser();
+        return uriHandler.exists(partitionUri, EMPTY_CONF, workflowUser);
+    }
+
+    /* Coord EL functions */
+
+    /**
+     * Echo the same EL function without evaluating anything
+     *
+     * @param dataInName
+     * @return the same EL function
+     */
+    public static String ph1_coord_databaseIn_echo(String dataName) {
+        // Checking if the dataIn is correct?
+        isValidDataEvent(dataName);
+        return echoUnResolved("databaseIn", "'" + dataName + "'");
+    }
+
+    public static String ph1_coord_databaseOut_echo(String dataName) {
+        // Checking if the dataOut is correct?
+        isValidDataEvent(dataName);
+        return echoUnResolved("databaseOut", "'" + dataName + "'");
+    }
+
+    public static String ph1_coord_tableIn_echo(String dataName) {
+        // Checking if the dataIn is correct?
+        isValidDataEvent(dataName);
+        return echoUnResolved("tableIn", "'" + dataName + "'");
+    }
+
+    public static String ph1_coord_tableOut_echo(String dataName) {
+        // Checking if the dataOut is correct?
+        isValidDataEvent(dataName);
+        return echoUnResolved("tableOut", "'" + dataName + "'");
+    }
+
+    public static String ph1_coord_dataInPartitionFilter_echo(String dataInName, String type) {
+        // Checking if the dataIn/dataOut is correct?
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionFilter", "'" + dataInName + "', '" + type + "'");
+    }
+
+    public static String ph1_coord_dataInPartitionMin_echo(String dataInName, String partition) {
+        // Checking if the dataIn/dataOut is correct?
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionMin", "'" + dataInName + "', '" + partition + "'");
+    }
+
+    public static String ph1_coord_dataInPartitionMax_echo(String dataInName, String partition) {
+        // Checking if the dataIn/dataOut is correct?
+        isValidDataEvent(dataInName);
+        return echoUnResolved("dataInPartitionMax", "'" + dataInName + "', '" + partition + "'");
+    }
+
+    public static String ph1_coord_dataOutPartitions_echo(String dataOutName) {
+        // Checking if the dataIn/dataOut is correct?
+        isValidDataEvent(dataOutName);
+        return echoUnResolved("dataOutPartitions", "'" + dataOutName + "'");
+    }
+
+    public static String ph1_coord_dataOutPartitionValue_echo(String dataOutName, String partition) {
+        // Checking if the dataIn/dataOut is correct?
+        isValidDataEvent(dataOutName);
+        return echoUnResolved("dataOutPartitionValue", "'" + dataOutName + "', '" + partition + "'");
+    }
+
+    /**
+     * Extract the hcat DB name from the URI-template associate with
+     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
+     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
+     * (SyncCoordDataset)
+     *
+     * @param dataInName
+     * @return DB name
+     */
+    public static String ph3_coord_databaseIn(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
+        if (hcatURI != null) {
+            return hcatURI.getDb();
+        }
+        else {
+            return "";
+        }
+    }
+
+    /**
+     * Extract the hcat DB name from the URI-template associate with
+     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
+     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
+     * (SyncCoordDataset)
+     *
+     * @param dataOutName
+     * @return DB name
+     */
+    public static String ph3_coord_databaseOut(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
+        if (hcatURI != null) {
+            return hcatURI.getDb();
+        }
+        else {
+            return "";
+        }
+    }
+
+    /**
+     * Extract the hcat Table name from the URI-template associate with
+     * 'dataInName'. Caller needs to specify the EL-evaluator level variable
+     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
+     * (SyncCoordDataset)
+     *
+     * @param dataInName
+     * @return Table name
+     */
+    public static String ph3_coord_tableIn(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.input);
+        if (hcatURI != null) {
+            return hcatURI.getTable();
+        }
+        else {
+            return "";
+        }
+    }
+
+    /**
+     * Extract the hcat Table name from the URI-template associate with
+     * 'dataOutName'. Caller needs to specify the EL-evaluator level variable
+     * 'oozie.coord.el.dataset.bean' with synchronous dataset object
+     * (SyncCoordDataset)
+     *
+     * @param dataOutName
+     * @return Table name
+     */
+    public static String ph3_coord_tableOut(String dataName) {
+        HCatURI hcatURI = getURIFromResolved(dataName, EventType.output);
+        if (hcatURI != null) {
+            return hcatURI.getTable();
+        }
+        else {
+            return "";
+        }
+    }
+
+    /**
+     * Used to specify the HCat partition filter which is input dependency for workflow job.<p/> Look for two evaluator-level
+     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
+     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition filter.
+     *
+     * @param dataInName : Datain name
+     * @param type : for action type - pig, MR or hive
+     */
+    public static String ph3_coord_dataInPartitionFilter(String dataInName, String type) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataInPartitionFilter('" + dataInName + "', '" + type + "')}";
+        }
+        return createPartitionFilter(uris, type);
+    }
+
+    /**
+     * Used to specify the HCat partition's value defining output for workflow job.<p/> Look for two evaluator-level
+     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the current list of
+     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition value.
+     *
+     * @param dataOutName : Dataout name
+     * @param partitionName : Specific partition name whose value is wanted
+     */
+    public static String ph3_coord_dataOutPartitionValue(String dataOutName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uri = (String) eval.getVariable(".dataout." + dataOutName);
+        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataOutPartitionValue('" + dataOutName + "', '" + partitionName + "')}";
+        }
+        try {
+            HCatURI hcatUri = new HCatURI(uri);
+            return hcatUri.getPartitionValue(partitionName);
+        }
+        catch(URISyntaxException urie) {
+            LOG.warn("Exception with uriTemplate [{0}]. Reason [{1}]: ", uri, urie);
+            throw new RuntimeException("HCat URI can't be parsed " + urie);
+        }
+    }
+
+    /**
+     * Used to specify the entire HCat partition defining output for workflow job.<p/> Look for two evaluator-level
+     * variables <p/> A) .dataout.<DATAOUT_NAME> B) .dataout.<DATAOUT_NAME>.unresolved <p/> A defines the data-out
+     * HCat URI. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+     * unresolved, this function will echo back the original function <p/> otherwise it sends the partition.
+     *
+     * @param dataOutName : DataOut name
+     */
+    public static String ph3_coord_dataOutPartitions(String dataOutName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uri = (String) eval.getVariable(".dataout." + dataOutName);
+        Boolean unresolved = (Boolean) eval.getVariable(".dataout." + dataOutName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataOutPartitions('" + dataOutName + "')}";
+        }
+        try {
+            return new HCatURI(uri).toPartitionString();
+        }
+        catch (URISyntaxException e) {
+            throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e);
+        }
+    }
+
+    /**
+     * Used to specify the MAXIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
+     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
+     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+     * unresolved, this function will echo back the original function <p/> otherwise it sends the max partition value.
+     *
+     * @param dataInName : Datain name
+     * @param partitionName : Specific partition name whose MAX value is wanted
+     */
+    public static String ph3_coord_dataInPartitionMin(String dataInName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
+        }
+        String minPartition = null;
+        if (uris != null) {
+            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+            // get the partition values list and find minimum
+            try {
+                // initialize minValue with first partition value
+                minPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
+                if (minPartition == null || minPartition.isEmpty()) {
+                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
+                }
+                for (int i = 1; i < uriList.length; i++) {
+                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
+                        if(value.compareTo(minPartition) < 0) { //sticking to string comparison since some numerical date
+                                                                //values can also contain letters e.g. 20120101T0300Z (UTC)
+                            minPartition = value;
+                        }
+                }
+            }
+            catch(URISyntaxException urie) {
+                throw new RuntimeException("HCat URI can't be parsed " + urie);
+            }
+        }
+        else {
+            LOG.warn("URI is null");
+            return null;
+        }
+        return minPartition;
+    }
+
+    /**
+     * Used to specify the MINIMUM value of an HCat partition which is input dependency for workflow job.<p/> Look for two evaluator-level
+     * variables <p/> A) .datain.<DATAIN_NAME> B) .datain.<DATAIN_NAME>.unresolved <p/> A defines the current list of
+     * HCat URIs. <p/> B defines whether there are any unresolved EL-function (i.e latest) <p/> If there are something
+     * unresolved, this function will echo back the original function <p/> otherwise it sends the min partition value.
+     *
+     * @param dataInName : Datain name
+     * @param partitionName : Specific partition name whose MIN value is wanted
+     */
+    public static String ph3_coord_dataInPartitionMax(String dataInName, String partitionName) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris = (String) eval.getVariable(".datain." + dataInName);
+        Boolean unresolved = (Boolean) eval.getVariable(".datain." + dataInName + ".unresolved");
+        if (unresolved != null && unresolved.booleanValue() == true) {
+            return "${coord:dataInPartitionMin('" + dataInName + "', '" + partitionName + "')}";
+        }
+        String maxPartition = null;
+        if (uris != null) {
+            String[] uriList = uris.split(CoordELFunctions.DIR_SEPARATOR);
+            // get the partition values list and find minimum
+            try {
+                // initialize minValue with first partition value
+                maxPartition = new HCatURI(uriList[0]).getPartitionValue(partitionName);
+                if (maxPartition == null || maxPartition.isEmpty()) {
+                    throw new RuntimeException("No value in data-in uri for partition key: " + partitionName);
+                }
+                for(int i = 1; i < uriList.length; i++) {
+                        String value = new HCatURI(uriList[i]).getPartitionValue(partitionName);
+                        if(value.compareTo(maxPartition) > 0) {
+                            maxPartition = value;
+                        }
+                }
+            }
+            catch(URISyntaxException urie) {
+                throw new RuntimeException("HCat URI can't be parsed " + urie);
+            }
+        }
+        else {
+            LOG.warn("URI is null");
+            return null;
+        }
+        return maxPartition;
+    }
+
+    /**
+     * Build a partition filter from a list of HCat URIs by joining the per-URI filters with " OR ".
+     *
+     * @param uris HCat URIs separated by CoordELFunctions.DIR_SEPARATOR; must not be null
+     * @param type filter type handed through to HCatURI.toPartitionFilter
+     * @return the combined filter; an empty string if the split yields no URIs
+     * @throws RuntimeException if one of the URIs cannot be parsed as an HCat URI; the parsing exception is
+     *         attached as the cause
+     */
+    private static String createPartitionFilter(String uris, String type) {
+        StringBuilder filter = new StringBuilder();
+        for (String uri : uris.split(CoordELFunctions.DIR_SEPARATOR)) {
+            if (filter.length() > 0) {
+                filter.append(" OR ");
+            }
+            try {
+                filter.append(new HCatURI(uri).toPartitionFilter(type));
+            }
+            catch (URISyntaxException e) {
+                // Chain the original exception so its stack trace is not lost to callers.
+                throw new RuntimeException("Parsing exception for HCatURI " + uri + ". details: " + e, e);
+            }
+        }
+        return filter.toString();
+    }
+
+    /**
+     * Look up the resolved URIs of a data-in or data-out event in the current evaluator and parse the first one.
+     *
+     * @param dataInName name of the data event (used for both input and output events)
+     * @param type EventType.input reads the ".datain." variable, anything else reads ".dataout."
+     * @return the first resolved URI as an HCatURI, or null if the evaluator has no URIs for the event
+     * @throws RuntimeException if the URI cannot be parsed as an HCat URI; the parsing exception is attached as
+     *         the cause
+     */
+    private static HCatURI getURIFromResolved(String dataInName, EventType type) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        String uris;
+        if (type == EventType.input) {
+            uris = (String) eval.getVariable(".datain." + dataInName);
+        }
+        else { //type=output
+            uris = (String) eval.getVariable(".dataout." + dataInName);
+        }
+        if (uris == null) {
+            LOG.warn("URI is NULL");
+            return null;
+        }
+        // limit -1 keeps trailing empty strings, so the array always has at least one element
+        String uriTemplate = uris.split(CoordELFunctions.DIR_SEPARATOR, -1)[0];
+        LOG.info("uriTemplate [{0}] ", uriTemplate);
+        try {
+            return new HCatURI(uriTemplate);
+        }
+        catch (URISyntaxException e) {
+            LOG.info("uriTemplate [{0}]. Reason [{1}]: ", uriTemplate, e);
+            // Chain the original exception so its stack trace is not lost to callers.
+            throw new RuntimeException("HCat URI can't be parsed " + e, e);
+        }
+    }
+
+    /**
+     * Check that the evaluator variable "oozie.dataname.&lt;name&gt;" marks the name as a data-in or data-out event.
+     *
+     * @param dataInName name of the data event to check
+     * @return always true; an invalid name is reported by exception instead
+     * @throws RuntimeException if the variable is missing or is neither "data-in" nor "data-out"
+     */
+    private static boolean isValidDataEvent(String dataInName) {
+        String eventKind = (String) ELEvaluator.getCurrent().getVariable("oozie.dataname." + dataInName);
+        // constant-first equals() also covers the null case
+        boolean knownEvent = "data-in".equals(eventKind) || "data-out".equals(eventKind);
+        if (!knownEvent) {
+            LOG.error("dataset name " + dataInName + " is not valid. val :" + eventKind);
+            throw new RuntimeException("data set name " + dataInName + " is not valid");
+        }
+        return true;
+    }
+
+    private static String echoUnResolved(String functionName, String n) {
+        return echoUnResolvedPre(functionName, n, "coord:");
+    }
+
+    private static String echoUnResolvedPre(String functionName, String n, String prefix) {
+        ELEvaluator eval = ELEvaluator.getCurrent();
+        eval.setVariable(".wrap", "true");
+        return prefix + functionName + "(" + n + ")"; // Unresolved
+    }
+
+}
\ No newline at end of file

Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.util.List;
+
+/**
+ * Holder for the result of a dependency check: the dependencies that are missing and the ones that are
+ * available.
+ */
+public class ActionDependency {
+
+    // Assigned once in the constructor and never reassigned.
+    private final List<String> missingDependencies;
+    private final List<String> availableDependencies;
+
+    /**
+     * Create a result holder. The lists are stored as given (no copy is made).
+     *
+     * @param missingDependencies dependencies that are missing
+     * @param availableDependencies dependencies that are available
+     */
+    ActionDependency(List<String> missingDependencies, List<String> availableDependencies) {
+        this.missingDependencies = missingDependencies;
+        this.availableDependencies = availableDependencies;
+    }
+
+    /**
+     * Get the missing dependencies.
+     *
+     * @return the list of missing dependencies passed to the constructor
+     */
+    public List<String> getMissingDependencies() {
+        return missingDependencies;
+    }
+
+    /**
+     * Get the available dependencies.
+     *
+     * @return the list of available dependencies passed to the constructor
+     */
+    public List<String> getAvailableDependencies() {
+        return availableDependencies;
+    }
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Static helpers to convert missing-dependency lists to and from their string form and to check which of them
+ * have become available.
+ */
+public class DependencyChecker {
+
+    private static XLog LOG = XLog.getLog(DependencyChecker.class);
+
+    /**
+     * Return a string of missing dependencies concatenated by CoordELFunctions.INSTANCE_SEPARATOR
+     *
+     * @param missingDependencies list of missing dependencies
+     * @return missing dependencies as a string
+     */
+    public static String dependenciesAsString(List<String> missingDependencies) {
+        final String separator = CoordELFunctions.INSTANCE_SEPARATOR;
+        return StringUtils.join(missingDependencies, separator);
+    }
+
+    /**
+     * Return an array of missing dependencies
+     *
+     * @param missingDependencies missing dependencies concatenated by
+     *        CoordELFunctions.INSTANCE_SEPARATOR
+     * @return missing dependencies as an array
+     */
+    public static String[] dependenciesAsArray(String missingDependencies) {
+        final String separator = CoordELFunctions.INSTANCE_SEPARATOR;
+        return missingDependencies.split(separator);
+    }
+
+    /**
+     * Get the currently missing and available dependencies after checking the list of known missing
+     * dependencies against the source.
+     *
+     * @param missingDependencies known missing dependencies, concatenated by
+     *        CoordELFunctions.INSTANCE_SEPARATOR
+     * @param actionConf Configuration for the action
+     * @param stopOnFirstMissing Does not continue check for the rest of list if there is a missing
+     *        dependency
+     * @return ActionDependency which has the list of missing and available dependencies
+     * @throws CommandException if a dependency is not a valid URI or the URI handler fails
+     */
+    public static ActionDependency checkForAvailability(String missingDependencies, Configuration actionConf,
+            boolean stopOnFirstMissing) throws CommandException {
+        String[] dependencyArray = dependenciesAsArray(missingDependencies);
+        return checkForAvailability(dependencyArray, actionConf, stopOnFirstMissing);
+    }
+
+    /**
+     * Get the currently missing and available dependencies after checking the list of known missing
+     * dependencies against the source.
+     *
+     * @param missingDependencies known missing dependencies
+     * @param actionConf Configuration for the action
+     * @param stopOnFirstMissing Does not continue check for the rest of list if there is a missing
+     *        dependency
+     * @return ActionDependency which has the list of missing and available dependencies
+     * @throws CommandException if a dependency is not a valid URI or the URI handler fails
+     */
+    public static ActionDependency checkForAvailability(String[] missingDependencies, Configuration actionConf,
+            boolean stopOnFirstMissing) throws CommandException {
+        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
+        List<String> stillMissing = new ArrayList<String>();
+        List<String> nowAvailable = new ArrayList<String>();
+        URIHandlerService handlerService = Services.get().get(URIHandlerService.class);
+        boolean skipRemaining = false;
+        try {
+            for (String dependency : missingDependencies) {
+                if (skipRemaining) {
+                    // Not checked any more: everything after the first miss is carried over as missing.
+                    stillMissing.add(dependency);
+                    continue;
+                }
+
+                URI uri = new URI(dependency);
+                URIHandler uriHandler = handlerService.getURIHandler(uri);
+                LOG.debug("Checking for the availability of dependency [{0}] ", dependency);
+                if (uriHandler.exists(uri, actionConf, user)) {
+                    LOG.debug("Dependency [{0}] is available", dependency);
+                    nowAvailable.add(dependency);
+                }
+                else {
+                    LOG.debug("Dependency [{0}] is missing", dependency);
+                    stillMissing.add(dependency);
+                    // skipRemaining is false here, so it only flips when the caller asked to stop
+                    skipRemaining = stopOnFirstMissing;
+                }
+            }
+        }
+        catch (URISyntaxException e) {
+            throw new CommandException(ErrorCode.E0906, e.getMessage(), e);
+        }
+        catch (URIHandlerException e) {
+            throw new CommandException(e);
+        }
+        return new ActionDependency(stillMissing, nowAvailable);
+    }
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.dependency;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.hadoop.FSLauncherURIHandler;
+import org.apache.oozie.action.hadoop.LauncherURIHandler;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+
+/**
+ * {@link URIHandler} implementation that resolves URIs through a Hadoop {@link FileSystem} obtained from the
+ * {@link HadoopAccessorService}.
+ */
+public class FSURIHandler implements URIHandler {
+
+    private HadoopAccessorService service;
+    private Set<String> supportedSchemes;
+    private List<Class<?>> classesToShip;
+
+    /**
+     * Look up the HadoopAccessorService and cache its supported schemes and the launcher classes.
+     *
+     * @param conf not used by this handler
+     */
+    @Override
+    public void init(Configuration conf) {
+        this.service = Services.get().get(HadoopAccessorService.class);
+        this.supportedSchemes = this.service.getSupportedSchemes();
+        FSLauncherURIHandler launcherHandler = new FSLauncherURIHandler();
+        this.classesToShip = launcherHandler.getClassesForLauncher();
+    }
+
+    /**
+     * @return the schemes reported by the HadoopAccessorService during init
+     */
+    @Override
+    public Set<String> getSupportedSchemes() {
+        return supportedSchemes;
+    }
+
+    /**
+     * @return FSLauncherURIHandler.class
+     */
+    @Override
+    public Class<? extends LauncherURIHandler> getLauncherURIHandlerClass() {
+        return FSLauncherURIHandler.class;
+    }
+
+    /**
+     * @return the classes obtained from FSLauncherURIHandler during init
+     */
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return classesToShip;
+    }
+
+    /**
+     * @return always DependencyType.PULL
+     */
+    @Override
+    public DependencyType getDependencyType(URI uri) throws URIHandlerException {
+        return DependencyType.PULL;
+    }
+
+    /**
+     * Notifications are not supported by this handler.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void registerForNotification(URI uri, Configuration conf, String user, String actionID)
+            throws URIHandlerException {
+        String scheme = uri.getScheme();
+        throw new UnsupportedOperationException("Notifications are not supported for " + scheme);
+    }
+
+    /**
+     * Notifications are not supported by this handler.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public boolean unregisterFromNotification(URI uri, String actionID) {
+        String scheme = uri.getScheme();
+        throw new UnsupportedOperationException("Notifications are not supported for " + scheme);
+    }
+
+    /**
+     * Create a context wrapping the FileSystem for the given URI and user.
+     *
+     * @throws URIHandlerException if the FileSystem cannot be obtained
+     */
+    @Override
+    public Context getContext(URI uri, Configuration conf, String user) throws URIHandlerException {
+        return new FSContext(conf, user, getFileSystem(uri, conf, user));
+    }
+
+    /**
+     * Check whether the URI exists, using the FileSystem held by the context.
+     *
+     * @param context expected to be an FSContext
+     * @throws URIHandlerException (HadoopAccessorException, E0902) if the FileSystem check fails with an
+     *         IOException
+     */
+    @Override
+    public boolean exists(URI uri, Context context) throws URIHandlerException {
+        FileSystem fileSystem = ((FSContext) context).getFileSystem();
+        try {
+            Path target = getNormalizedPath(uri);
+            return fileSystem.exists(target);
+        }
+        catch (IOException ioe) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ioe);
+        }
+    }
+
+    /**
+     * Check whether the URI exists, using a FileSystem created for the given user.
+     *
+     * @throws URIHandlerException (HadoopAccessorException, E0902) if the check fails with an IOException, or
+     *         if the FileSystem cannot be obtained
+     */
+    @Override
+    public boolean exists(URI uri, Configuration conf, String user) throws URIHandlerException {
+        try {
+            FileSystem fileSystem = getFileSystem(uri, conf, user);
+            Path target = getNormalizedPath(uri);
+            return fileSystem.exists(target);
+        }
+        catch (IOException ioe) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ioe);
+        }
+    }
+
+    /**
+     * Append "/" and the done flag to the URI; a zero-length done flag leaves the URI unchanged.
+     */
+    @Override
+    public String getURIWithDoneFlag(String uri, String doneFlag) throws URIHandlerException {
+        if (doneFlag.isEmpty()) {
+            return uri;
+        }
+        return uri + "/" + doneFlag;
+    }
+
+    /**
+     * No validation is performed by this handler.
+     */
+    @Override
+    public void validate(String uri) throws URIHandlerException {
+    }
+
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void destroy() {
+
+    }
+
+    private Path getNormalizedPath(URI uri) {
+        // Normalizes uri path replacing // with / in the path which users specify by mistake
+        String scheme = uri.getScheme();
+        String authority = uri.getAuthority();
+        return new Path(scheme, authority, uri.getPath());
+    }
+
+    private FileSystem getFileSystem(URI uri, Configuration conf, String user) throws HadoopAccessorException {
+        if (user == null) {
+            throw new HadoopAccessorException(ErrorCode.E0902, "user has to be specified to access FileSystem");
+        }
+        // The job configuration is created from the URI authority; the passed-in conf is not used here.
+        Configuration jobConf = service.createJobConf(uri.getAuthority());
+        return service.createFileSystem(user, uri, jobConf);
+    }
+
+    static class FSContext extends Context {
+
+        private FileSystem fs;
+
+        /**
+         * Create a FSContext that can be used to access a filesystem URI
+         *
+         * @param conf Configuration to access the URI
+         * @param user name of the user the URI should be accessed as
+         * @param fs FileSystem to access
+         */
+        public FSContext(Configuration conf, String user, FileSystem fs) {
+            super(conf, user);
+            this.fs = fs;
+        }
+
+        /**
+         * Get the FileSystem to access the URI
+         * @return FileSystem to access the URI
+         */
+        public FileSystem getFileSystem() {
+            return this.fs;
+        }
+    }
+
+}