You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/07 00:28:57 UTC
svn commit: r1453618 - in /oozie/branches/branch-4.0:
core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
release-log.txt
Author: virag
Date: Wed Mar 6 23:28:57 2013
New Revision: 1453618
URL: http://svn.apache.org/r1453618
Log:
OOZIE-1253 latest() gets resolved before all push dependencies are resolved (rohini via virag)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/release-log.txt
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Mar 6 23:28:57 2013
@@ -130,6 +130,13 @@ public class CoordActionInputCheckXComma
+ nonResolvedList.toString());
// Updating the list of data dependencies that are available and those that are yet not
boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
+ String pushDeps = coordAction.getPushMissingDependencies();
+ // Resolve latest/future only when all current missingDependencies and
+ // pushMissingDependencies are met
+ if (status) {
+ status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
+ : false;
+ }
coordAction.setLastModifiedTime(currentTime);
coordAction.setActionXml(actionXml.toString());
if (nonResolvedList.length() > 0 && status == false) {
@@ -141,8 +148,7 @@ public class CoordActionInputCheckXComma
isChangeInDependency = true;
coordAction.setMissingDependencies(nonExistListStr);
}
- String pushDeps = coordAction.getPushMissingDependencies();
- if (status == true && (pushDeps == null || pushDeps.length() == 0)) {
+ if (status) {
String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
actionXml.replace(0, actionXml.length(), newActionXml);
coordAction.setActionXml(actionXml.toString());
@@ -242,11 +248,13 @@ public class CoordActionInputCheckXComma
protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
Configuration conf) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml.toString());
- boolean allExist = checkResolvedUris(eAction, existList, nonExistList, conf);
- if (allExist) {
- LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
- allExist = checkUnresolvedInstances(eAction, conf);
- }
+ return checkResolvedUris(eAction, existList, nonExistList, conf);
+ }
+
+ private boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+ Element eAction = XmlUtils.parseXml(actionXml.toString());
+ LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
+ boolean allExist = checkUnresolvedInstances(eAction, conf);
if (allExist) {
actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Wed Mar 6 23:28:57 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
@@ -213,33 +214,22 @@ public class TestCoordActionInputCheckXC
CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- CoordinatorActionBean action = null;
JPAService jpaService = Services.get().get(JPAService.class);
- try {
- action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
-
+ CoordinatorActionBean action = jpaService
+ .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+ // Update action creation time
String actionXML = action.getActionXml();
String actionCreationTime = "2009-02-15T01:00" + TZ;
actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
- try {
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
- action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
+ jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
// providing some of the dataset dirs required as per coordinator specification with holes
// before and after action creation time
@@ -253,13 +243,8 @@ public class TestCoordActionInputCheckXC
new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
//Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
Thread.sleep(1000);
- try {
- action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
actionXML = action.getActionXml();
assertEquals("", action.getMissingDependencies());
// Datasets only before action creation/actual time should be picked up.
@@ -272,6 +257,81 @@ public class TestCoordActionInputCheckXC
assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
}
+ public void testActionInputCheckLatestActionCreationTimeWithPushDependency() throws Exception {
+ Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, false);
+
+ String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+ // Set push missing dependency
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService
+ .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
+ action.setPushMissingDependencies(pushMissingDependency);
+ jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+ // Update action creation time
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+ jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+ // providing some of the dataset dirs required as per coordinator specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
+ createDir(getTestCaseDir() + "/2009/02/12/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+ createDir(getTestCaseDir() + "/2009/01/01/");
+
+ // Run input check after making latest available
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+ // Run input check after making push dependencies available
+ createDir(getTestCaseDir() + "/2009/02/05");
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getPushMissingDependencies());
+ checkCoordAction(job.getId() + "@1", CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ + "${coord:latestRange(-3,0)}", CoordinatorAction.Status.WAITING);
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
+ Thread.sleep(1000);
+
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getMissingDependencies());
+ actionXML = action.getActionXml();
+ // Datasets only before action creation/actual time should be picked up.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/22" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/08";
+ System.out.println("Expected: " + resolvedList);
+ System.out.println("Actual: " + actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+ assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+ }
+
public void testActionInputCheckLatestCurrentTime() throws Exception {
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
@@ -281,33 +341,22 @@ public class TestCoordActionInputCheckXC
CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- CoordinatorActionBean action = null;
JPAService jpaService = Services.get().get(JPAService.class);
- try {
- action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
-
+ CoordinatorActionBean action = jpaService
+ .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+ // Update action creation time
String actionXML = action.getActionXml();
String actionCreationTime = "2009-02-15T01:00" + TZ;
actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
- try {
- jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
- action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
+ jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
// providing some of the dataset dirs required as per coordinator
// specification with holes
@@ -322,13 +371,8 @@ public class TestCoordActionInputCheckXC
new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
//Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
Thread.sleep(1000);
- try {
- action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly in db");
- }
+ action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId() + "@1"));
actionXML = action.getActionXml();
assertEquals("", action.getMissingDependencies());
// Datasets should be picked up based on current time and not action creation/actual time.
@@ -339,6 +383,81 @@ public class TestCoordActionInputCheckXC
assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
}
+ public void testActionInputCheckLatestCurrentTimeWithPushDependency() throws Exception {
+ Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME, true);
+
+ String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime, endTime, "latest");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+ // Set push missing dependency
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService
+ .execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ final String pushMissingDependency = "file://" + getTestCaseDir() + "/2009/02/05";
+ action.setPushMissingDependencies(pushMissingDependency);
+ jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+ // Update action creation time
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">", "action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+ jpaService.execute(new CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00")) ;
+
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+ // providing some of the dataset dirs required as per coordinator specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
+ createDir(getTestCaseDir() + "/2009/02/12/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+ createDir(getTestCaseDir() + "/2009/01/01/");
+
+ // Run input check after making latest available
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR + "${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency, action.getPushMissingDependencies());
+
+ // Run input check after making push dependencies available
+ createDir(getTestCaseDir() + "/2009/02/05");
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getPushMissingDependencies());
+ checkCoordAction(job.getId() + "@1", CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ + "${coord:latestRange(-3,0)}", CoordinatorAction.Status.WAITING);
+ new CoordActionInputCheckXCommand(job.getId() + "@1", job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to acquire write lock
+ Thread.sleep(1000);
+
+ action = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getMissingDependencies());
+ actionXML = action.getActionXml();
+ // Datasets should be picked up based on current time and not action creation/actual time.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/19" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/12" + CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05";
+ System.out.println("Expected: " + resolvedList);
+ System.out.println("Actual: " + actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+ assertEquals(resolvedList, actionXML.substring(actionXML.indexOf("<uris>") + 6, actionXML.indexOf("</uris>")));
+ }
+
public void testActionInputCheckFuture() throws Exception {
String jobId = "0000000-" + new Date().getTime() + "-TestCoordActionInputCheckXCommand-C";
Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
Modified: oozie/branches/branch-4.0/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1453618&r1=1453617&r2=1453618&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Wed Mar 6 23:28:57 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 (unreleased)
+OOZIE-1253 latest() gets resolved before all push dependencies are resolved (rohini via virag)
OOZIE-1251 Log messages for DependencyChecker class show wrong jobid and actionid (rohini via mona)
OOZIE-1218 Create a HCatalog Integration Guide (rohini via virag)
OOZIE-1250 Coord action timeout not happening when there is a exception (rohini via mona)