You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/07 00:28:57 UTC

svn commit: r1453618 - in /oozie/branches/branch-4.0: core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java release-log.txt

Author: virag
Date: Wed Mar  6 23:28:57 2013
New Revision: 1453618

URL: http://svn.apache.org/r1453618
Log:
OOZIE-1253 latest() gets resolved before all push dependencies are resolved (rohini via virag)

Modified:
    oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
    oozie/branches/branch-4.0/release-log.txt

Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Mar  6 23:28:57 2013
@@ -130,6 +130,13 @@ public class CoordActionInputCheckXComma
                     + nonResolvedList.toString());
             // Updating the list of data dependencies that are available and those that are yet not
             boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
+            String pushDeps = coordAction.getPushMissingDependencies();
+            // Resolve latest/future only when all current missingDependencies and
+            // pushMissingDependencies are met
+            if (status) {
+                status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
+                        : false;
+            }
             coordAction.setLastModifiedTime(currentTime);
             coordAction.setActionXml(actionXml.toString());
             if (nonResolvedList.length() > 0 && status == false) {
@@ -141,8 +148,7 @@ public class CoordActionInputCheckXComma
                 isChangeInDependency = true;
                 coordAction.setMissingDependencies(nonExistListStr);
             }
-            String pushDeps = coordAction.getPushMissingDependencies();
-            if (status == true && (pushDeps == null || pushDeps.length() == 0)) {
+            if (status) {
                 String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
                 actionXml.replace(0, actionXml.length(), newActionXml);
                 coordAction.setActionXml(actionXml.toString());
@@ -242,11 +248,13 @@ public class CoordActionInputCheckXComma
     protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
             Configuration conf) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml.toString());
-        boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
-        if (allExist) {
-            LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
-            allExist = checkUnresolvedInstances(eAction, conf);
-        }
+        return checkResolvedUris(eAction, existList, nonExistList, conf);
+    }
+
+    private boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+        Element eAction = XmlUtils.parseXml(actionXml.toString());
+        LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
+        boolean allExist = checkUnresolvedInstances(eAction, conf);
         if (allExist) {
             actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
         }

Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Wed Mar  6 23:28:57 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
@@ -213,33 +214,22 @@ public class TestCoordActionInputCheckXC
         CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
 
-        CoordinatorActionBean action = null;
         JPAService jpaService = Services.get().get(JPAService.class);
-        try {
-            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
-
+        CoordinatorActionBean action = jpaService
+                .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
                 action.getMissingDependencies());
 
+        // Update action creation time
         String actionXML = action.getActionXml();
         String actionCreationTime = "2009-02-15T01:00" + TZ;
         actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
-        try {
-            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
-            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-            assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
+        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
         // providing some of the dataset dirs required as per coordinator specification with holes
         // before and after action creation time
@@ -253,13 +243,8 @@ public class TestCoordActionInputCheckXC
         new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
         //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
         Thread.sleep(1000);
-        try {
-            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
 
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         actionXML = action.getActionXml();
         assertEquals("", action.getMissingDependencies());
         // Datasets only before action creation/actual time should be picked up.
@@ -272,6 +257,81 @@ public class TestCoordActionInputCheckXC
         assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
     }
 
+    public void testActionInputCheckLatestActionCreationTimeWithPushDependency() throws Exception {
+        Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false);
+
+        String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+        // Set push missing dependency
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService
+                .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
+        action.setPushMissingDependencies(pushMissingDependency);
+        jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+        // Update action creation time
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+        // providing some of the dataset dirs required as per coordinator specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
+        createDir(getTestCaseDir() + "/2009/02/12/");
+        createDir(getTestCaseDir() + "/2009/01/22/");
+        createDir(getTestCaseDir() + "/2009/01/08/");
+        createDir(getTestCaseDir() + "/2009/01/01/");
+
+        // Run input check after making latest available
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+        // Run input check after making push dependencies available
+        createDir(getTestCaseDir() + "/2009/02/05");
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getPushMissingDependencies());
+        checkCoordAction(job.getId() + "@1", CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+                + "${coord:latestRange(-3,0)}", CoordinatorAction.Status.WAITING);
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
+        Thread.sleep(1000);
+
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getMissingDependencies());
+        actionXML = action.getActionXml();
+        // Datasets only before action creation/actual time should be picked up.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/22" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/08";
+        System.out.println("Expected: " + resolvedList);
+        System.out.println("Actual: " +  actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+        assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+    }
+
     public void testActionInputCheckLatestCurrentTime() throws Exception {
         Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
 
@@ -281,33 +341,22 @@ public class TestCoordActionInputCheckXC
         CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
 
-        CoordinatorActionBean action = null;
         JPAService jpaService = Services.get().get(JPAService.class);
-        try {
-            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
-
+        CoordinatorActionBean action = jpaService
+                .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
                 action.getMissingDependencies());
 
+        // Update action creation time
         String actionXML = action.getActionXml();
         String actionCreationTime = "2009-02-15T01:00" + TZ;
         actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
                 + "\">");
         action.setActionXml(actionXML);
         action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
-        try {
-            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
-            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-            assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
+        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
 
         // providing some of the dataset dirs required as per coordinator
         // specification with holes
@@ -322,13 +371,8 @@ public class TestCoordActionInputCheckXC
         new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
         //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
         Thread.sleep(1000);
-        try {
-            action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
-        }
-        catch (JPAExecutorException se) {
-            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
-        }
 
+        action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
         actionXML = action.getActionXml();
         assertEquals("", action.getMissingDependencies());
         // Datasets should be picked up based on current time and not action creation/actual time.
@@ -339,6 +383,81 @@ public class TestCoordActionInputCheckXC
         assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
     }
 
+    public void testActionInputCheckLatestCurrentTimeWithPushDependency() throws Exception {
+        Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
+
+        String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+        // Set push missing dependency
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean action = jpaService
+                .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
+        action.setPushMissingDependencies(pushMissingDependency);
+        jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+        // Update action creation time
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+        jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+        // providing some of the dataset dirs required as per coordinator specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
+        createDir(getTestCaseDir() + "/2009/02/12/");
+        createDir(getTestCaseDir() + "/2009/01/22/");
+        createDir(getTestCaseDir() + "/2009/01/08/");
+        createDir(getTestCaseDir() + "/2009/01/01/");
+
+        // Run input check after making latest available
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+                action.getMissingDependencies());
+        assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+        // Run input check after making push dependencies available
+        createDir(getTestCaseDir() + "/2009/02/05");
+        new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getPushMissingDependencies());
+        checkCoordAction(job.getId() + "@1", CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+                + "${coord:latestRange(-3,0)}", CoordinatorAction.Status.WAITING);
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
+        Thread.sleep(1000);
+
+        action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        assertEquals("", action.getMissingDependencies());
+        actionXML = action.getActionXml();
+        // Datasets should be picked up based on current time and not action creation/actual time.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/19" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05";
+        System.out.println("Expected: " + resolvedList);
+        System.out.println("Actual: " +  actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+        assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+    }
+
     public void testActionInputCheckFuture() throws Exception {
         String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);

Modified: oozie/branches/branch-4.0/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Wed Mar  6 23:28:57 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1253 latest() gets resolved before all push dependencies are resolved (rohini via virag)
 OOZIE-1251 Log messages for DependencyChecker class show wrong jobid and actionid (rohini via mona)
 OOZIE-1218 Create a HCatalog Integration Guide (rohini via virag)
 OOZIE-1250 Coord action timeout not happening when there is a exception (rohini via mona)