You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/21 21:36:14 UTC
svn commit: r1459513 - in /oozie/branches/branch-4.0: ./
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/service/
core/src/test/java/org/apache/oozie/command/coord/
core/src/test/java/org/apache/oozie/service/
Author: virag
Date: Thu Mar 21 20:36:14 2013
New Revision: 1459513
URL: http://svn.apache.org/r1459513
Log:
OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove dependencies from cache (rohini via virag)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/branches/branch-4.0/release-log.txt
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Thu Mar 21 20:36:14 2013
@@ -70,15 +70,22 @@ public class CoordPushDependencyCheckXCo
*/
private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
private boolean registerForNotification;
+ private boolean removeAvailDependencies;
public CoordPushDependencyCheckXCommand(String actionId) {
- this(actionId, false);
+ this(actionId, false, true);
}
public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
+ this(actionId, registerForNotification, !registerForNotification);
+ }
+
+ public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
+ boolean removeAvailDependencies) {
super("coord_push_dep_check", "coord_push_dep_check", 0);
this.actionId = actionId;
this.registerForNotification = registerForNotification;
+ this.removeAvailDependencies = removeAvailDependencies;
}
protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
@@ -145,7 +152,7 @@ public class CoordPushDependencyCheckXCo
if (registerForNotification) {
registerForNotification(actionDep.getMissingDependencies(), actionConf);
}
- else {
+ if (removeAvailDependencies) {
unregisterAvailableDependencies(actionDep.getAvailableDependencies());
}
if (timeout) {
@@ -164,8 +171,9 @@ public class CoordPushDependencyCheckXCo
&& coordAction.getMissingDependencies().length() > 0) {
// Queue again on exception as RecoveryService will not queue this again with
// the action being updated regularly by CoordActionInputCheckXCommand
- callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), Services
- .get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
+ callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
+ registerForNotification, removeAvailDependencies),
+ Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java Thu Mar 21 20:36:14 2013
@@ -245,7 +245,7 @@ public class RecoveryService implements
+ caction.getId());
if (caction.getPushMissingDependencies() != null
&& caction.getPushMissingDependencies().length() != 0) {
- queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true),
+ queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
pushMissingDepDelay);
pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Thu Mar 21 20:36:14 2013
@@ -395,9 +395,10 @@ public class TestCoordPushDependencyChec
// Should be requeued at the recovery service interval
final List<String> queueDump = callableQueueService.getQueueDump();
assertEquals(1, callableQueueService.getQueueDump().size());
- // Delay should be something like delay=5999999. Ignore last digit
- assertTrue(queueDump.get(0).contains("delay=599999"));
assertTrue(queueDump.get(0).contains(CoordPushDependencyCheckXCommand.class.getName()));
+ log.info("Queue dump is " + queueDump.toString());
+ // Delay should be something like delay=599999. Ignore last three digits
+ assertTrue(queueDump.get(0).matches("delay=599[0-9]{3}, .*"));
}
@Test
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java Thu Mar 21 20:36:14 2013
@@ -356,6 +356,7 @@ public class TestRecoveryService extends
HCatAccessorService hcatService = services.get(HCatAccessorService.class);
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
assertFalse(jmsService.isListeningToTopic(hcatService.getJMSConnectionInfo(new URI(newHCatDependency1)), db
+ "." + table));
@@ -363,6 +364,10 @@ public class TestRecoveryService extends
String actionId = addInitRecords(newHCatDependency);
CoordinatorAction ca = checkCoordActionDependencies(actionId, newHCatDependency);
assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+ // Register the missing dependencies to PDMS assuming CoordPushDependencyCheckCommand did this.
+ pdms.addMissingDependency(new HCatURI(newHCatDependency1), actionId);
+ pdms.addMissingDependency(new HCatURI(newHCatDependency2), actionId);
+
sleep(2000);
Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
recoveryRunnable.run();
@@ -374,7 +379,6 @@ public class TestRecoveryService extends
+ db + "." + table));
checkCoordActionDependencies(actionId, newHCatDependency1);
- PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
Collection<String> waitingActions = pdms.getWaitingActions(new HCatURI(newHCatDependency1));
assertEquals(1, waitingActions.size());
Modified: oozie/branches/branch-4.0/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Thu Mar 21 20:36:14 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 (unreleased)
+OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove dependencies from cache (rohini via virag)
OOZIE-1277 CoordActionInputCheck requeues itself even if only push missing dependencies exist (virag)
OOZIE-1272 Two workflow jobs mapped to a single coordinator action (ryota via virag)
OOZIE-1274 change recovery service interval to make it consistent with oozie-default.xml (ryota via virag)