You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/21 21:35:48 UTC

svn commit: r1459512 - in /oozie/trunk: ./ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/service/ core/src/test/java/org/apache/oozie/command/coord/ core/src/test/java/org/apache/oozie/service/

Author: virag
Date: Thu Mar 21 20:35:48 2013
New Revision: 1459512

URL: http://svn.apache.org/r1459512
Log:
OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove dependencies from cache (rohini via virag)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1459512&r1=1459511&r2=1459512&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Thu Mar 21 20:35:48 2013
@@ -70,15 +70,22 @@ public class CoordPushDependencyCheckXCo
      */
     private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
     private boolean registerForNotification;
+    private boolean removeAvailDependencies;
 
     public CoordPushDependencyCheckXCommand(String actionId) {
-        this(actionId, false);
+        this(actionId, false, true);
     }
 
     public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification) {
+        this(actionId, registerForNotification, !registerForNotification);
+    }
+
+    public CoordPushDependencyCheckXCommand(String actionId, boolean registerForNotification,
+            boolean removeAvailDependencies) {
         super("coord_push_dep_check", "coord_push_dep_check", 0);
         this.actionId = actionId;
         this.registerForNotification = registerForNotification;
+        this.removeAvailDependencies = removeAvailDependencies;
     }
 
     protected CoordPushDependencyCheckXCommand(String actionName, String actionId) {
@@ -145,7 +152,7 @@ public class CoordPushDependencyCheckXCo
                 if (registerForNotification) {
                     registerForNotification(actionDep.getMissingDependencies(), actionConf);
                 }
-                else {
+                if (removeAvailDependencies) {
                     unregisterAvailableDependencies(actionDep.getAvailableDependencies());
                 }
                 if (timeout) {
@@ -164,8 +171,9 @@ public class CoordPushDependencyCheckXCo
                         && coordAction.getMissingDependencies().length() > 0) {
                     // Queue again on exception as RecoveryService will not queue this again with
                     // the action being updated regularly by CoordActionInputCheckXCommand
-                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), Services
-                            .get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
+                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
+                            registerForNotification, removeAvailDependencies),
+                            Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
                 }
                 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
             }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1459512&r1=1459511&r2=1459512&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Thu Mar 21 20:35:48 2013
@@ -245,7 +245,7 @@ public class RecoveryService implements 
                                 + caction.getId());
                         if (caction.getPushMissingDependencies() != null
                                 && caction.getPushMissingDependencies().length() != 0) {
-                            queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true),
+                            queueCallable(new CoordPushDependencyCheckXCommand(caction.getId(), true, true),
                                     pushMissingDepDelay);
                             pushMissingDepDelay = pushMissingDepDelay + pushMissingDepInterval;
                             log.info("Recover a WAITING coord action and resubmit CoordPushDependencyCheckX :"

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1459512&r1=1459511&r2=1459512&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Thu Mar 21 20:35:48 2013
@@ -395,9 +395,10 @@ public class TestCoordPushDependencyChec
         // Should be requeued at the recovery service interval
         final List<String> queueDump = callableQueueService.getQueueDump();
         assertEquals(1, callableQueueService.getQueueDump().size());
-        // Delay should be something like delay=5999999. Ignore last digit
-        assertTrue(queueDump.get(0).contains("delay=599999"));
         assertTrue(queueDump.get(0).contains(CoordPushDependencyCheckXCommand.class.getName()));
+        log.info("Queue dump is " + queueDump.toString());
+        // Delay should be something like delay=599999. Ignore last three digits
+        assertTrue(queueDump.get(0).matches("delay=599[0-9]{3}, .*"));
     }
 
     @Test

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1459512&r1=1459511&r2=1459512&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java Thu Mar 21 20:35:48 2013
@@ -356,6 +356,7 @@ public class TestRecoveryService extends
 
         HCatAccessorService hcatService = services.get(HCatAccessorService.class);
         JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         assertFalse(jmsService.isListeningToTopic(hcatService.getJMSConnectionInfo(new URI(newHCatDependency1)), db
                 + "." + table));
 
@@ -363,6 +364,10 @@ public class TestRecoveryService extends
         String actionId = addInitRecords(newHCatDependency);
         CoordinatorAction ca = checkCoordActionDependencies(actionId, newHCatDependency);
         assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+        // Register the missing dependencies to PDMS assuming CoordPushDependencyCheckCommand did this.
+        pdms.addMissingDependency(new HCatURI(newHCatDependency1), actionId);
+        pdms.addMissingDependency(new HCatURI(newHCatDependency2), actionId);
+
         sleep(2000);
         Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
         recoveryRunnable.run();
@@ -374,7 +379,6 @@ public class TestRecoveryService extends
                 + db + "." + table));
         checkCoordActionDependencies(actionId, newHCatDependency1);
 
-        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
         assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
         Collection<String> waitingActions = pdms.getWaitingActions(new HCatURI(newHCatDependency1));
         assertEquals(1, waitingActions.size());

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1459512&r1=1459511&r2=1459512&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Thu Mar 21 20:35:48 2013
@@ -8,6 +8,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
 
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove dependencies from cache (rohini via virag)
 OOZIE-1277 CoordActionInputCheck requeues itself even if only push missing dependencies exist (virag)
 OOZIE-1272 Two workflow jobs mapped to a single coordinator action (ryota via virag)
 OOZIE-1274 change recovery service interval to make it consistent with oozie-default.xml (ryota via virag)