You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/02/25 22:42:09 UTC

svn commit: r1449911 [1/8] - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/rest/ client/src/test/java/org/apache/oozie/client/rest/ core/ core/s...

Author: virag
Date: Mon Feb 25 21:42:07 2013
New Revision: 1449911

URL: http://svn.apache.org/r1449911
Log:
Merging hcat-intre to trunk

Added:
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/coord/HCatELFunctions.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/ActionDependency.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/DependencyChecker.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/FSURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/HCatURIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/URIHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/URIHandlerException.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/HCatDependencyCache.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/HCatMessageHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/SimpleHCatDependencyCache.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingAction.java
    oozie/trunk/core/src/main/java/org/apache/oozie/dependency/hcat/WaitingActions.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/ConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/DefaultConnectionContext.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSConnectionInfo.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSExceptionListener.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
    oozie/trunk/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/HCatAccessorException.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/HCatAccessorService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/URIHandlerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/UserGroupInformationService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/HCatURI.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/MappingRule.java
    oozie/trunk/core/src/main/resources/ehcache-default.xml
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherFSURIHandler.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherHCatURIHandler.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/coord/TestHCatELFunctions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/dependency/
    oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestFSURIHandler.java
    oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestHCatURIHandler.java
    oozie/trunk/core/src/test/java/org/apache/oozie/dependency/TestURIHandlerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/
    oozie/trunk/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestHCatAccessorService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerEhcache.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/MiniHCatServer.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XHCatTestCase.java
    oozie/trunk/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
    oozie/trunk/core/src/test/resources/coord-job-for-matd-hcat.xml
    oozie/trunk/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
    oozie/trunk/core/src/test/resources/ehcache.xml
    oozie/trunk/examples/src/main/apps/hcatalog/
    oozie/trunk/examples/src/main/apps/hcatalog/README
    oozie/trunk/examples/src/main/apps/hcatalog/coordinator.xml
    oozie/trunk/examples/src/main/apps/hcatalog/id.pig
    oozie/trunk/examples/src/main/apps/hcatalog/job.properties
    oozie/trunk/examples/src/main/apps/hcatalog/workflow.xml
    oozie/trunk/hcataloglibs/
    oozie/trunk/hcataloglibs/hcatalog-0.6/
    oozie/trunk/hcataloglibs/hcatalog-0.6/pom.xml
    oozie/trunk/hcataloglibs/pom.xml
    oozie/trunk/sharelib/hcatalog/
    oozie/trunk/sharelib/hcatalog/pom.xml
    oozie/trunk/src/main/assemblies/hcataloglib.xml
    oozie/trunk/src/main/assemblies/hcataloglibs.xml
