You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2012/01/18 02:09:14 UTC

svn commit: r1232709 [2/2] - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/command/ core/src/main/java/org/apache/oozie/command/bundle/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/command/w...

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java Wed Jan 18 01:09:12 2012
@@ -18,11 +18,16 @@
 package org.apache.oozie.service;
 
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.XCommand;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XCallable;
 
@@ -100,6 +105,86 @@ public class TestCallableQueueService ex
         public String getKey() {
             return this.key;
         }
+
+        @Override
+        public String getEntityKey() {
+            return null;
+        }
+
+        @Override
+        public void setInterruptMode(boolean mode) {
+        }
+
+        @Override
+        public boolean getInterruptMode() {
+            return false;
+        }
+    }
+
+    /**
+     * Test command: the first execute() sleeps {@code wait} ms, then records the completion time in {@code executed}.
+     */
+    public static class ExtendedXCommand extends XCommand<Void> {
+        private boolean lockRequired = true;
+        public String lockKey;
+        public long wait;
+        volatile long executed; // set in execute(), polled from the tests' waitFor predicates on another thread
+
+        public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey, boolean lockRequired) {
+            super(key, type, priority, false);
+            this.lockRequired = lockRequired;
+            this.lockKey = lockKey;
+            this.wait = wait;
+        }
+
+        public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey) {
+            this(key, type, priority, wait, lockKey, true); // same as above with locking required
+        }
+
+        @Override
+        protected boolean isLockRequired() {
+            return this.lockRequired;
+        }
+
+        @Override
+        protected boolean isReQueueRequired() {
+            return false;
+        }
+
+        @Override
+        public String getEntityKey() {
+            return this.lockKey; // the lock key doubles as the entity key
+        }
+
+        @Override
+        protected void eagerLoadState() {
+        }
+
+        @Override
+        protected void eagerVerifyPrecondition() throws CommandException {
+        }
+
+        @Override
+        protected void loadState() {
+        }
+
+        @Override
+        protected void verifyPrecondition() throws CommandException {
+        }
+
+        @Override
+        protected Void execute() throws CommandException {
+            if (executed == 0) { // only the first run sleeps and stamps the time
+                try {
+                    Thread.sleep(this.wait);
+                }
+                catch (InterruptedException exp) {
+                    throw new CommandException(ErrorCode.ETEST);
+                }
+                executed = System.currentTimeMillis();
+            }
+            return null;
+        }
     }
 
     public void testQueuing() throws Exception {
@@ -217,10 +302,15 @@ public class TestCallableQueueService ex
             return "type";
         }
 
-		@Override
-		public String getKey() {
-			return "name" + "_" + UUID.randomUUID();
-		}
+        @Override
+        public String getKey() {
+            return "name" + "_" + UUID.randomUUID();
+        }
+
+        @Override
+        public String getEntityKey() {
+            return null;
+        }
 
         @Override
         public long getCreatedTime() {
@@ -228,6 +318,15 @@ public class TestCallableQueueService ex
         }
 
         @Override
+        public void setInterruptMode(boolean mode) {
+        }
+
+        @Override
+        public boolean getInterruptMode() {
+            return false;
+        }
+
+        @Override
         public Void call() throws Exception {
             incr();
             Thread.sleep(100);
@@ -599,4 +698,230 @@ public class TestCallableQueueService ex
         services.destroy();
     }
 
