You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2012/01/18 02:09:14 UTC
svn commit: r1232709 [2/2] - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/bundle/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/w...
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java Wed Jan 18 01:09:12 2012
@@ -18,11 +18,16 @@
package org.apache.oozie.service;
import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.XCommand;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XCallable;
@@ -100,6 +105,86 @@ public class TestCallableQueueService ex
public String getKey() {
return this.key;
}
+
+ @Override
+ public String getEntityKey() {
+ return null;
+ }
+
+ @Override
+ public void setInterruptMode(boolean mode) {
+ }
+
+ @Override
+ public boolean getInterruptMode() {
+ return false;
+ }
+ }
+
+ public static class ExtendedXCommand extends XCommand<Void> {
+ private boolean lockRequired = true;
+ public String lockKey;
+ public long wait;
+ long executed;
+
+ public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey, boolean lockRequired) {
+ super(key, type, priority, false);
+ this.lockRequired = lockRequired;
+ this.lockKey = lockKey;
+ this.wait = wait;
+ }
+
+ public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey) {
+ super(key, type, priority, false);
+ this.lockKey = lockKey;
+ this.wait = wait;
+ }
+
+ @Override
+ protected boolean isLockRequired() {
+ return this.lockRequired;
+ }
+
+ @Override
+ protected boolean isReQueueRequired() {
+ return false;
+ }
+
+ @Override
+ public String getEntityKey() {
+ return this.lockKey;
+ }
+
+ @Override
+ protected void eagerLoadState() {
+ }
+
+ @Override
+ protected void eagerVerifyPrecondition() throws CommandException {
+ }
+
+ @Override
+ protected void loadState() {
+ }
+
+ @Override
+ protected void verifyPrecondition() throws CommandException {
+ }
+
+ @Override
+ protected Void execute() throws CommandException {
+ if (executed == 0) {
+ try {
+ Thread.sleep(this.wait);
+ }
+ catch (InterruptedException exp) {
+ throw new CommandException(ErrorCode.ETEST);
+ }
+ executed = System.currentTimeMillis();
+ ;
+ }
+ return null;
+ }
}
public void testQueuing() throws Exception {
@@ -217,10 +302,15 @@ public class TestCallableQueueService ex
return "type";
}
- @Override
- public String getKey() {
- return "name" + "_" + UUID.randomUUID();
- }
+ @Override
+ public String getKey() {
+ return "name" + "_" + UUID.randomUUID();
+ }
+
+ @Override
+ public String getEntityKey() {
+ return null;
+ }
@Override
public long getCreatedTime() {
@@ -228,6 +318,15 @@ public class TestCallableQueueService ex
}
@Override
+ public void setInterruptMode(boolean mode) {
+ }
+
+ @Override
+ public boolean getInterruptMode() {
+ return false;
+ }
+
+ @Override
public Void call() throws Exception {
incr();
Thread.sleep(100);
@@ -599,4 +698,230 @@ public class TestCallableQueueService ex
services.destroy();
}
+ /**
+ * Tests interrupts by queuing an interrupt command after a set of 10
+ * commands and asserting that it executes before the sixth of them.
+ */
+ public void testInterrupt() throws Exception {
+ EXEC_ORDER = new AtomicLong();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill");
+ Services services = new Services();
+ services.init();
+
+ CallableQueueService queueservice = services.get(CallableQueueService.class);
+ final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+ "initialLockKey");
+ final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+ for (int i = 0; i < 10; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+ }
+
+ final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 200, "lockKey");
+
+ queueservice.queue(initialCallable);
+ for (int i = 0; i < 10; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ queueservice.queue(intCallable);
+
+ waitFor(3000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+ for (ExtendedXCommand c : callables) {
+ retValue = retValue && c.executed != 0;
+ }
+ return retValue;
+ }
+ });
+
+ assertTrue(initialCallable.executed > 0);
+ assertTrue(intCallable.executed > 0);
+ assertTrue(intCallable.executed < callables.get(5).executed);
+ services.destroy();
+ }
+
+ /*
+ * Queues an interrupt command whose lock key differs from those of the
+ * other commands and asserts that it still executes after them, i.e. the
+ * interrupt command is not moved ahead in the execution order.
+ */
+ public void testInterruptsWithDistinguishedLockKeys() throws Exception {
+ EXEC_ORDER = new AtomicLong();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill");
+ Services services = new Services();
+ services.init();
+
+ CallableQueueService queueservice = services.get(CallableQueueService.class);
+
+ final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+ "initialLockKey");
+ final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+ for (int i = 0; i < 10; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey" + i));
+ }
+
+ final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");
+
+ queueservice.queue(initialCallable);
+ for (int i = 0; i < 10; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ queueservice.queue(intCallable);
+
+ waitFor(6000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+ for (ExtendedXCommand c : callables) {
+ retValue = retValue && c.executed != 0;
+ }
+ return retValue;
+ }
+ });
+
+ assertTrue(initialCallable.executed > 0);
+ assertTrue(intCallable.executed > 0);
+ assertTrue(intCallable.executed > callables.get(5).executed);
+
+ services.destroy();
+ }
+
+ /*
+ * Asserts that an interrupt command is executed before every command of a
+ * composite callable that has the same lock key.
+ */
+ public void testInterruptsWithCompositeCallable() throws Exception {
+ EXEC_ORDER = new AtomicLong();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill");
+ Services services = new Services();
+ services.init();
+
+ CallableQueueService queueservice = services.get(CallableQueueService.class);
+ final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+ "initialLockKey");
+ final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+
+ for (int i = 0; i < 10; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+ }
+
+ final ExtendedXCommand intCallable = new ExtendedXCommand("key5", "testKill", 0, 200, "lockKey");
+
+ queueservice.queue(initialCallable);
+ queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0);
+ queueservice.queue(intCallable);
+
+ waitFor(3000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+ for (ExtendedXCommand c : callables) {
+ retValue = retValue && c.executed != 0;
+ }
+ return retValue;
+ }
+ });
+
+ assertTrue(initialCallable.executed > 0);
+ assertTrue(intCallable.executed > 0);
+ for (ExtendedXCommand c : callables) {
+ assertTrue(intCallable.executed < c.executed);
+ }
+
+ services.destroy();
+ }
+
+ /*
+ * Tests an interrupt command placed inside a composite callable, asserting
+ * that it is executed before a command that precedes it in the list.
+ */
+ public void testInterruptsInCompositeCallable() throws Exception {
+ EXEC_ORDER = new AtomicLong();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill");
+ Services services = new Services();
+ services.init();
+
+ CallableQueueService queueservice = services.get(CallableQueueService.class);
+ final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200,
+ "initialLockKey");
+ final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+
+ for (int i = 0; i < 5; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+ }
+ callables.add(new ExtendedXCommand("key" + 5, "testKill", 1, 100, "lockKey"));
+ for (int i = 6; i < 10; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+ }
+
+ queueservice.queue(initialCallable);
+ queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0);
+
+ waitFor(3000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean retValue = initialCallable.executed != 0;
+ for (ExtendedXCommand c : callables) {
+ retValue = retValue && c.executed != 0;
+ }
+ return retValue;
+ }
+ });
+
+ assertTrue(initialCallable.executed > 0);
+ assertTrue(callables.get(1).executed > callables.get(5).executed);
+
+ services.destroy();
+ }
+
+ /*
+ * Asserts that interrupts are not inserted into the interrupt map once it
+ * has reached its maximum size.
+ */
+ public void testMaxInterruptMapSize() throws Exception {
+ EXEC_ORDER = new AtomicLong();
+ setSystemProperty(CallableQueueService.CONF_THREADS, "1");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill");
+ setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, "0");
+ Services services = new Services();
+ services.init();
+
+ CallableQueueService queueservice = services.get(CallableQueueService.class);
+
+ final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 100,
+ "initialLockKey");
+ final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>();
+ for (int i = 0; i < 10; i++) {
+ callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey"));
+ }
+
+ final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey");
+
+ queueservice.queue(initialCallable);
+ for (int i = 0; i < 10; i++) {
+ queueservice.queue(callables.get(i));
+ }
+
+ queueservice.queue(intCallable);
+
+ waitFor(5000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0;
+ for (ExtendedXCommand c : callables) {
+ retValue = retValue && c.executed != 0;
+ }
+ return retValue;
+ }
+ });
+
+ assertTrue(initialCallable.executed > 0);
+ assertTrue(intCallable.executed > 0);
+ assertTrue(intCallable.executed > callables.get(5).executed);
+
+ services.destroy();
+ }
+
}
Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1232709&r1=1232708&r2=1232709&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Jan 18 01:09:12 2012
@@ -1,5 +1,6 @@
-- Oozie 3.2.0 release
+OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha via Angelo)
OOZIE-639 Hive sharelib POM must exclude hadoop-core. (tucu)
OOZIE-635 ShellMain closes the STD/ERR stream while shell is processing. (tucu)
OOZIE-629 Oozie server to prevent usage of dataset initial-instance earlier than the system-defined default value.(Mona via Mohammad)