Modified:
    oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
    oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
    oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
    oozie/trunk/core/pom.xml
    oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
    oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
    oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
    oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
    oozie/trunk/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInfoJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/HadoopAccessorException.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/SchedulerService.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/XLog.java
    oozie/trunk/core/src/main/resources/oozie-default.xml
    oozie/trunk/core/src/main/resources/oozie-log4j.properties
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFileSystemActions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
    oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
    oozie/trunk/core/src/test/java/org/apache/oozie/client/rest/TestJsonCoordinatorAction.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/coord/TestCoordELFunctions.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestJobsServlet.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/trunk/core/src/test/java/org/apache/oozie/test/XTestCase.java
    oozie/trunk/core/src/test/java/org/apache/oozie/util/TestGraphGenerator.java
    oozie/trunk/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
    oozie/trunk/docs/src/site/twiki/DG_QuickStart.twiki
    oozie/trunk/docs/src/site/twiki/ENG_Custom_Authentication.twiki
    oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
    oozie/trunk/examples/src/test/java/org/apache/oozie/example/TestLocalOozieExample.java
    oozie/trunk/login/src/main/java/org/apache/oozie/servlet/login/LoginServlet.java
    oozie/trunk/login/src/main/webapp/WEB-INF/web.xml
    oozie/trunk/minitest/src/test/java/org/apache/oozie/test/WorkflowTest.java
    oozie/trunk/pom.xml
    oozie/trunk/release-log.txt
    oozie/trunk/sharelib/pom.xml
    oozie/trunk/src/main/assemblies/sharelib.xml
    oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
    oozie/trunk/tests/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigMain.java
    oozie/trunk/webapp/src/main/webapp/oozie-console.js

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Mon Feb 25 21:42:07 2013
@@ -981,10 +981,6 @@ public class OozieCLI {
             System.out.println(RULER);
 
             for (CoordinatorAction action : actions) {
-                String missingDep = action.getMissingDependencies();
-                if(missingDep != null && !missingDep.isEmpty()) {
-                    missingDep = missingDep.split(INSTANCE_SEPARATOR)[0];
-                }
                 System.out.println(maskIfNull(action.getId()) + VERBOSE_DELIMITER + action.getActionNumber()
                         + VERBOSE_DELIMITER + maskIfNull(action.getConsoleUrl()) + VERBOSE_DELIMITER
                         + maskIfNull(action.getErrorCode()) + VERBOSE_DELIMITER + maskIfNull(action.getErrorMessage())
@@ -994,7 +990,7 @@ public class OozieCLI {
                         + maskDate(action.getCreatedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
                         + maskDate(action.getNominalTime(), timeZoneId, verbose) + action.getStatus() + VERBOSE_DELIMITER
                         + maskDate(action.getLastModifiedTime(), timeZoneId, verbose) + VERBOSE_DELIMITER
-                        + maskIfNull(missingDep));
+                        + maskIfNull(getFirstMissingDependencies(action)));
 
                 System.out.println(RULER);
             }
@@ -1056,11 +1052,7 @@ public class OozieCLI {
         System.out.println("Nominal Time         : " + maskDate(coordAction.getNominalTime(), timeZoneId, false));
         System.out.println("Status               : " + coordAction.getStatus());
         System.out.println("Last Modified        : " + maskDate(coordAction.getLastModifiedTime(), timeZoneId, false));
-        String missingDep = coordAction.getMissingDependencies();
-        if(missingDep != null && !missingDep.isEmpty()) {
-            missingDep = missingDep.split(INSTANCE_SEPARATOR)[0];
-        }
-        System.out.println("First Missing Dependency : " + maskIfNull(missingDep));
+        System.out.println("First Missing Dependency : " + maskIfNull(getFirstMissingDependencies(coordAction)));
 
         System.out.println(RULER);
     }
@@ -1537,7 +1529,7 @@ public class OozieCLI {
                 Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
                 Validator validator = schema.newValidator();
                 validator.validate(new StreamSource(new FileReader(file)));
-                System.out.println("Valid worflow-app");
+                System.out.println("Valid workflow-app");
             }
             catch (Exception ex) {
                 throw new OozieCLIException("Invalid workflow-app, " + ex.toString(), ex);
@@ -1637,4 +1629,23 @@ public class OozieCLI {
             throw new OozieCLIException(ex.toString(), ex);
         }
     }
+
+    private String getFirstMissingDependencies(CoordinatorAction action) {
+        StringBuilder allDeps = new StringBuilder();
+        String missingDep = action.getMissingDependencies();
+        boolean depExists = false;
+        if (missingDep != null && !missingDep.isEmpty()) {
+            allDeps.append(missingDep.split(INSTANCE_SEPARATOR)[0]);
+            depExists = true;
+        }
+        String pushDeps = action.getPushMissingDependencies();
+        if (pushDeps != null && !pushDeps.isEmpty()) {
+            if(depExists) {
+                allDeps.append(INSTANCE_SEPARATOR);
+            }
+            allDeps.append(pushDeps.split(INSTANCE_SEPARATOR)[0]);
+        }
+        return allDeps.toString();
+    }
+
 }

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java Mon Feb 25 21:42:07 2013
@@ -105,12 +105,20 @@ public interface CoordinatorAction {
     Status getStatus();
 
     /**
-     * Return the missing dependencies for the particular action
+     * Return the PULL-based (directory based) missing dependencies for the
+     * particular action
      *
      * @return the missing dependencies for the particular action
      */
     String getMissingDependencies();
 
+    /**
+     * Return the PUSH-based (e.g. HCatalog partition-based) missing
+     * dependencies for the particular action
+     *
+     * @return the missing dependencies for the particular action
+     */
+    String getPushMissingDependencies();
 
     /**
      * Return the external status of the application instance.

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Mon Feb 25 21:42:07 2013
@@ -115,6 +115,7 @@ public interface JsonTags {
     public static final String COORDINATOR_ACTION_NOMINAL_TIME = "nominalTime";
     public static final String COORDINATOR_ACTION_STATUS = "status";
     public static final String COORDINATOR_ACTION_MISSING_DEPS = "missingDependencies";
+    public static final String COORDINATOR_ACTION_PUSH_MISSING_DEPS = "pushMissingDependencies";
     public static final String COORDINATOR_ACTION_EXTERNAL_STATUS = "externalStatus";
     public static final String COORDINATOR_ACTION_TRACKER_URI = "trackerUri";
     public static final String COORDINATOR_ACTION_CONSOLE_URL = "consoleUrl";

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Mon Feb 25 21:42:07 2013
@@ -119,6 +119,8 @@ public class JsonToBean {
                 .put("getLastModifiedTime", new Property(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, Date.class));
         COORD_ACTION
                 .put("getMissingDependencies", new Property(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, String.class));
+        COORD_ACTION.put("getPushMissingDependencies", new Property(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS,
+                String.class));
         COORD_ACTION.put("getExternalStatus", new Property(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, String.class));
         COORD_ACTION.put("getTrackerUri", new Property(JsonTags.COORDINATOR_ACTION_TRACKER_URI, String.class));
         COORD_ACTION.put("getConsoleUrl", new Property(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, String.class));

Modified: oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java (original)
+++ oozie/trunk/client/src/test/java/org/apache/oozie/client/rest/TestJsonToBean.java Mon Feb 25 21:42:07 2013
@@ -181,6 +181,7 @@ public class TestJsonToBean extends Test
         json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, "e");
         json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, LAST_MODIFIED);
         json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, "f");
+        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, "ff");
         json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, "g");
         json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, "h");
         json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, "i");
@@ -217,6 +218,7 @@ public class TestJsonToBean extends Test
         assertEquals("e", action.getRunConf());
         assertEquals(JsonUtils.parseDateRfc822(LAST_MODIFIED), action.getLastModifiedTime());
         assertEquals("f", action.getMissingDependencies());
+        assertEquals("ff", action.getPushMissingDependencies());
         assertEquals("g", action.getExternalStatus());
         assertEquals("h", action.getTrackerUri());
         assertEquals("i", action.getConsoleUrl());

Modified: oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/pom.xml?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/pom.xml (original)
+++ oozie/trunk/core/pom.xml Mon Feb 25 21:42:07 2013
@@ -49,9 +49,15 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hbase</artifactId>
-            <scope>provided</scope>
+             <groupId>org.apache.oozie</groupId>
+             <artifactId>oozie-hbase</artifactId>
+             <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+             <groupId>org.apache.oozie</groupId>
+             <artifactId>oozie-hcatalog</artifactId>
+             <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -102,6 +108,12 @@
         </dependency>
 
         <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache-core</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
@@ -347,6 +359,24 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-client</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- For drawing runtime DAG -->
         <dependency>
             <groupId>net.sf.jung</groupId>

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Mon Feb 25 21:42:07 2013
@@ -56,6 +56,8 @@ import org.apache.openjpa.persistence.jd
         @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
         // Update query for InputCheck
         @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
+        // Update query for Push-based missing dependency check
+        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
         // Update query for Start
         @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
 
@@ -71,11 +73,11 @@ import org.apache.openjpa.persistence.jd
         @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
 
         // Select query used by ActionInfo command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
         // Select Query used by Timeout command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
         // Select query used by InputCheck command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
         // Select query used by CoordActionUpdate command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
         // Select query used by Check command
@@ -99,7 +101,7 @@ import org.apache.openjpa.persistence.jd
 
         @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
         // Query to retrieve Coordinator actions sorted by nominal time
-        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
+        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
         // Query to maintain backward compatibility for coord job info command
         @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
         // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
@@ -126,7 +128,7 @@ import org.apache.openjpa.persistence.jd
 
         @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
 
         @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
         // Select query used by rerun, requires almost all columns so select * is used

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Mon Feb 25 21:42:07 2013
@@ -177,6 +177,8 @@ public enum ErrorCode {
     E0902(XLog.OPS, "Exception occured: [{0}]"),
     E0903(XLog.OPS, "Invalid JobConf, it has not been created by HadoopAccessorService"),
     E0904(XLog.STD, "Scheme [{0}] not supported in uri [{1}]"),
+    E0905(XLog.STD, "Scheme not present in uri [{0}]"),
+    E0906(XLog.STD, "URI parsing error : {0}"),
 
     E1001(XLog.STD, "Could not read the coordinator job definition, {0}"),
     E1002(XLog.STD, "Invalid coordinator application URI [{0}], {1}"),
@@ -230,6 +232,8 @@ public enum ErrorCode {
 
     E1400(XLog.STD, "doAs (proxyuser) failure"),
 
+    E1501(XLog.STD, "Error in getting HCat Access [{0}]"),
+
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
 
     private String template;

Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FSLauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FSLauncherURIHandler implements LauncherURIHandler {
+
+    @Override
+    public boolean create(URI uri, Configuration conf) throws LauncherException {
+        boolean status = false;
+        try {
+            FileSystem fs = FileSystem.get(uri, conf);
+            Path path = getNormalizedPath(uri);
+            if (!fs.exists(path)) {
+                status = fs.mkdirs(path);
+                if (status) {
+                    System.out.println("Creating directory at " + path + " succeeded.");
+                }
+                else {
+                    System.out.println("Creating directory at " + path + " failed.");
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new LauncherException("Creating directory at " + uri + " failed.", e);
+        }
+        return status;
+    }
+
+    @Override
+    public boolean delete(URI uri, Configuration conf) throws LauncherException {
+        boolean status = false;
+        try {
+            FileSystem fs = FileSystem.get(uri, conf);
+            Path path = getNormalizedPath(uri);
+            if (fs.exists(path)) {
+                status = fs.delete(path, true);
+                if (status) {
+                    System.out.println("Deletion of path " + path + " succeeded.");
+                }
+                else {
+                    System.out.println("Deletion of path " + path + " failed.");
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new LauncherException("Deletion of path " + uri + " failed.", e);
+        }
+        return status;
+    }
+
+    private Path getNormalizedPath(URI uri) {
+        // Normalizes the URI path, replacing any "//" that users specify by mistake with "/"
+        return new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return null;
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/FileSystemActions.java Mon Feb 25 21:42:07 2013
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.action.hadoop;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.w3c.dom.Node;
-/**
- * Class to perform file system operations specified in the prepare block of Workflow
- *
- */
-public class FileSystemActions {
-    private static Collection<String> supportedFileSystems;
-
-    public FileSystemActions(Collection<String> fileSystems) {
-        supportedFileSystems = fileSystems;
-    }
-
-    /**
-     * Method to execute the prepare actions based on the command
-     *
-     * @param n Child node of the prepare XML
-     * @throws LauncherException
-     */
-    public void execute(Node n) throws LauncherException {
-        String command = n.getNodeName();
-        if (command.equals("delete")) {
-            delete(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
-        } else if (command.equals("mkdir")) {
-            mkdir(new Path(n.getAttributes().getNamedItem("path").getNodeValue().trim()));
-        }
-    }
-
-    // Method to delete the specified file based on the path
-    private void delete(Path path) throws LauncherException {
-        try {
-            validatePath(path, true);
-            FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
-            if (fs.exists(path)) {
-                if (!fs.delete(path, true)) {
-                    String deleteFailed = "Deletion of path " + path.toString() + " failed.";
-                    System.out.println(deleteFailed);
-                    throw new LauncherException(deleteFailed);
-                } else {
-                    System.out.println("Deletion of path " + path.toString() + " was successful.");
-                }
-            }
-        } catch (IOException ex) {
-            throw new LauncherException(ex.getMessage(), ex);
-        }
-
-    }
-
-    // Method to create a directory based on the path
-    private void mkdir(Path path) throws LauncherException {
-        try {
-            validatePath(path, true);
-            FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
-            if (!fs.exists(path)) {
-                if (!fs.mkdirs(path)) {
-                    String mkdirFailed = "Creating directory at " + path + " failed.";
-                    System.out.println(mkdirFailed);
-                    throw new LauncherException(mkdirFailed);
-                } else {
-                    System.out.println("Creating directory at path " + path + " was successful.");
-                }
-            }
-        } catch (IOException ex) {
-            throw new LauncherException(ex.getMessage(), ex);
-        }
-    }
-
-    // Method to validate the path provided for the prepare action
-    private void validatePath(Path path, boolean withScheme) throws LauncherException {
-        String scheme = path.toUri().getScheme();
-        if (withScheme) {
-            if (scheme == null) {
-                String nullScheme = "Scheme of the path " + path + " is null";
-                System.out.println(nullScheme);
-                throw new LauncherException(nullScheme);
-            } else if (supportedFileSystems.size() != 1 || !supportedFileSystems.iterator().next().equals("*")) {
-                if (!supportedFileSystems.contains(scheme.toLowerCase())) {
-                    String unsupportedScheme = "Scheme of '" + path + "' is not supported.";
-                    System.out.println(unsupportedScheme);
-                    throw new LauncherException(unsupportedScheme);
-                }
-            }
-        } else if (scheme != null) {
-            String notNullScheme = "Scheme of the path " + path + " is not null as specified.";
-            System.out.println(notNullScheme);
-            throw new LauncherException(notNullScheme);
-        }
-    }
-}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/HCatLauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hcatalog.api.ConnectionFailureException;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.oozie.util.HCatURI;
+
+public class HCatLauncherURIHandler implements LauncherURIHandler {
+
+    private static List<Class<?>> classesToShip = new ArrayList<Class<?>>();
+
+    static {
+        classesToShip.add(HCatURI.class);
+    }
+
+    @Override
+    public boolean create(URI uri, Configuration conf) throws LauncherException {
+        throw new UnsupportedOperationException("Creation of partition is not supported for " + uri);
+    }
+
+    @Override
+    public boolean delete(URI uri, Configuration conf) throws LauncherException {
+        HCatClient client = getHCatClient(uri, conf);
+        try {
+            HCatURI hcatURI = new HCatURI(uri.toString());
+            client.dropPartitions(hcatURI.getDb(), hcatURI.getTable(), hcatURI.getPartitionMap(), true);
+            System.out.println("Dropped partitions for " + uri);
+            return true;
+        }
+        catch (ConnectionFailureException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        catch (HCatException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        catch (URISyntaxException e) {
+            throw new LauncherException("Error trying to drop " + uri, e);
+        }
+        finally {
+            closeQuietly(client);
+        }
+    }
+
+    private HCatClient getHCatClient(URI uri, Configuration conf) throws LauncherException {
+        final HiveConf hiveConf = new HiveConf(conf, this.getClass());
+        String serverURI = getMetastoreConnectURI(uri);
+        if (!serverURI.equals("")) {
+            hiveConf.set("hive.metastore.local", "false");
+        }
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, serverURI);
+        try {
+            System.out.println("Creating HCatClient for user=" + UserGroupInformation.getCurrentUser() + " and server="
+                    + serverURI);
+            // Delegation token fetched from metastore has new Text() as service and
+            // HiveMetastoreClient looks for the same if not overridden by hive.metastore.token.signature
+            // We are good as long as HCatCredentialHelper does not change the service of the token.
+            return HCatClient.create(hiveConf);
+        }
+        catch (HCatException e) {
+            throw new LauncherException("Error trying to connect to " + serverURI, e);
+        }
+        catch (IOException e) {
+            throw new LauncherException("Error trying to connect to " + serverURI, e);
+        }
+    }
+
+    private String getMetastoreConnectURI(URI uri) {
+        String metastoreURI;
+        // For unit tests
+        if (uri.getAuthority().equals("unittest-local")) {
+            metastoreURI = "";
+        }
+        else {
+            // Hardcoding hcat to thrift mapping till support for webhcat(templeton)
+            // is added
+            metastoreURI = "thrift://" + uri.getAuthority();
+        }
+        return metastoreURI;
+    }
+
+    private void closeQuietly(HCatClient client) {
+        if (client != null) {
+            try {
+                client.close();
+            }
+            catch (Exception ignore) {
+                System.err.println("Error closing hcat client");
+                ignore.printStackTrace(System.err);
+            }
+        }
+    }
+
+    @Override
+    public List<Class<?>> getClassesForLauncher() {
+        return classesToShip;
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Mon Feb 25 21:42:07 2013
@@ -61,6 +61,7 @@ import org.apache.oozie.client.WorkflowA
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.servlet.CallbackServlet;
 import org.apache.oozie.util.ELEvaluator;
@@ -127,8 +128,8 @@ public class JavaActionExecutor extends 
         classes.add(LauncherSecurityManager.class);
         classes.add(LauncherException.class);
         classes.add(LauncherMainException.class);
-        classes.add(FileSystemActions.class);
         classes.add(PrepareActionsDriver.class);
+        classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
         classes.add(ActionStats.class);
         classes.add(ActionType.class);
         return classes;
@@ -388,39 +389,44 @@ public class JavaActionExecutor extends 
         }
     }
 
-    protected void addShareLib(Path appPath, Configuration conf, String actionShareLibName)
-    throws ActionExecutorException {
-        if (actionShareLibName != null) {
-            try {
-                Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
-                if (systemLibPath != null) {
-                    Path actionLibPath = new Path(systemLibPath, actionShareLibName);
-                    String user = conf.get("user.name");
-                    FileSystem fs;
-                    // If the actionLibPath has a valid scheme and authority, then use them to determine the filesystem that the
-                    // sharelib resides on; otherwise, assume it resides on the same filesystem as the appPath and use the appPath
-                    // to determine the filesystem
-                    if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
-                        fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, actionLibPath.toUri(), conf);
-                    }
-                    else {
-                        fs = Services.get().get(HadoopAccessorService.class).createFileSystem(user, appPath.toUri(), conf);
-                    }
-                    if (fs.exists(actionLibPath)) {
-                        FileStatus[] files = fs.listStatus(actionLibPath);
-                        for (FileStatus file : files) {
-                            addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
+    protected void addShareLib(Path appPath, Configuration conf, String[] actionShareLibNames)
+            throws ActionExecutorException {
+        if (actionShareLibNames != null) {
+            for (String actionShareLibName : actionShareLibNames) {
+                try {
+                    Path systemLibPath = Services.get().get(WorkflowAppService.class).getSystemLibPath();
+                    if (systemLibPath != null) {
+                        Path actionLibPath = new Path(systemLibPath, actionShareLibName.trim());
+                        String user = conf.get("user.name");
+                        FileSystem fs;
+                        // If the actionLibPath has a valid scheme and authority, then use them to
+                        // determine the filesystem that the sharelib resides on; otherwise, assume
+                        // it resides on the same filesystem as the appPath and use the appPath to
+                        // determine the filesystem
+                        if (actionLibPath.toUri().getScheme() != null && actionLibPath.toUri().getAuthority() != null) {
+                            fs = Services.get().get(HadoopAccessorService.class)
+                                    .createFileSystem(user, actionLibPath.toUri(), conf);
+                        }
+                        else {
+                            fs = Services.get().get(HadoopAccessorService.class)
+                                    .createFileSystem(user, appPath.toUri(), conf);
+                        }
+                        if (fs.exists(actionLibPath)) {
+                            FileStatus[] files = fs.listStatus(actionLibPath);
+                            for (FileStatus file : files) {
+                                addToCache(conf, actionLibPath, file.getPath().toUri().getPath(), false);
+                            }
                         }
                     }
                 }
-            }
-            catch (HadoopAccessorException ex){
-                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
-                        ex.getErrorCode().toString(), ex.getMessage());
-            }
-            catch (IOException ex){
-                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
-                        "It should never happen", ex.getMessage());
+                catch (HadoopAccessorException ex) {
+                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, ex.getErrorCode()
+                            .toString(), ex.getMessage());
+                }
+                catch (IOException ex) {
+                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED,
+                            "It should never happen", ex.getMessage());
+                }
             }
         }
     }
@@ -500,7 +506,7 @@ public class JavaActionExecutor extends 
         // Add action specific share libs
         addActionShareLib(appPath, conf, context, actionXml);
         // Add common sharelibs for Oozie
-        addShareLib(appPath, conf, JavaActionExecutor.OOZIE_COMMON_LIBDIR);
+        addShareLib(appPath, conf, new String[]{JavaActionExecutor.OOZIE_COMMON_LIBDIR});
     }
 
     private void addActionShareLib(Path appPath, Configuration conf, Context context, Element actionXml)
@@ -516,7 +522,7 @@ public class JavaActionExecutor extends 
         // Action sharelibs are only added if user has specified to use system libpath
         if (wfJobConf.getBoolean(OozieClient.USE_SYSTEM_LIBPATH, false)) {
             // add action specific sharelibs
-            addShareLib(appPath, conf, getShareLibName(context, actionXml, conf));
+            addShareLib(appPath, conf, getShareLibNames(context, actionXml, conf));
         }
     }
 
@@ -585,8 +591,7 @@ public class JavaActionExecutor extends 
                     prepareXML);
 
             LauncherMapper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
-            LauncherMapper.setupSupportedFileSystems(
-                launcherJobConf, Services.get().getConf().get(HadoopAccessorService.SUPPORTED_FILESYSTEMS));
+            LauncherMapper.setupLauncherURIHandlerConf(launcherJobConf);
             LauncherMapper.setupMaxOutputData(launcherJobConf, maxActionOutputLen);
             LauncherMapper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
 
@@ -1161,14 +1166,15 @@ public class JavaActionExecutor extends 
 
 
     /**
-     * Return the sharelib name for the action.
+     * Return the sharelib names for the action.
      * <p/>
-     * If <code>NULL</code> or emtpy, it means that the action does not use the action
+     * If <code>NULL</code> or empty, it means that the action does not use the action
      * sharelib.
      * <p/>
      * If a non-empty string, i.e. <code>foo</code>, it means the action uses the
-     * action sharelib subdirectory <code>foo</code> and all JARs in the sharelib
-     * <code>foo</code> directory will be in the action classpath.
+     * action sharelib sub-directory <code>foo</code> and all JARs in the sharelib
+     * <code>foo</code> directory will be in the action classpath. Multiple sharelib
+     * sub-directories can be specified as a comma separated list.
      * <p/>
      * The resolution is done using the following precedence order:
      * <ul>
@@ -1181,18 +1187,22 @@ public class JavaActionExecutor extends 
      *
      * @param context executor context.
      * @param actionXml
-     *@param conf action configuration.  @return the action sharelib name.
+     * @param conf action configuration.
+     * @return the action sharelib names.
      */
-    protected String getShareLibName(Context context, Element actionXml, Configuration conf) {
-        String name = conf.get(ACTION_SHARELIB_FOR + getType());
-        if (name == null) {
+    protected String[] getShareLibNames(Context context, Element actionXml, Configuration conf) {
+        String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
+        if (names == null || names.length == 0) {
             try {
                 XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
-                name = jobConf.get(ACTION_SHARELIB_FOR + getType());
-                if (name == null) {
-                    name = Services.get().getConf().get(ACTION_SHARELIB_FOR + getType());
-                    if (name == null) {
-                        name = getDefaultShareLibName(actionXml);
+                names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
+                if (names == null || names.length == 0) {
+                    names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
+                    if (names == null || names.length == 0) {
+                        String name = getDefaultShareLibName(actionXml);
+                        if (name != null) {
+                            names = new String[] { name };
+                        }
                     }
                 }
             }
@@ -1200,7 +1210,7 @@ public class JavaActionExecutor extends 
                 throw new RuntimeException("It cannot happen, " + ex.toString(), ex);
             }
         }
-        return name;
+        return names;
     }
 
     private final static String ACTION_SHARELIB_FOR = "oozie.action.sharelib.for.";

Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherException.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+public class LauncherException extends Exception {
+
+    /**
+     * Constructs a <code>LauncherException</code> with the
+     * specified detail message.
+     *
+     * @param message   the detail message.
+     */
+    LauncherException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param message the detail message
+     * @param cause the cause exception
+     */
+    LauncherException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java Mon Feb 25 21:42:07 2013
@@ -33,13 +33,14 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.security.Permission;
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.StringTokenizer;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,12 +54,12 @@ import org.apache.hadoop.mapred.RunningJ
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.XLog;
 
 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
 
     public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
-    public static final String CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS = "oozie.launcher.action.supported.filesystems";
 
     public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = "oozie.action.max.output.data";
 
@@ -152,8 +153,10 @@ public class LauncherMapper<K1, V1, K2, 
         launcherConf.set(CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
     }
 
-    public static void setupSupportedFileSystems(Configuration launcherConf, String supportedFileSystems) {
-        launcherConf.set(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS, supportedFileSystems);
+    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+        for(Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
+            launcherConf.set(entry.getKey(), entry.getValue());
+        }
     }
 
     public static void setupMainArguments(Configuration launcherConf, String[] args) {
@@ -547,6 +550,7 @@ public class LauncherMapper<K1, V1, K2, 
                             System.out.println();
                         }
                         handleActionStatsData(reporter);
+                        handleExternalChildIDs(reporter);
                         File newId = new File(System.getProperty("oozie.action.newId.properties"));
                         if (newId.exists()) {
                             Properties props = new Properties();
@@ -665,8 +669,7 @@ public class LauncherMapper<K1, V1, K2, 
         String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
         if (prepareXML != null) {
              if (!prepareXML.equals("")) {
-                 PrepareActionsDriver.doOperations(
-                     getJobConf().getStringCollection(CONF_OOZIE_ACTION_SUPPORTED_FILESYSTEMS), prepareXML);
+                 PrepareActionsDriver.doOperations(prepareXML, getJobConf());
              } else {
                  System.out.println("There are no prepare actions to execute.");
              }
@@ -817,14 +820,3 @@ class LauncherSecurityManager extends Se
         exitCode = 0;
     }
 }
-
-class LauncherException extends Exception {
-
-    LauncherException(String message) {
-        super(message);
-    }
-
-    LauncherException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandler.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface LauncherURIHandler {
+
+    /**
+     * Create the resource identified by the URI
+     *
+     * @param uri URI of the resource to create
+     * @param conf Configuration to access the URI
+     *
+     * @return <code>true</code> if the URI did not exist and was successfully
+     *         created; <code>false</code> if the URI already existed
+     *
+     * @throws LauncherException failure conditions are implementation-defined (none are specified here)
+     */
+    public boolean create(URI uri, Configuration conf) throws LauncherException;
+
+    /**
+     * Delete the resource identified by the URI
+     *
+     * @param uri URI of the resource to delete
+     * @param conf Configuration to access the URI
+     *
+     * @return <code>true</code> if the URI exists and was successfully deleted;
+     *         <code>false</code> if the URI does not exist
+     * @throws LauncherException failure conditions are implementation-defined (none are specified here)
+     */
+    public boolean delete(URI uri, Configuration conf) throws LauncherException;
+
+
+    /**
+     * Get list of classes to ship to launcher for LauncherURIHandler
+     *
+     * @return list of classes to ship to launcher
+     */
+    public List<Class<?>> getClassesForLauncher();
+
+}

Added: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java?rev=1449911&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/LauncherURIHandlerFactory.java Mon Feb 25 21:42:07 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class LauncherURIHandlerFactory {
+
+    public static final String CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX = "oozie.launcher.action.urihandler.scheme.";
+    private Configuration conf; // looked up with the prefix above plus a URI scheme (or the wildcard)
+
+    public LauncherURIHandlerFactory(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Get the LauncherURIHandler configured for the scheme of the URI, falling back to the wildcard (*) entry
+     * @param uri URI whose scheme selects the handler class name in the configuration
+     * @return LauncherURIHandler instance of the configured class, to perform operations on the URI
+     * @throws LauncherException if the URI has no scheme, no handler class is configured, or the class is not found
+     */
+    public LauncherURIHandler getURIHandler(URI uri) throws LauncherException {
+        if (uri.getScheme() == null) {
+            throw new LauncherException("Scheme not present in uri " + uri);
+        }
+        else {
+            String className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + uri.getScheme());
+            if (className == null) { // no scheme-specific entry: try the wildcard entry
+                className = conf.get(CONF_LAUNCHER_URIHANDLER_SCHEME_PREFIX + "*");
+            }
+            if (className == null) { // neither a scheme-specific nor a wildcard handler is configured
+                throw new LauncherException("Scheme " + uri.getScheme() + " not supported in uri " + uri.toString());
+            }
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className);
+            }
+            catch (ClassNotFoundException e) { // the configured class name could not be loaded
+                throw new LauncherException("Error instantiating LauncherURIHandler", e);
+            }
+            return (LauncherURIHandler) ReflectionUtils.newInstance(clazz, null); // null is passed as the conf
+        }
+    }
+
+}

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java Mon Feb 25 21:42:07 2013
@@ -20,12 +20,13 @@ package org.apache.oozie.action.hadoop;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collection;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.xml.sax.SAXException;
 import org.w3c.dom.Document;
+import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -44,27 +45,25 @@ public class PrepareActionsDriver {
      * @param prepareXML Prepare XML block in string format
      * @throws LauncherException
      */
-    static void doOperations(Collection<String> supportedFileSystems, String prepareXML) throws LauncherException {
+    static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
         try {
             Document doc = getDocumentFromXML(prepareXML);
             doc.getDocumentElement().normalize();
 
             // Get the list of child nodes, basically, each one corresponding to a separate action
             NodeList nl = doc.getDocumentElement().getChildNodes();
-            FileSystemActions fsActions = new FileSystemActions(supportedFileSystems);
+            LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
 
             for (int i = 0; i < nl.getLength(); ++i) {
-                String commandType = "";
-                /* Logic to find the command type goes here
-                commandType = ..........;
-                */
-                // As of now, the available prepare action is of type hdfs. Hence, assigning the value directly
-                commandType = "hdfs";
-                if (commandType.equalsIgnoreCase("hdfs")) {
-                    fsActions.execute(nl.item(i));
-                } /*else if(commandType.equalsIgnoreCase("hcat")) {     //Other command types go here
-                    hCatActions.execute(nl.item(i));
-                  }*/
+                Node n = nl.item(i);
+                String operation = n.getNodeName();
+                if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+                    continue;
+                }
+                String path = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+                URI uri = new URI(path);
+                LauncherURIHandler handler = factory.getURIHandler(uri);
+                execute(operation, uri, handler, conf);
             }
         } catch (IOException ioe) {
             throw new LauncherException(ioe.getMessage(), ioe);
@@ -72,6 +71,24 @@ public class PrepareActionsDriver {
             throw new LauncherException(saxe.getMessage(), saxe);
         } catch (ParserConfigurationException pce) {
             throw new LauncherException(pce.getMessage(), pce);
+        } catch (URISyntaxException use) {
+            throw new LauncherException(use.getMessage(), use);
+        }
+    }
+
+    /**
+     * Execute a single prepare operation by delegating to the handler (delete -> delete, mkdir -> create)
+     * @param operation name of the prepare element, "delete" or "mkdir"; any other name is ignored
+     * @param uri target of the operation; it is passed together with conf to the given handler
+     * @throws LauncherException propagated from the handler call
+     */
+    private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+            throws LauncherException {
+        if (operation.equals("delete")) {
+            handler.delete(uri, conf);
+        }
+        else if (operation.equals("mkdir")) {
+            handler.create(uri, conf);
         }
     }
 
@@ -83,4 +100,5 @@ public class PrepareActionsDriver {
         InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
         return docBuilder.parse(is);
     }
-}
+
+}
\ No newline at end of file

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorAction.java Mon Feb 25 21:42:07 2013
@@ -89,6 +89,10 @@ public class JsonCoordinatorAction imple
     @Lob
     private String missingDependencies;
 
+    @Column(name = "push_missing_dependencies")
+    @Lob
+    private String pushMissingDependencies;
+
     @Basic
     @Column(name = "external_status")
     private String externalStatus;
@@ -142,6 +146,7 @@ public class JsonCoordinatorAction imple
         // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
         // .formatDateRfc822(endTime), timeZoneId);
         json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, missingDependencies);
+        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, pushMissingDependencies);
         json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
         json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
         json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
@@ -240,6 +245,14 @@ public class JsonCoordinatorAction imple
         return missingDependencies;
     }
 