+    /**
+     * Queues an interrupt-type command ("testKill") behind ten regular commands that share
+     * its lock key and asserts that it still executes before the sixth of them (index 5).
+     */
+    public void testInterrupt() throws Exception {
+        EXEC_ORDER = new AtomicLong();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1"); // presumably one worker thread - confirm
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); // type of intCallable
+        Services services = new Services();
+        services.init();
+
+        CallableQueueService queueservice = services.get(CallableQueueService.class);
+        final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+                "initialLockKey"); // queued first, sleeps 200 ms, has a lock key of its own
+        final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+        for (int i = 0; i < 10; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+        }
+
+        final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 200, "lockKey");
+
+        queueservice.queue(initialCallable);
+        for (int i = 0; i < 10; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        queueservice.queue(intCallable); // queued last, after all regular commands
+
+        waitFor(3000, new Predicate() { // predicate holds once every command has set its executed timestamp
+            public boolean evaluate() throws Exception {
+                boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+                for (ExtendedXCommand c : callables) {
+                    retValue = retValue && c.executed != 0;
+                }
+                return retValue;
+            }
+        });
+
+        assertTrue(initialCallable.executed > 0);
+        assertTrue(intCallable.executed > 0);
+        assertTrue(intCallable.executed < callables.get(5).executed); // interrupt ran before the sixth command
+        services.destroy();
+    }
+
+    /*
+     * Queues an interrupt-type command whose lock key matches none of the ten
+     * regular commands (each has a distinct key) and asserts that it is not
+     * moved ahead of them: it executes after the sixth regular command.
+     */
+    public void testInterruptsWithDistinguishedLockKeys() throws Exception {
+        EXEC_ORDER = new AtomicLong();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1"); // presumably one worker thread - confirm
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); // type of intCallable
+        Services services = new Services();
+        services.init();
+
+        CallableQueueService queueservice = services.get(CallableQueueService.class);
+
+        final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+                "initialLockKey"); // queued first, sleeps 200 ms, has a lock key of its own
+        final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+        for (int i = 0; i < 10; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey" + i)); // distinct lock keys
+        }
+
+        final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");
+
+        queueservice.queue(initialCallable);
+        for (int i = 0; i < 10; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        queueservice.queue(intCallable); // queued last, after all regular commands
+
+        waitFor(6000, new Predicate() { // predicate holds once every command has set its executed timestamp
+            public boolean evaluate() throws Exception {
+                boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+                for (ExtendedXCommand c : callables) {
+                    retValue = retValue && c.executed != 0;
+                }
+                return retValue;
+            }
+        });
+
+        assertTrue(initialCallable.executed > 0);
+        assertTrue(intCallable.executed > 0);
+        assertTrue(intCallable.executed > callables.get(5).executed); // interrupt ran after the sixth command
+
+        services.destroy();
+    }
+
+    /*
+     * Asserts that an interrupt-type command executes before every member of a
+     * serially queued (composite) batch of commands sharing its lock key.
+     */
+    public void testInterruptsWithCompositeCallable() throws Exception {
+        EXEC_ORDER = new AtomicLong();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1"); // presumably one worker thread - confirm
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); // type of intCallable
+        Services services = new Services();
+        services.init();
+
+        CallableQueueService queueservice = services.get(CallableQueueService.class);
+        final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+                "initialLockKey"); // queued first, sleeps 200 ms, has a lock key of its own
+        final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+
+        for (int i = 0; i < 10; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+        }
+
+        // NOTE(review): the key "key5" below equals the key of callables.get(5) - confirm this is intended
+        final ExtendedXCommand intCallable = new ExtendedXCommand("key5", "testKill", 0, 200, "lockKey");
+
+        queueservice.queue(initialCallable);
+        queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); // the ten commands go in as one batch
+        queueservice.queue(intCallable); // queued after the batch
+
+        waitFor(3000, new Predicate() { // predicate holds once every command has set its executed timestamp
+            public boolean evaluate() throws Exception {
+                boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+                for (ExtendedXCommand c : callables) {
+                    retValue = retValue && c.executed != 0;
+                }
+                return retValue;
+            }
+        });
+
+        assertTrue(initialCallable.executed > 0);
+        assertTrue(intCallable.executed > 0);
+        for (ExtendedXCommand c : callables) {
+            assertTrue(intCallable.executed < c.executed); // interrupt ran before each batch member
+        }
+
+        services.destroy();
+    }
+
+    /*
+     * Places an interrupt-type command inside a serially queued (composite)
+     * batch and asserts that it executes before an earlier member of the batch.
+     */
+    public void testInterruptsInCompositeCallable() throws Exception {
+        EXEC_ORDER = new AtomicLong();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1"); // presumably one worker thread - confirm
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); // type of callables.get(5)
+        Services services = new Services();
+        services.init();
+
+        CallableQueueService queueservice = services.get(CallableQueueService.class);
+        final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+                "initialLockKey"); // queued first, sleeps 200 ms, has a lock key of its own
+        final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+
+        for (int i = 0; i < 5; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+        }
+        callables.add(new ExtendedXCommand("key" + 5, "testKill", 1, 100, "lockKey")); // index 5 is the interrupt
+        for (int i = 6; i < 10; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+        }
+
+        queueservice.queue(initialCallable);
+        queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); // all ten go in as one batch
+
+        waitFor(3000, new Predicate() { // predicate holds once every command has set its executed timestamp
+            public boolean evaluate() throws Exception {
+                boolean retValue = initialCallable.executed != 0;
+                for (ExtendedXCommand c : callables) {
+                    retValue = retValue && c.executed != 0;
+                }
+                return retValue;
+            }
+        });
+
+        assertTrue(initialCallable.executed > 0);
+        assertTrue(callables.get(1).executed > callables.get(5).executed); // interrupt (index 5) ran before index 1
+
+        services.destroy();
+    }
+
+    /*
+     * With the interrupt map max size set to 0, asserts that the interrupt-type
+     * command gets no priority: it executes after the sixth regular command.
+     */
+    public void testMaxInterruptMapSize() throws Exception {
+        EXEC_ORDER = new AtomicLong();
+        setSystemProperty(CallableQueueService.CONF_THREADS, "1"); // presumably one worker thread - confirm
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); // type of intCallable
+        setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, "0"); // presumably no room - confirm
+        Services services = new Services();
+        services.init();
+
+        CallableQueueService queueservice = services.get(CallableQueueService.class);
+
+        final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 100,
+                "initialLockKey"); // queued first, sleeps 100 ms, has a lock key of its own
+        final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+        for (int i = 0; i < 10; i++) {
+            callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+        }
+
+        final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");
+
+        queueservice.queue(initialCallable);
+        for (int i = 0; i < 10; i++) {
+            queueservice.queue(callables.get(i));
+        }
+
+        queueservice.queue(intCallable); // queued last, after all regular commands
+
+        waitFor(5000, new Predicate() { // predicate holds once every command has set its executed timestamp
+            public boolean evaluate() throws Exception {
+                boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+                for (ExtendedXCommand c : callables) {
+                    retValue = retValue && c.executed != 0;
+                }
+                return retValue;
+            }
+        });
+
+        assertTrue(initialCallable.executed > 0);
+        assertTrue(intCallable.executed > 0);
+        assertTrue(intCallable.executed > callables.get(5).executed); // interrupt ran after the sixth command
+
+        services.destroy();
+    }
+
 }

Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Jan 18 01:09:12 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.2.0 release
 
+OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha via Angelo)
 OOZIE-639 Hive sharelib POM must exclude hadoop-core. (tucu)
 OOZIE-635 ShellMain closes the STD/ERR stream while shell is processing. (tucu)
 OOZIE-629 Oozie server to prevent usage of dataset initial-instance earlier than the system-defined default value.(Mona via Mohammad)