You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/02/04 19:02:57 UTC

svn commit: r1442252 - in /oozie/branches/branch-3.3: ./ core/src/main/java/org/apache/oozie/coord/ core/src/main/resources/ core/src/test/java/org/apache/oozie/command/coord/

Author: rkanter
Date: Mon Feb  4 18:02:57 2013
New Revision: 1442252

URL: http://svn.apache.org/viewvc?rev=1442252&view=rev
Log:
OOZIE-1071 latest EL function is based on action materialization time (rohini via virag)

Modified:
    oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
    oozie/branches/branch-3.3/core/src/main/resources/oozie-default.xml
    oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
    oozie/branches/branch-3.3/release-log.txt

Modified: oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1442252&r1=1442251&r2=1442252&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java (original)
+++ oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java Mon Feb  4 18:02:57 2013
@@ -45,6 +45,7 @@ public class CoordELFunctions {
     final private static String DATASET = "oozie.coord.el.dataset.bean";
     final private static String COORD_ACTION = "oozie.coord.el.app.bean";
     final public static String CONFIGURATION = "oozie.coord.el.conf";
+    final public static String LATEST_EL_USE_CURRENT_TIME = "oozie.service.ELService.latest-el.use-current-time";
     // INSTANCE_SEPARATOR is used to separate multiple directories into one tag.
     final public static String INSTANCE_SEPARATOR = "#";
     final public static String DIR_SEPARATOR = ",";
@@ -946,7 +947,14 @@ public class CoordELFunctions {
         int datasetFrequency = (int) getDSFrequency();// in minutes
         TimeUnit dsTimeUnit = getDSTimeUnit();
         int[] instCount = new int[1];
-        Calendar nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
+        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
+        Calendar nominalInstanceCal;
+        if (useCurrentTime) {
+            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
+        }
+        else {
+            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
+        }
         StringBuilder resolvedInstances = new StringBuilder();
         StringBuilder resolvedURIPaths = new StringBuilder();
         if (nominalInstanceCal != null) {
@@ -974,7 +982,7 @@ public class CoordELFunctions {
                 }
                 if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
                     LOG.debug("Found latest(" + available + "): " + pathWithDoneFlag);
-                    if (available == endOffset) {
+                    if (available == startOffset) {
                         LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
                         resolved = true;
                         resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
@@ -982,7 +990,7 @@ public class CoordELFunctions {
                         retVal = resolvedInstances.toString();
                         eval.setVariable("resolved_path", resolvedURIPaths.toString());
                         break;
-                    } else if (available <= startOffset) {
+                    } else if (available <= endOffset) {
                         LOG.debug("Matched latest(" + available + "): " + pathWithDoneFlag);
                         resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
                         resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);

Modified: oozie/branches/branch-3.3/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/resources/oozie-default.xml?rev=1442252&r1=1442251&r2=1442252&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/main/resources/oozie-default.xml (original)
+++ oozie/branches/branch-3.3/core/src/main/resources/oozie-default.xml Mon Feb  4 18:02:57 2013
@@ -1011,6 +1011,14 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.ELService.latest-el.use-current-time</name>
+        <value>false</value>
+        <description>
+            Whether the latest EL function resolves the latest dependency relative to the current time (true) or to the action creation/actual time (false).
+            This is for backward compatibility with older oozie behaviour.
+        </description>
+    </property>
 
     <!-- UUIDService -->
 

Modified: oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1442252&r1=1442251&r2=1442252&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Mon Feb  4 18:02:57 2013
@@ -32,8 +32,11 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
@@ -198,18 +201,19 @@ public class TestCoordActionInputCheckXC
         }
     }
 
-    public void testActionInputCheckLatest() throws Exception {
+    public void testActionInputCheckLatestActionCreationTime() throws Exception {
+        Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false);
+
         String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
         Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
         CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
 
-        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
         CoordinatorActionBean action = null;
         JPAService jpaService = Services.get().get(JPAService.class);
         try {
-            action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
+            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
         }
         catch (JPAExecutorException se) {
             fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
@@ -217,13 +221,102 @@ public class TestCoordActionInputCheckXC
 
         assertEquals(";${coord:latestRange(-3,0)}", action.getMissingDependencies());
 
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+
+        try {
+            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+            assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+        }
+        catch (JPAExecutorException se) {
+            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
+        }
+
         // providing some of the dataset dirs required as per coordinator specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
+        createDir(getTestCaseDir() + "/2009/02/12/");
+        createDir(getTestCaseDir() + "/2009/02/05/");
+        createDir(getTestCaseDir() + "/2009/01/22/");
+        createDir(getTestCaseDir() + "/2009/01/08/");
+
+        new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        //Sleep for some time as it gets requeued with 10ms delay on failure to acquire write lock
+        Thread.sleep(1000);
+        try {
+            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        }
+        catch (JPAExecutorException se) {
+            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
+        }
+
+        actionXML = action.getActionXml();
+        assertEquals("", action.getMissingDependencies());
+        // Datasets only before action creation/actual time should be picked up.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/22" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/01/08";
+        System.out.println("Expected: " + resolvedList);
+        System.out.println("Actual: " +  actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+        assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+    }
+
+    public void testActionInputCheckLatestCurrentTime() throws Exception {
+        Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
+
+        String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+        CoordinatorActionBean action = null;
+        JPAService jpaService = Services.get().get(JPAService.class);
+        try {
+            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+        }
+        catch (JPAExecutorException se) {
+            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
+        }
+
+        assertEquals(";${coord:latestRange(-3,0)}", action.getMissingDependencies());
+
+        String actionXML = action.getActionXml();
+        String actionCreationTime = "2009-02-15T01:00" + TZ;
+        actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+                + "\">");
+        action.setActionXml(actionXML);
+        action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+
+        try {
+            jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+            action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+            assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+        }
+        catch (JPAExecutorException se) {
+            fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
+        }
+
+        // providing some of the dataset dirs required as per coordinator
+        // specification with holes
+        // before and after action creation time
+        createDir(getTestCaseDir() + "/2009/03/05/");
+        createDir(getTestCaseDir() + "/2009/02/19/");
         createDir(getTestCaseDir() + "/2009/02/12/");
         createDir(getTestCaseDir() + "/2009/02/05/");
         createDir(getTestCaseDir() + "/2009/01/22/");
         createDir(getTestCaseDir() + "/2009/01/08/");
 
         new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+        //Sleep for some time as it gets requeued with 10ms delay on failure to acquire write lock
+        Thread.sleep(1000);
         try {
             action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
         }
@@ -231,7 +324,14 @@ public class TestCoordActionInputCheckXC
             fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
         }
 
+        actionXML = action.getActionXml();
         assertEquals("", action.getMissingDependencies());
+        // Datasets should be picked up based on current time and not action creation/actual time.
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/19" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/05";
+        assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
     }
 
     public void testActionInputCheckFuture() throws Exception {
@@ -270,6 +370,12 @@ public class TestCoordActionInputCheckXC
         }
 
         assertEquals("", action.getMissingDependencies());
+        String actionXML = action.getActionXml();
+        String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/02/26" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/03/05" + CoordELFunctions.INSTANCE_SEPARATOR
+                + "file://" + getTestCaseDir() + "/2009/03/12";
+        assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
     }
     /**
      * Testing a non existing namenode path

Modified: oozie/branches/branch-3.3/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/release-log.txt?rev=1442252&r1=1442251&r2=1442252&view=diff
==============================================================================
--- oozie/branches/branch-3.3/release-log.txt (original)
+++ oozie/branches/branch-3.3/release-log.txt Mon Feb  4 18:02:57 2013
@@ -1,5 +1,6 @@
 -- Oozie 3.3.2 (unreleased)
 
+OOZIE-1071 latest EL function is based on action materialization time (rohini via virag)
 OOZIE-1114 Some tests don't use the Services singleton properly (rkanter)
 OOZIE-1101 Fix log messages that contain {0} or similar (rkanter)
 OOZIE-1073 Optimize latest and future EL resolution in case of start-instance and end-instance (rohini via virag)