You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/03/06 00:08:03 UTC
svn commit: r1453079 - in /oozie/branches/branch-4.0: ./
core/src/main/java/org/apache/oozie/command/coord/
core/src/test/java/org/apache/oozie/command/coord/
core/src/test/java/org/apache/oozie/test/
Author: mona
Date: Tue Mar 5 23:08:03 2013
New Revision: 1453079
URL: http://svn.apache.org/r1453079
Log:
OOZIE-1250 Coord action timeout not happening when there is an exception (rohini via mona)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/branches/branch-4.0/release-log.txt
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Tue Mar 5 23:08:03 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
@@ -157,7 +158,7 @@ public class CoordActionInputCheckXComma
}
else {
if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ queue(new CoordActionTimeOutXCommand(coordAction));
}
else {
// Let CoordPushDependencyCheckXCommand queue the timeout
@@ -167,7 +168,9 @@ public class CoordActionInputCheckXComma
}
catch (Exception e) {
if (isTimeout(currentTime)) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ LOG.debug("Queueing timeout command");
+ // XCommand.queue() will not work when there is a Exception
+ Services.get().get(CallableQueueService.class).queue(new CoordActionTimeOutXCommand(coordAction));
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Tue Mar 5 23:08:03 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -32,7 +33,6 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
@@ -41,16 +41,14 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void> {
protected String actionId;
@@ -93,7 +91,9 @@ public class CoordPushDependencyCheckXCo
LOG.info("Nothing to check. Empty push missing dependency");
}
else {
- LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
+ String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
+ LOG.info("First Push missing dependency for actionID [{0}] is [{1}] ", actionId, missingDepsArray[0]);
+ LOG.trace("Push missing dependencies for actionID [{0}] is [{1}] ", actionId, pushMissingDeps);
try {
Configuration actionConf = null;
@@ -104,7 +104,6 @@ public class CoordPushDependencyCheckXCo
throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
}
- String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
// Check all dependencies during materialization to avoid registering in the cache.
// But check only first missing one afterwards similar to
// CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
@@ -129,7 +128,7 @@ public class CoordPushDependencyCheckXCo
// Checking for timeout
timeout = isTimeout();
if (timeout) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ queue(new CoordActionTimeOutXCommand(coordAction));
}
else {
queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
@@ -150,7 +149,10 @@ public class CoordPushDependencyCheckXCo
}
catch (Exception e) {
if (isTimeout()) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ LOG.debug("Queueing timeout command");
+ // XCommand.queue() will not work when there is a Exception
+ Services.get().get(CallableQueueService.class).queue(new CoordActionTimeOutXCommand(coordAction));
+ unregisterMissingDependencies(Arrays.asList(missingDepsArray));
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Tue Mar 5 23:08:03 2013
@@ -51,6 +51,7 @@ import org.apache.oozie.util.XConfigurat
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.junit.Test;
public class TestCoordActionInputCheckXCommand extends XDataTestCase {
protected Services services;
@@ -570,6 +571,46 @@ public class TestCoordActionInputCheckXC
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
}
+ @Test
+ public void testTimeout() throws Exception {
+ String missingDeps = "hdfs:///dirx/filex";
+ String actionId = addInitRecords(missingDeps, null, TZ);
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0, actionId.indexOf("@"))).call();
+ // Timeout is 10 mins. Change action creation time to before 12 min to make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0, actionId.indexOf("@"))).call();
+ Thread.sleep(100);
+ checkCoordAction(actionId, missingDeps, CoordinatorAction.Status.TIMEDOUT);
+ }
+
+ @Test
+ public void testTimeoutWithException() throws Exception {
+ String missingDeps = "nofs:///dirx/filex";
+ String actionId = addInitRecords(missingDeps, null, TZ);
+ try {
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0, actionId.indexOf("@"))).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("No FileSystem for scheme"));
+ }
+ // Timeout is 10 mins. Change action created time to before 12 min to make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ try {
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0, actionId.indexOf("@"))).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("No FileSystem for scheme"));
+ }
+ Thread.sleep(100);
+ checkCoordAction(actionId, missingDeps, CoordinatorAction.Status.TIMEDOUT);
+ }
+
protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
@@ -747,6 +788,22 @@ public class TestCoordActionInputCheckXC
}
}
+ private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat)
+ throws Exception {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ String missDeps = action.getMissingDependencies();
+ assertEquals(expDeps, missDeps);
+ assertEquals(stat, action.getStatus());
+
+ return action;
+ }
+ catch (JPAExecutorException se) {
+ throw new Exception("Action ID " + actionId + " was not stored properly in db");
+ }
+ }
+
private void createDir(String dir) {
Process pr;
try {
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Tue Mar 5 23:08:03 2013
@@ -97,7 +97,7 @@ public class TestCoordPushDependencyChec
public void testUpdateCoordTableMultipleDepsV2() throws Exception {
// Test for two dependencies : one of them is already existing in the
// hcat server. Other one is not.
- // Expected to see both action in WAITING as first one is not available.
+ // Expected to see both action in WAITING as first one is not available and we only check for first.
// Later make the other partition also available. action is expected to
// be READY
String db = "default";
@@ -110,6 +110,7 @@ public class TestCoordPushDependencyChec
String actionId = addInitRecords(newHCatDependency);
checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+ // Checks only for first missing dependency
new CoordPushDependencyCheckXCommand(actionId).call();
// Checks dependencies in order. So list does not change if first one is not available
@@ -126,6 +127,39 @@ public class TestCoordPushDependencyChec
checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
+ @Test
+ public void testUpdateCoordTableMultipleDepsV3() throws Exception {
+ // Test for two dependencies : one of them is already existing in the
+ // hcat server. Other one is not.
+ // Expected to see only first action in WAITING as we check for all dependencies.
+ // Later make the other partition also available. action is expected to
+ // be READY
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ populateTable(db, table);
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+
+ // Checks for all missing dependencies
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
+ assertTrue(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
+
+ // Make first dependency available
+ addPartition(db, table, "dt=20120430;country=brazil");
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+ }
@Test
public void testResolveCoordConfiguration() throws Exception {
@@ -139,10 +173,11 @@ public class TestCoordPushDependencyChec
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
- CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING, "coord-action-for-action-push-check.xml", newHCatDependency);
+ CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING, "coord-action-for-action-push-check.xml", null, newHCatDependency,
+ "Z");
- String actionId = action1.getId();
+ String actionId = action.getId();
checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
new CoordPushDependencyCheckXCommand(actionId).call();
@@ -162,14 +197,8 @@ public class TestCoordPushDependencyChec
}
-
@Test
- public void testUpdateCoordTableMultipleDepsV3() throws Exception {
- // Test for two dependencies : one of them is already existing in the
- // hcat server. Other one is not.
- // Expected to see the action in WAITING
- // Later make the other partition also available. action is expected to
- // be READY
+ public void testTimeOut() throws Exception {
String db = "default";
String table = "tablename";
String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
@@ -179,19 +208,99 @@ public class TestCoordPushDependencyChec
String actionId = addInitRecords(newHCatDependency);
checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
-
new CoordPushDependencyCheckXCommand(actionId, true).call();
checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING);
PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
- assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
assertTrue(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
- // Make first dependency available
- addPartition(db, table, "dt=20120430;country=brazil");
+ // Timeout is 10 mins. Change action created time to before 12 min to make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
new CoordPushDependencyCheckXCommand(actionId).call();
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.TIMEDOUT);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+
+ }
+
+ @Test
+ public void testTimeOutWithException1() throws Exception {
+ // Test timeout when missing dependencies are from a non existing table
+ String newHCatDependency1 = "hcat://" + server + "/nodb/notable/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/nodb/notable/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+
+ // Timeout is 10 mins. Change action created time to before 12 min to make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.TIMEDOUT);
+ }
+
+ @Test
+ public void testTimeOutWithException2() throws Exception {
+ // Test timeout when table containing missing dependencies is dropped in between
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ populateTable(db, table);
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new HCatURI(newHCatDependency1)).contains(actionId));
+ assertTrue(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
+
+ // Timeout is 10 mins. Change action created time to before 12 min to make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ dropTable(db, table, true);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.TIMEDOUT);
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
- checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
assertFalse(hcatService.isRegisteredForNotification(new HCatURI(newHCatDependency1)));
}
@@ -212,7 +321,7 @@ public class TestCoordPushDependencyChec
CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
String missDeps = action.getPushMissingDependencies();
assertEquals(expDeps, missDeps);
- assertEquals(action.getStatus(), stat);
+ assertEquals(stat, action.getStatus());
return action;
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Tue Mar 5 23:08:03 2013
@@ -58,6 +58,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.service.CoordinatorStoreService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -65,6 +66,7 @@ import org.apache.oozie.service.UUIDServ
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.UUIDService.ApplicationType;
+import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
@@ -581,6 +583,7 @@ public abstract class XDataTestCase exte
action.setCreatedTime(new Date());
action.setStatus(status);
action.setActionXml(actionXml);
+ action.setTimeOut(10);
Configuration conf = getCoordConf(appPath);
action.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -1284,17 +1287,26 @@ public abstract class XDataTestCase exte
}
protected String addInitRecords(String pushMissingDependencies) throws Exception {
+ return addInitRecords(null, pushMissingDependencies, "Z");
+ }
+
+ protected String addInitRecords(String missingDependencies, String pushMissingDependencies, String oozieTimeZoneMask)
+ throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
- CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", pushMissingDependencies);
- return action1.getId();
+ CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", missingDependencies,
+ pushMissingDependencies, oozieTimeZoneMask);
+ return action.getId();
}
protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
- CoordinatorAction.Status status, String resourceXmlName, String pushMissingDependencies) throws Exception {
- CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0);
+ CoordinatorAction.Status status, String resourceXmlName, String missingDependencies,
+ String pushMissingDependencies, String oozieTimeZoneMask) throws Exception {
+ CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0,
+ oozieTimeZoneMask);
+ action.setMissingDependencies(missingDependencies);
action.setPushMissingDependencies(pushMissingDependencies);
try {
JPAService jpaService = Services.get().get(JPAService.class);
@@ -1344,4 +1356,13 @@ public abstract class XDataTestCase exte
throw new RuntimeException(XLog.format("Could not get " + testFileName, ioe));
}
}
+
+ protected void setCoordActionCreationTime(String actionId, long actionCreationTime) throws Exception {
+ CoordinatorStore store = Services.get().get(CoordinatorStoreService.class).create();
+ CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
+ action.setCreatedTime(new Date(actionCreationTime));
+ store.beginTrx();
+ store.updateCoordinatorAction(action);
+ store.commitTrx();
+ }
}
Modified: oozie/branches/branch-4.0/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1453079&r1=1453078&r2=1453079&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Tue Mar 5 23:08:03 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 (unreleased)
+OOZIE-1250 Coord action timeout not happening when there is a exception (rohini via mona)
OOZIE-1207 Optimize current EL resolution in case of start-instance and end-instance (rohini via mona)
OOZIE-1247 CoordActionInputCheck shouldn't queue CoordPushInputCheck (rohini via virag)
OOZIE-1235 CoordPushCheck doesn't evaluate the configuration section which is propogated to workflow (virag)