You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/02 19:03:00 UTC
svn commit: r940263 [16/16] - in /ode/trunk: ./ axis2-war/
axis2-war/src/main/assembly/
axis2-war/src/main/webapp/WEB-INF/conf.hib-derby/
axis2-war/src/main/webapp/WEB-INF/conf.jpa-derby/
axis2-war/src/main/webapp/WEB-INF/conf/ axis2-war/src/test/java/...
Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java (added)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java Sun May 2 17:02:51 2010
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.JobDAO;
+
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+
+/**
+ *
+ * Test of the JDBC delegate.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class DAOConnectionTest extends SchedulerTestBase {
+
+
+ public void testGetNodeIds() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ // should have no node ids in the db, empty list (not null)
+ _txm.begin();;
+ List<String> nids = conn.getNodeIds();
+ _txm.commit();
+ assertNotNull(nids);
+ assertEquals(0, nids.size());
+
+ // try for one nodeid
+ _txm.begin();;
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true);
+ _txm.commit();
+ _txm.begin();;
+ nids = conn.getNodeIds();
+ _txm.commit();
+ assertEquals(1, nids.size());
+ assertTrue(nids.contains("abc"));
+
+ // check that dups are ignored.
+ _txm.begin();;
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true);
+ _txm.commit();
+ _txm.begin();;
+ nids = conn.getNodeIds();
+ _txm.commit();
+ assertEquals(1, nids.size());
+ assertTrue(nids.contains("abc"));
+
+ // add another nodeid,
+ _txm.begin();;
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"123", true);
+ _txm.commit();
+
+ _txm.begin();;
+ nids = conn.getNodeIds();
+ _txm.commit();
+ assertEquals(2, nids.size());
+ assertTrue(nids.contains("abc"));
+ assertTrue(nids.contains("123"));
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testReassign() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 100L),"n1", false);
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 200L),"n2", false);
+ _txm.commit();
+
+ _txm.begin();;
+ int num = conn.updateReassign("n1","n2");
+ _txm.commit();
+
+ assertEquals(1,num);
+
+ _txm.begin();;
+ List<JobDAO> jobs = conn.dequeueImmediate("n2", 400L, 1000);
+ _txm.commit();
+
+ assertEquals(2,jobs.size());
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testScheduleImmediateTimeFilter() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+ String jobId1 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+ String jobId2 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ _txm.commit();
+
+ _txm.begin();;
+ List<JobDAO> jobs = conn.dequeueImmediate("n1", 150L, 1000);
+ _txm.commit();
+
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals(jobId1,jobs.get(0).getJobId());
+
+ _txm.begin();;
+ jobs = conn.dequeueImmediate("n1", 250L, 1000);
+ _txm.commit();
+
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals(jobId2,jobs.get(0).getJobId());
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testScheduleImmediateMaxRows() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+ String jobId1 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+ String jobId2 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ _txm.commit();
+
+ _txm.begin();;
+ List<JobDAO> jobs = conn.dequeueImmediate("n1", 201L, 1);
+ _txm.commit();
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals(jobId1,jobs.get(0).getJobId());
+
+ _txm.begin();;
+ jobs = conn.dequeueImmediate("n1", 250L, 1000);
+ _txm.commit();
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals(jobId2,jobs.get(0).getJobId());
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testScheduleImmediateNodeFilter() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+ String jobId1 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+ String jobId2 = job.getJobId();
+ conn.insertJob(job,"n2", false);
+ _txm.commit();
+
+ _txm.begin();;
+ List<JobDAO> jobs = conn.dequeueImmediate("n2", 300L, 1000);
+ _txm.commit();
+
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals(jobId2,jobs.get(0).getJobId());
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testDeleteJob() throws Exception {
+
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+ String jobId1 = job.getJobId();
+ conn.insertJob(job,"n1", false);
+ job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+ String jobId2 = job.getJobId();
+ conn.insertJob(job,"n2", false);
+ _txm.commit();
+
+ // try deleting, wrong jobid -- del should fail
+ _txm.begin();;
+ assertFalse(conn.deleteJob("j1x", "n1"));
+ assertEquals(2,conn.getNodeIds().size());
+ _txm.commit();
+
+ // wrong nodeid
+ _txm.begin();;
+ assertFalse(conn.deleteJob(jobId1, "n1x"));
+ assertEquals(2,conn.getNodeIds().size());
+ _txm.commit();
+
+ // now do the correct job
+ _txm.begin();;
+ assertTrue(conn.deleteJob(jobId1, "n1"));
+ assertEquals(1,conn.getNodeIds().size());
+ _txm.commit();
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testUpgrade() throws Exception {
+
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ _txm.begin();;
+ for (int i = 0; i < 200; ++i)
+ conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, i),null, false);
+ _txm.commit();
+
+ _txm.begin();;
+ int n1 = conn.updateAssignToNode("n1", 0, 3, 100);
+ int n2 = conn.updateAssignToNode("n2", 1, 3, 100);
+ int n3 = conn.updateAssignToNode("n3", 2, 3, 100);
+ _txm.commit();
+ // Make sure we got 100 upgraded nodes
+ assertEquals(100,n1+n2+n3);
+
+ // now do scheduling.
+ _txm.begin();;
+ assertEquals(n1,conn.dequeueImmediate("n1", 10000L, 1000).size());
+ assertEquals(n2,conn.dequeueImmediate("n2", 10000L, 1000).size());
+ assertEquals(n3,conn.dequeueImmediate("n3", 10000L, 1000).size());
+ _txm.commit();
+ }finally{
+ conn.close();
+ }
+ }
+
+ public void testMigration() throws Exception {
+ SchedulerDAOConnection conn = _factory.getConnection();
+ try{
+ Scheduler.JobDetails j1 = new Scheduler.JobDetails();
+ j1.getDetailsExt().put("type", "MATCHER");
+ j1.getDetailsExt().put("iid", 1234L);
+ j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString());
+ j1.getDetailsExt().put("inmem", true);
+ j1.getDetailsExt().put("ckey", "123~abcd");
+ j1.getDetailsExt().put("channel", "123");
+ j1.getDetailsExt().put("mexid", "mexid123");
+ j1.getDetailsExt().put("correlatorId", "cid123");
+ j1.getDetailsExt().put("retryCount", "15");
+
+ _txm.begin();;
+ conn.insertJob(conn.createJob(true, j1, true, 0L), null, false);
+ conn.updateAssignToNode("m", 0, 3, 100);
+ _txm.commit();
+
+ _txm.begin();;
+ Scheduler.JobDetails j2 = conn.dequeueImmediate("m", 10000L, 1000).get(0).getDetails();
+ _txm.commit();
+
+ assertEquals(j2.getType(), JobType.MATCHER);
+ assertEquals(j2.getInstanceId(), (Object) 1234L);
+ assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
+ assertEquals(j2.getInMem(), (Object) true);
+ assertEquals(j2.getCorrelationKey().toCanonicalString(), (Object) "123~abcd");
+ assertEquals(j2.getChannel(), (Object) "123");
+ assertEquals(j2.getMexId(), (Object) "mexid123");
+ assertEquals(j2.getCorrelatorId(), (Object) "cid123");
+ assertEquals(j2.getRetryCount(), (Object) 15);
+ }finally{
+ conn.close();
+ }
+ }
+}
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Sun May 2 17:02:51 2010
@@ -3,29 +3,26 @@ package org.apache.ode.scheduler.simple;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-import javax.transaction.TransactionManager;
import java.util.*;
-import junit.framework.TestCase;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
/**
* @author Matthieu Riou <mr...@apache.org>
*/
-public class RetriesTest extends TestCase implements Scheduler.JobProcessor {
+public class RetriesTest extends SchedulerTestBase implements Scheduler.JobProcessor {
private static final Log __log = LogFactory.getLog(RetriesTest.class);
- DelegateSupport _ds;
+
SimpleScheduler _scheduler;
ArrayList<Scheduler.JobInfo> _jobs;
ArrayList<Scheduler.JobInfo> _commit;
- TransactionManager _txm;
+
int _tried = 0;
public void setUp() throws Exception {
- _txm = new GeronimoTransactionManager();
- _ds = new DelegateSupport();
+ super.setUp();
_scheduler = newScheduler("n1");
_jobs = new ArrayList<Scheduler.JobInfo>(100);
@@ -34,6 +31,7 @@ public class RetriesTest extends TestCas
public void tearDown() throws Exception {
_scheduler.shutdown();
+ super.tearDown();
}
public void testRetries() throws Exception {
@@ -41,11 +39,14 @@ public class RetriesTest extends TestCas
_scheduler.setNearFutureInterval(5000);
_scheduler.setImmediateInterval(1000);
_scheduler.start();
+
+ SchedulerDAOConnection conn = _factory.getConnection();
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("123"), new Date());
} finally {
_txm.commit();
+ conn.close();
}
Thread.sleep(10000);
@@ -55,7 +56,7 @@ public class RetriesTest extends TestCas
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
_tried++;
-
+ __log.debug("onScheduledJob " + jobInfo.jobName);
if (jobInfo.retryCount < 2) {
__log.debug("retrying " + _tried);
throw new Scheduler.JobProcessorException(true);
@@ -71,9 +72,8 @@ public class RetriesTest extends TestCas
}
private SimpleScheduler newScheduler(String nodeId) {
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
scheduler.setJobProcessor(this);
- scheduler.setTransactionManager(_txm);
return scheduler;
}
Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java (added)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java Sun May 2 17:02:51 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+import java.io.InputStream;
+import java.sql.Connection;
+
+import java.util.Properties;
+import javax.transaction.TransactionManager;
+import junit.framework.TestCase;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.il.EmbeddedGeronimoFactory;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.il.txutil.TxManager;
+import org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl;
+
+/**
+ * Support class for creating a JDBC delegate (using in-mem HSQL db).
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerTestBase extends TestCase {
+
+ protected Database _db;
+ protected SchedulerDAOConnectionFactory _factory;
+ protected TransactionManager _txm;
+
+ @Override
+ public void setUp() throws Exception {
+ Properties props = new Properties();
+ props.put(OdeConfigProperties.PROP_DAOCF_SCHEDULER, System.getProperty(OdeConfigProperties.PROP_DAOCF_SCHEDULER,OdeConfigProperties.DEFAULT_DAOCF_SCHEDULER_CLASS));
+ OdeConfigProperties odeProps = new OdeConfigProperties(props, "");
+ TxManager tx = new TxManager(odeProps);
+ _txm = tx.createTransactionManager();
+ _db = new Database(odeProps);
+ _db.setTransactionManager(_txm);
+ _db.start();
+ _factory = _db.createDaoSchedulerCF();
+
+ if (_factory instanceof SchedulerDAOConnectionFactoryImpl) {
+ Connection c = _db.getDataSource().getConnection();
+ try {
+ StringBuffer sql = new StringBuffer();
+
+ {
+ InputStream in = getClass().getResourceAsStream("/simplesched-h2.sql");
+ int v;
+ while ((v = in.read()) != -1) {
+ sql.append((char) v);
+ }
+ }
+
+ String[] cmds = sql.toString().split(";");
+ for (String cmd : cmds) {
+ c.createStatement().executeUpdate(cmd);
+ }
+ } finally {
+ c.close();
+ }
+ }
+
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ _factory.shutdown();
+ _db.shutdown();
+
+ }
+
+ public static long mod(long a, long b) {
+ return a % b;
+ }
+}
+
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java Sun May 2 17:02:51 2010
@@ -24,14 +24,9 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.ode.scheduler.simple.SchedulerThread;
-import org.apache.ode.scheduler.simple.Task;
-import org.apache.ode.scheduler.simple.TaskRunner;
-
-
import junit.framework.TestCase;
+
/**
* Test of SchedulerThread.
*
@@ -44,6 +39,7 @@ public class SchedulerThreadTest extends
List<TR> _tasks = new ArrayList<TR>(100);
+ @Override
public void setUp() throws Exception {
_st = new SchedulerThread(this);
}
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Sun May 2 17:02:51 2010
@@ -16,207 +16,198 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ode.scheduler.simple;
import java.util.*;
-import javax.transaction.TransactionManager;
-
-import junit.framework.TestCase;
-
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-public class SimpleSchedulerTest extends TestCase implements JobProcessor {
+public class SimpleSchedulerTest extends SchedulerTestBase implements JobProcessor {
- DelegateSupport _ds;
- SimpleScheduler _scheduler;
- ArrayList<JobInfo> _jobs;
- TransactionManager _txm;
-
-
- public void setUp() throws Exception {
- _txm = new GeronimoTransactionManager();
- _ds = new DelegateSupport();
-
- _scheduler = newScheduler("n1");
- _jobs = new ArrayList<JobInfo>(100);
- }
-
- public void tearDown() throws Exception {
- _scheduler.shutdown();
- }
-
- public void testConcurrentExec() throws Exception {
- _scheduler.start();
- _txm.begin();
- String jobId;
- try {
- jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
- Thread.sleep(200);
- // Make sure we don't schedule until commit.
- assertEquals(0, _jobs.size());
- } finally {
- _txm.commit();
- }
- // Wait for the job to be execed.
- Thread.sleep(100);
- // Should execute job,
- assertEquals(1, _jobs.size());
-
- }
-
- public void testImmediateScheduling() throws Exception {
- _scheduler.start();
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("123"), new Date());
- Thread.sleep(100);
- // Make sure we don't schedule until commit.
- assertEquals(0, _jobs.size());
- } finally {
- _txm.commit();
- }
- Thread.sleep(100);
- assertEquals(1, _jobs.size());
- }
-
- public void testStartStop() throws Exception {
- _scheduler.start();
- _txm.begin();
- try {
- for (int i = 0; i < 10; ++i)
- _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100)));
- } finally {
- _txm.commit();
- }
- Thread.sleep(100);
- _scheduler.stop();
- int jobs = _jobs.size();
- assertTrue(jobs > 0);
- assertTrue(jobs < 10);
- Thread.sleep(200);
- assertEquals(jobs, _jobs.size());
- _scheduler.start();
- Thread.sleep(1000);
- assertEquals(10, _jobs.size());
- }
-
- public void testNearFutureScheduling() throws Exception {
- // speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(1000);
- _scheduler.setImmediateInterval(500);
- _scheduler.start();
-
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
- } finally {
- _txm.commit();
- }
-
- Thread.sleep(850);
- assertEquals(1, _jobs.size());
- }
-
- public void testFarFutureScheduling() throws Exception {
- // speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(700);
- _scheduler.setImmediateInterval(300);
- _scheduler.start();
-
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
- } finally {
- _txm.commit();
- }
-
- Thread.sleep(850);
- assertEquals(1, _jobs.size());
- }
-
- public void testRecovery() throws Exception {
- // speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
-
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
- _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
- } finally {
- _txm.commit();
- }
-
- _scheduler = newScheduler("n3");
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
- _scheduler.start();
- Thread.sleep(400);
- assertEquals(3, _jobs.size());
- }
-
- public void testRecoverySuppressed() throws Exception {
- // speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
-
- // schedule some jobs ...
- _txm.begin();
- try {
- _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
- _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
- } finally {
- _txm.commit();
- }
-
- // but don't start the scheduler....
-
- // create a second node for the scheduler.
- SimpleScheduler scheduler = newScheduler("n3");
- scheduler.setNearFutureInterval(200);
- scheduler.setImmediateInterval(100);
- scheduler.setStaleInterval(50);
- scheduler.start();
- for (int i = 0; i < 40; ++i) {
- scheduler.updateHeartBeat("n1");
- Thread.sleep(10);
- }
-
- scheduler.stop();
-
- assertTrue(_jobs.size() <= 1);
- if (_jobs.size() == 1)
- assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo"));
- }
-
- public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
- synchronized (_jobs) {
- _jobs.add(jobInfo);
- }
- }
-
- Scheduler.JobDetails newDetail(String x) {
- Scheduler.JobDetails jd = new Scheduler.JobDetails();
- jd.getDetailsExt().put("foo", x);
- return jd;
- }
-
- private SimpleScheduler newScheduler(String nodeId) {
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
- scheduler.setJobProcessor(this);
- scheduler.setTransactionManager(_txm);
- return scheduler;
- }
-
+ SimpleScheduler _scheduler;
+ ArrayList<JobInfo> _jobs;
+
+ public void setUp() throws Exception {
+ super.setUp();
+ _scheduler = newScheduler("n1");
+ _jobs = new ArrayList<JobInfo>(100);
+ }
+
+ public void tearDown() throws Exception {
+ _scheduler.shutdown();
+ super.tearDown();
+ }
+
+ public void testConcurrentExec() throws Exception {
+ _scheduler.start();
+ _txm.begin();
+ String jobId;
+ try {
+ jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
+ Thread.sleep(200);
+ // Make sure we don't schedule until commit.
+ assertEquals(0, _jobs.size());
+ } finally {
+ _txm.commit();
+ }
+ // Wait for the job to be execed.
+ Thread.sleep(100);
+ // Should execute job,
+ assertEquals(1, _jobs.size());
+
+ }
+
+ public void testImmediateScheduling() throws Exception {
+ _scheduler.start();
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date());
+ Thread.sleep(100);
+ // Make sure we don't schedule until commit.
+ assertEquals(0, _jobs.size());
+ } finally {
+ _txm.commit();
+ }
+ Thread.sleep(100);
+ assertEquals(1, _jobs.size());
+ }
+
+ public void testStartStop() throws Exception {
+ _scheduler.start();
+ _txm.begin();
+ try {
+ for (int i = 0; i < 10; ++i) {
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100)));
+ }
+ } finally {
+ _txm.commit();
+ }
+ Thread.sleep(100);
+ _scheduler.stop();
+ int jobs = _jobs.size();
+ assertTrue(jobs > 0);
+ assertTrue(jobs < 10);
+ Thread.sleep(200);
+ assertEquals(jobs, _jobs.size());
+ _scheduler.start();
+ Thread.sleep(1000);
+ assertEquals(10, _jobs.size());
+ }
+
+ public void testNearFutureScheduling() throws Exception {
+ // speed things up a bit to hit the right code paths
+ _scheduler.setNearFutureInterval(1000);
+ _scheduler.setImmediateInterval(500);
+ _scheduler.start();
+
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+ } finally {
+ _txm.commit();
+ }
+
+ Thread.sleep(850);
+ assertEquals(1, _jobs.size());
+ }
+
+ public void testFarFutureScheduling() throws Exception {
+ // speed things up a bit to hit the right code paths
+ _scheduler.setNearFutureInterval(700);
+ _scheduler.setImmediateInterval(300);
+ _scheduler.start();
+
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+ } finally {
+ _txm.commit();
+ }
+
+ Thread.sleep(850);
+ assertEquals(1, _jobs.size());
+ }
+
+ public void testRecovery() throws Exception {
+ // speed things up a bit to hit the right code paths
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
+
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
+ _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
+ _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+ } finally {
+ _txm.commit();
+ }
+
+ _scheduler = newScheduler("n3");
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
+ _scheduler.start();
+ Thread.sleep(400);
+ assertEquals(3, _jobs.size());
+ }
+
+ public void testRecoverySuppressed() throws Exception {
+ // speed things up a bit to hit the right code paths
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
+
+ // schedule some jobs ...
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
+ _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
+ _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+ } finally {
+ _txm.commit();
+ }
+
+ // but don't start the scheduler....
+
+ // create a second node for the scheduler.
+ SimpleScheduler scheduler = newScheduler("n3");
+ scheduler.setNearFutureInterval(200);
+ scheduler.setImmediateInterval(100);
+ scheduler.setStaleInterval(50);
+ scheduler.start();
+ for (int i = 0; i < 40; ++i) {
+ scheduler.updateHeartBeat("n1");
+ Thread.sleep(10);
+ }
+
+ scheduler.stop();
+
+ assertTrue(_jobs.size() <= 1);
+ if (_jobs.size() == 1) {
+ assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo"));
+ }
+ }
+
+ public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
+ synchronized (_jobs) {
+ _jobs.add(jobInfo);
+ }
+ }
+
+ Scheduler.JobDetails newDetail(String x) {
+ Scheduler.JobDetails jd = new Scheduler.JobDetails();
+ jd.getDetailsExt().put("foo", x);
+ return jd;
+ }
+
+ private SimpleScheduler newScheduler(String nodeId) {
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
+ scheduler.setJobProcessor(this);
+ return scheduler;
+ }
}
+
Modified: ode/trunk/scheduler-simple/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/log4j.properties?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/resources/log4j.properties (original)
+++ ode/trunk/scheduler-simple/src/test/resources/log4j.properties Sun May 2 17:02:51 2010
@@ -16,9 +16,10 @@
#
# Set root logger level to WARN and its only appender to CONSOLE
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=WARN, CONSOLE, FILE
# log4j properties to work with commandline tools.
+log4j.category.org.apache.ode=ERROR
log4j.category.org.apache.ode.scheduler.simple.RetriesTest=INFO
log4j.category.org.apache.ode.bpel.engine=INFO
@@ -26,3 +27,10 @@ log4j.category.org.apache.ode.bpel.engin
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File=target/scheduler-test.log
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.ConversionPattern=%d{MM-dd@HH:mm:ss} %-5p (%13F:%L) %3x - %m%n
+log4j.appender.FILE.append=false
+
Added: ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql (added)
+++ ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql Sun May 2 17:02:51 2010
@@ -0,0 +1,22 @@
+CREATE TABLE ode_job (
+ jobid varchar(64) NOT NULL DEFAULT '',
+ ts BIGINT NOT NULL DEFAULT 0,
+ nodeid varchar(64),
+ scheduled int NOT NULL DEFAULT 0,
+ transacted int NOT NULL DEFAULT 0,
+
+ instanceId BIGINT,
+ mexId varchar(255),
+ processId varchar(255),
+ type varchar(255),
+ channel varchar(255),
+ correlatorId varchar(255),
+ correlationKey varchar(255),
+ retryCount int,
+ inMem int,
+ detailsExt blob(4096),
+
+ PRIMARY KEY(jobid));
+
+CREATE INDEX IDX_ODE_JOB_TS ON ode_job(ts);
+CREATE INDEX IDX_ODE_JOB_NODEID ON ode_job(nodeid);
\ No newline at end of file