+    public String getPushMissingDependencies() { // value of the push_missing_dependencies column
+        return pushMissingDependencies; // null until set: the field has no initializer
+    }
+
+    public void setPushMissingDependencies(String pushMissingDependencies) { // stored as-is, no validation
+        this.pushMissingDependencies = pushMissingDependencies;
+    }
+
     public String getExternalStatus() {
         return externalStatus;
     }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/JsonCoordinatorJob.java Mon Feb 25 21:42:07 2013
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -139,7 +139,7 @@ public class JsonCoordinatorJob implemen
     public JSONObject toJSONObject() {
         return toJSONObject("GMT");
     }
-    
+
     @SuppressWarnings("unchecked")
     public JSONObject toJSONObject(String timeZoneId) {
         JSONObject json = new JSONObject();
@@ -156,7 +156,7 @@ public class JsonCoordinatorJob implemen
         json.put(JsonTags.COORDINATOR_JOB_CONCURRENCY, getConcurrency());
         json.put(JsonTags.COORDINATOR_JOB_TIMEOUT, getTimeout());
         json.put(JsonTags.COORDINATOR_JOB_LAST_ACTION_TIME, JsonUtils.formatDateRfc822(getLastActionTime(), timeZoneId));
-        json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME, 
+        json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
                 JsonUtils.formatDateRfc822(getNextMaterializedTime(), timeZoneId));
         json.put(JsonTags.COORDINATOR_JOB_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
         json.put(JsonTags.COORDINATOR_JOB_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
@@ -405,8 +405,6 @@ public class JsonCoordinatorJob implemen
 
     /**
      * Set pending to true
-     *
-     * @param pending set pending to true
      */
     public void setPending() {
         this.pending = 1;
@@ -414,8 +412,6 @@ public class JsonCoordinatorJob implemen
 
     /**
      * Set pending to false
-     *
-     * @param pending set pending to false
      */
     public void resetPending() {
         this.pending = 0;

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1449911&r1=1449910&r2=1449911&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Mon Feb 25 21:42:07 2013
@@ -19,11 +19,12 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Date;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
@@ -34,16 +35,17 @@ import org.apache.oozie.command.CommandE
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.Instrumentation;
@@ -96,14 +98,7 @@ public class CoordActionInputCheckXComma
         if (nominalTime.compareTo(currentTime) > 0) {
             queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
                     .getTime()), getCoordInputCheckRequeueInterval()));
-            // update lastModifiedTime
-            coordAction.setLastModifiedTime(new Date());
-            try {
-                jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
-            }
-            catch (JPAExecutorException e) {
-                throw new CommandException(e);
-            }
+            updateCoordAction(coordAction, false);
             LOG.info("[" + actionId
                     + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                     + currentTime + ", nominal=" + nominalTime);
@@ -139,50 +134,74 @@ public class CoordActionInputCheckXComma
                 nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
             }
             String nonExistListStr = nonExistList.toString();
-            if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
-                // missingDeps empty means action should become READY
+            if (!nonExistListStr.equals(missingDeps)) {
+                // missingDeps null means action should become READY
                 isChangeInDependency = true;
                 coordAction.setMissingDependencies(nonExistListStr);
             }
-            if (status == true) {
+            String pushDeps = coordAction.getPushMissingDependencies();
+            if (status == true && (pushDeps == null || pushDeps.length() == 0)) {
                 coordAction.setStatus(CoordinatorAction.Status.READY);
                 // pass jobID to the CoordActionReadyXCommand
                 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
             }
+            else if (!isTimeout(currentTime)) {
+                if (status == false) {
+                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
+                            getCoordInputCheckRequeueInterval());
+                }
+                else {
+                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
+                }
+            }
             else {
-                long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
-                        .getCreatedTime().getTime()))
-                        / (60 * 1000);
-                int timeOut = coordAction.getTimeOut();
-                if ((timeOut >= 0) && (waitingTime > timeOut)) {
+                if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
                     queue(new CoordActionTimeOutXCommand(coordAction), 100);
                 }
                 else {
-                    queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), getCoordInputCheckRequeueInterval());
+                    // Let CoordPushDependencyCheckXCommand queue the timeout
+                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
                 }
             }
         }
         catch (Exception e) {
+            if (isTimeout(currentTime)) {
+                queue(new CoordActionTimeOutXCommand(coordAction), 100);
+            }
             throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
         }
         finally {
-            coordAction.setLastModifiedTime(new Date());
             cron.stop();
-            if(jpaService != null) {
-                try {
-                    if (isChangeInDependency) {
-                        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
-                    }
-                    else {
-                        jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
-                    }
+            updateCoordAction(coordAction, isChangeInDependency);
+        }
+        return null;
+    }
+
+
+    private boolean isTimeout(Date currentTime) { // true once the wait exceeds the action's timeOut
+        long waitingTime = (currentTime.getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime())) // the wait starts at the later of nominal and created time
+                / (60 * 1000); // milliseconds -> whole minutes, so timeOut is compared in minutes
+        int timeOut = coordAction.getTimeOut();
+        return (timeOut >= 0) && (waitingTime > timeOut); // a negative timeOut never times out
+    }
+
+    private void updateCoordAction(CoordinatorActionBean coordAction, boolean isChangeInDependency)
+            throws CommandException {
+        coordAction.setLastModifiedTime(new Date());
+        if (jpaService != null) {
+            try {
+                if (isChangeInDependency) {
+                    jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(coordAction));
                 }
-                catch(JPAExecutorException jex) {
-                    throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
+                else {
+                    jpaService.execute(new CoordActionUpdateForModifiedTimeJPAExecutor(coordAction));
                 }
             }
+            catch (JPAExecutorException jex) {
+                throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
+            }
         }
-        return null;
     }
 
     /**
@@ -308,7 +327,7 @@ public class CoordActionInputCheckXComma
         Element outputList = eAction.getChild("output-events", eAction.getNamespace());
         if (outputList != null) {
             for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
-                if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) != null) {
+                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
                     throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                             " not permitted in output-event ");
                 }
@@ -331,11 +350,11 @@ public class CoordActionInputCheckXComma
     private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
             Configuration conf) throws Exception {
         for (Element dEvent : eDataEvents) {
-            if (dEvent.getChild("unresolved-instances", dEvent.getNamespace()) == null) {
+            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
                 continue;
             }
             ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
-            String uresolvedInstance = dEvent.getChild("unresolved-instances", dEvent.getNamespace()).getTextTrim();
+            String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
             String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
             StringBuffer resolvedTmp = new StringBuffer();
             for (int i = 0; i < unresolvedList.length; i++) {
@@ -360,7 +379,7 @@ public class CoordActionInputCheckXComma
                 uriInstance.addContent(resolvedTmp.toString());
                 dEvent.getContent().add(1, uriInstance);
             }
-            dEvent.removeChild("unresolved-instances", dEvent.getNamespace());
+            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
         }
 
         return true;
@@ -409,9 +428,10 @@ public class CoordActionInputCheckXComma
         nonExistList.delete(0, nonExistList.length());
         boolean allExists = true;
         String existSeparator = "", nonExistSeparator = "";
+        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
         for (int i = 0; i < uriList.length; i++) {
             if (allExists) {
-                allExists = pathExists(uriList[i], conf);
+                allExists = pathExists(uriList[i], conf, user);
                 LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
             }
             if (allExists) {
@@ -434,19 +454,22 @@ public class CoordActionInputCheckXComma
      * @return true if path exists
      * @throws IOException thrown if unable to access the path
      */
-    protected boolean pathExists(String sPath, Configuration actionConf) throws IOException {
+    protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
         LOG.debug("checking for the file " + sPath);
-        Path path = new Path(sPath);
-        String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
         try {
-            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-            Configuration fsConf = has.createJobConf(path.toUri().getAuthority());
-            return has.createFileSystem(user, path.toUri(), fsConf).exists(path);
+            URI uri = new URI(sPath);
+            URIHandlerService service = Services.get().get(URIHandlerService.class);
+            URIHandler handler = service.getURIHandler(uri);
+            return handler.exists(uri, actionConf, user);
         }
-        catch (HadoopAccessorException e) {
+        catch (URIHandlerException e) {
             coordAction.setErrorCode(e.getErrorCode().toString());
             coordAction.setErrorMessage(e.getMessage());
             throw new IOException(e);
+        } catch (URISyntaxException e) {
+            coordAction.setErrorCode(ErrorCode.E0906.toString());
+            coordAction.setErrorMessage(e.getMessage());
+            throw new IOException(e);
         }
     }