You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/02 19:03:00 UTC

svn commit: r940263 [16/16] - in /ode/trunk: ./ axis2-war/ axis2-war/src/main/assembly/ axis2-war/src/main/webapp/WEB-INF/conf.hib-derby/ axis2-war/src/main/webapp/WEB-INF/conf.jpa-derby/ axis2-war/src/main/webapp/WEB-INF/conf/ axis2-war/src/test/java/...

Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java (added)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java Sun May  2 17:02:51 2010
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.JobDAO;
+
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+
+/**
+ * 
+ * Test of the JDBC delegate. 
+ * 
+ * @author Maciej Szefler ( m s z e f l e r  @ g m a i l . c o m )
+ */
+public class DAOConnectionTest extends SchedulerTestBase {
+
+    
+    public void testGetNodeIds() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{          
+          // should have no node ids in the db, empty list (not null)
+          _txm.begin();;
+          List<String> nids = conn.getNodeIds();
+          _txm.commit();
+          assertNotNull(nids);
+          assertEquals(0, nids.size());
+
+          // try for one nodeid
+          _txm.begin();;
+          conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true);
+          _txm.commit();
+          _txm.begin();;
+          nids = conn.getNodeIds();
+          _txm.commit();
+          assertEquals(1, nids.size());
+          assertTrue(nids.contains("abc"));
+
+          // check that dups are ignored.
+          _txm.begin();;
+          conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true);
+          _txm.commit();
+          _txm.begin();;
+          nids = conn.getNodeIds();
+          _txm.commit();
+          assertEquals(1, nids.size());
+          assertTrue(nids.contains("abc"));
+
+          // add another nodeid,
+          _txm.begin();;
+          conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"123", true);
+          _txm.commit();
+
+          _txm.begin();;
+          nids = conn.getNodeIds();
+          _txm.commit();
+          assertEquals(2, nids.size());
+          assertTrue(nids.contains("abc"));
+          assertTrue(nids.contains("123"));
+        }finally{
+          conn.close();
+        }
+    }
+
+    public void testReassign() throws Exception {
+       SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 100L),"n1", false);
+          conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 200L),"n2", false);
+          _txm.commit();
+
+          _txm.begin();;
+          int num = conn.updateReassign("n1","n2");
+          _txm.commit();
+
+          assertEquals(1,num);
+
+          _txm.begin();;
+          List<JobDAO> jobs = conn.dequeueImmediate("n2", 400L, 1000);
+          _txm.commit();
+           
+          assertEquals(2,jobs.size());
+        }finally{
+          conn.close();
+        }
+    }
+
+    public void testScheduleImmediateTimeFilter() throws Exception {
+       SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+          String jobId1 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+          String jobId2 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          _txm.commit();
+
+          _txm.begin();;
+          List<JobDAO> jobs = conn.dequeueImmediate("n1", 150L, 1000);
+          _txm.commit();
+
+          assertNotNull(jobs);
+          assertEquals(1, jobs.size());
+          assertEquals(jobId1,jobs.get(0).getJobId());
+
+          _txm.begin();;
+          jobs = conn.dequeueImmediate("n1", 250L, 1000);
+          _txm.commit();
+
+          assertNotNull(jobs);
+          assertEquals(1, jobs.size());
+          assertEquals(jobId2,jobs.get(0).getJobId());
+        }finally{
+          conn.close();
+        }
+    }
+    
+    public void testScheduleImmediateMaxRows() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+          String jobId1 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+          String jobId2 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          _txm.commit();
+
+          _txm.begin();;
+          List<JobDAO> jobs = conn.dequeueImmediate("n1", 201L, 1);
+          _txm.commit();
+          assertNotNull(jobs);
+          assertEquals(1, jobs.size());
+          assertEquals(jobId1,jobs.get(0).getJobId());
+
+          _txm.begin();;
+          jobs = conn.dequeueImmediate("n1", 250L, 1000);
+          _txm.commit();
+          assertNotNull(jobs);
+          assertEquals(1, jobs.size());
+          assertEquals(jobId2,jobs.get(0).getJobId());
+        }finally{
+          conn.close();
+        }
+    }
+
+    public void testScheduleImmediateNodeFilter() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+          String jobId1 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+          String jobId2 = job.getJobId();
+          conn.insertJob(job,"n2", false);
+          _txm.commit();
+
+          _txm.begin();;
+          List<JobDAO> jobs = conn.dequeueImmediate("n2", 300L, 1000);
+          _txm.commit();
+          
+          assertNotNull(jobs);
+          assertEquals(1, jobs.size());
+          assertEquals(jobId2,jobs.get(0).getJobId());
+        }finally{
+          conn.close();
+        }
+    }
+
+    public void testDeleteJob() throws Exception {
+
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L);
+          String jobId1 = job.getJobId();
+          conn.insertJob(job,"n1", false);
+          job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L);
+          String jobId2 = job.getJobId();
+          conn.insertJob(job,"n2", false);
+          _txm.commit();
+
+          // try deleting, wrong jobid -- del should fail
+          _txm.begin();;
+          assertFalse(conn.deleteJob("j1x", "n1"));
+          assertEquals(2,conn.getNodeIds().size());
+          _txm.commit();
+
+          // wrong nodeid
+          _txm.begin();;
+          assertFalse(conn.deleteJob(jobId1, "n1x"));
+          assertEquals(2,conn.getNodeIds().size());
+          _txm.commit();
+
+          // now do the correct job
+          _txm.begin();;
+          assertTrue(conn.deleteJob(jobId1, "n1"));
+          assertEquals(1,conn.getNodeIds().size());
+          _txm.commit();
+        }finally{
+          conn.close();
+        }
+    }
+    
+    public void testUpgrade() throws Exception {
+
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          _txm.begin();;
+          for (int i = 0; i < 200; ++i)
+            conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, i),null, false);
+          _txm.commit();
+
+          _txm.begin();;
+          int n1 = conn.updateAssignToNode("n1", 0, 3, 100);
+          int n2 = conn.updateAssignToNode("n2", 1, 3, 100);
+          int n3 = conn.updateAssignToNode("n3", 2, 3, 100);
+          _txm.commit();
+          // Make sure we got 100 upgraded nodes
+          assertEquals(100,n1+n2+n3);
+
+          // now do scheduling.
+          _txm.begin();;
+          assertEquals(n1,conn.dequeueImmediate("n1", 10000L, 1000).size());
+          assertEquals(n2,conn.dequeueImmediate("n2", 10000L, 1000).size());
+          assertEquals(n3,conn.dequeueImmediate("n3", 10000L, 1000).size());
+          _txm.commit();
+        }finally{
+          conn.close();
+        }
+    }
+    
+    public void testMigration() throws Exception {
+        SchedulerDAOConnection conn = _factory.getConnection();
+        try{
+          Scheduler.JobDetails j1 = new Scheduler.JobDetails();
+          j1.getDetailsExt().put("type", "MATCHER");
+          j1.getDetailsExt().put("iid", 1234L);
+          j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString());
+          j1.getDetailsExt().put("inmem", true);
+          j1.getDetailsExt().put("ckey", "123~abcd");
+          j1.getDetailsExt().put("channel", "123");
+          j1.getDetailsExt().put("mexid", "mexid123");
+          j1.getDetailsExt().put("correlatorId", "cid123");
+          j1.getDetailsExt().put("retryCount", "15");
+
+          _txm.begin();;
+          conn.insertJob(conn.createJob(true, j1, true, 0L), null, false);
+          conn.updateAssignToNode("m", 0, 3, 100);
+          _txm.commit();
+
+          _txm.begin();;
+          Scheduler.JobDetails j2 = conn.dequeueImmediate("m", 10000L, 1000).get(0).getDetails();
+          _txm.commit();
+
+          assertEquals(j2.getType(), JobType.MATCHER);
+          assertEquals(j2.getInstanceId(), (Object) 1234L);
+          assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
+          assertEquals(j2.getInMem(), (Object) true);
+          assertEquals(j2.getCorrelationKey().toCanonicalString(), (Object) "123~abcd");
+          assertEquals(j2.getChannel(), (Object) "123");
+          assertEquals(j2.getMexId(), (Object) "mexid123");
+          assertEquals(j2.getCorrelatorId(), (Object) "cid123");
+          assertEquals(j2.getRetryCount(), (Object) 15);
+        }finally{
+          conn.close();
+        }
+    }
+}

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Sun May  2 17:02:51 2010
@@ -3,29 +3,26 @@ package org.apache.ode.scheduler.simple;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 
-import javax.transaction.TransactionManager;
 import java.util.*;
 
-import junit.framework.TestCase;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
 
 /**
  * @author Matthieu Riou <mr...@apache.org>
  */
-public class RetriesTest extends TestCase implements Scheduler.JobProcessor {
+public class RetriesTest extends SchedulerTestBase implements Scheduler.JobProcessor {
     private static final Log __log = LogFactory.getLog(RetriesTest.class);
     
-    DelegateSupport _ds;
+
     SimpleScheduler _scheduler;
     ArrayList<Scheduler.JobInfo> _jobs;
     ArrayList<Scheduler.JobInfo> _commit;
-    TransactionManager _txm;
+    
     int _tried = 0;
 
     public void setUp() throws Exception {
-        _txm = new GeronimoTransactionManager();
-        _ds = new DelegateSupport();
+        super.setUp();
 
         _scheduler = newScheduler("n1");
         _jobs = new ArrayList<Scheduler.JobInfo>(100);
@@ -34,6 +31,7 @@ public class RetriesTest extends TestCas
 
     public void tearDown() throws Exception {
         _scheduler.shutdown();
+        super.tearDown();
     }
     
     public void testRetries() throws Exception {
@@ -41,11 +39,14 @@ public class RetriesTest extends TestCas
         _scheduler.setNearFutureInterval(5000);
         _scheduler.setImmediateInterval(1000);
         _scheduler.start();
+
+        SchedulerDAOConnection conn = _factory.getConnection();
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("123"), new Date());
         } finally {
             _txm.commit();
+            conn.close();
         }
 
         Thread.sleep(10000);
@@ -55,7 +56,7 @@ public class RetriesTest extends TestCas
 
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
         _tried++;
-        
+         __log.debug("onScheduledJob " + jobInfo.jobName);
         if (jobInfo.retryCount < 2) {
             __log.debug("retrying " + _tried);
             throw new Scheduler.JobProcessorException(true);
@@ -71,9 +72,8 @@ public class RetriesTest extends TestCas
     }
 
     private SimpleScheduler newScheduler(String nodeId) {
-        SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
+        SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
         scheduler.setJobProcessor(this);
-        scheduler.setTransactionManager(_txm);
         return scheduler;
     }
 

Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java (added)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java Sun May  2 17:02:51 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+import java.io.InputStream;
+import java.sql.Connection;
+
+import java.util.Properties;
+import javax.transaction.TransactionManager;
+import junit.framework.TestCase;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+import org.apache.ode.il.EmbeddedGeronimoFactory;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.il.txutil.TxManager;
+import org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl;
+
+/**
+ * Support class for creating a JDBC delegate (using in-mem HSQL db).
+ * 
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerTestBase extends TestCase {
+
+  protected Database _db;
+  protected SchedulerDAOConnectionFactory _factory;
+  protected TransactionManager _txm;
+
+  @Override
+  public void setUp() throws Exception {
+    Properties props = new Properties();
+    props.put(OdeConfigProperties.PROP_DAOCF_SCHEDULER, System.getProperty(OdeConfigProperties.PROP_DAOCF_SCHEDULER,OdeConfigProperties.DEFAULT_DAOCF_SCHEDULER_CLASS));
+    OdeConfigProperties odeProps = new OdeConfigProperties(props, "");
+    TxManager tx = new TxManager(odeProps);
+    _txm = tx.createTransactionManager();
+    _db = new Database(odeProps);
+    _db.setTransactionManager(_txm);
+    _db.start();
+    _factory = _db.createDaoSchedulerCF();
+
+    if (_factory instanceof SchedulerDAOConnectionFactoryImpl) {
+      Connection c = _db.getDataSource().getConnection();
+      try {
+        StringBuffer sql = new StringBuffer();
+
+        {
+          InputStream in = getClass().getResourceAsStream("/simplesched-h2.sql");
+          int v;
+          while ((v = in.read()) != -1) {
+            sql.append((char) v);
+          }
+        }
+
+        String[] cmds = sql.toString().split(";");
+        for (String cmd : cmds) {
+          c.createStatement().executeUpdate(cmd);
+        }
+      } finally {
+        c.close();
+      }
+    }
+
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    _factory.shutdown();
+    _db.shutdown();
+
+  }
+
+  public static long mod(long a, long b) {
+    return a % b;
+  }
+}
+

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java Sun May  2 17:02:51 2010
@@ -24,14 +24,9 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.ode.scheduler.simple.SchedulerThread;
-import org.apache.ode.scheduler.simple.Task;
-import org.apache.ode.scheduler.simple.TaskRunner;
-
-
 import junit.framework.TestCase;
 
+
 /**
  * Test of SchedulerThread. 
  * 
@@ -44,6 +39,7 @@ public class SchedulerThreadTest extends
     
     List<TR> _tasks = new ArrayList<TR>(100); 
     
+    @Override
     public void setUp() throws Exception {
         _st = new SchedulerThread(this);
     }

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Sun May  2 17:02:51 2010
@@ -16,207 +16,198 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ode.scheduler.simple;
 
 import java.util.*;
 
-import javax.transaction.TransactionManager;
-
-import junit.framework.TestCase;
-
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 
-public class SimpleSchedulerTest extends TestCase implements JobProcessor {
+public class SimpleSchedulerTest extends SchedulerTestBase implements JobProcessor {
 
-    DelegateSupport _ds;
-    SimpleScheduler _scheduler;
-    ArrayList<JobInfo> _jobs;
-    TransactionManager _txm;
-
-
-    public void setUp() throws Exception {
-        _txm = new GeronimoTransactionManager();
-        _ds = new DelegateSupport();
-
-        _scheduler = newScheduler("n1");
-        _jobs = new ArrayList<JobInfo>(100);
-    }
-
-    public void tearDown() throws Exception {
-        _scheduler.shutdown();
-    }
-
-    public void testConcurrentExec() throws Exception  {
-        _scheduler.start();
-        _txm.begin();
-        String jobId;
-        try {
-            jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
-            Thread.sleep(200);
-            // Make sure we don't schedule until commit.
-            assertEquals(0, _jobs.size());
-        } finally {
-            _txm.commit();
-        }
-        // Wait for the job to be execed.
-        Thread.sleep(100);
-        // Should execute job,
-        assertEquals(1, _jobs.size());
-
-    }
-    
-    public void testImmediateScheduling() throws Exception {
-        _scheduler.start();
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date());
-            Thread.sleep(100);
-            // Make sure we don't schedule until commit.
-            assertEquals(0, _jobs.size());
-        } finally {
-            _txm.commit();
-        }
-        Thread.sleep(100);
-        assertEquals(1, _jobs.size());
-    }
-
-    public void testStartStop() throws Exception {
-        _scheduler.start();
-        _txm.begin();
-        try {
-            for (int i = 0; i < 10; ++i)
-                _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100)));
-        } finally {
-            _txm.commit();
-        }
-        Thread.sleep(100);
-        _scheduler.stop();
-        int jobs = _jobs.size();
-        assertTrue(jobs > 0);
-        assertTrue(jobs < 10);
-        Thread.sleep(200);
-        assertEquals(jobs, _jobs.size());
-        _scheduler.start();
-        Thread.sleep(1000);
-        assertEquals(10, _jobs.size());
-    }
-
-    public void testNearFutureScheduling() throws Exception {
-        // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(1000);
-        _scheduler.setImmediateInterval(500);
-        _scheduler.start();
-
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
-        } finally {
-            _txm.commit();
-        }
-
-        Thread.sleep(850);
-        assertEquals(1, _jobs.size());
-    }
-
-    public void testFarFutureScheduling() throws Exception {
-        // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(700);
-        _scheduler.setImmediateInterval(300);
-        _scheduler.start();
-
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
-        } finally {
-            _txm.commit();
-        }
-
-        Thread.sleep(850);
-        assertEquals(1, _jobs.size());
-    }
-
-    public void testRecovery() throws Exception {
-        // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
-
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
-        } finally {
-            _txm.commit();
-        }
-
-        _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
-        _scheduler.start();
-        Thread.sleep(400);
-        assertEquals(3, _jobs.size());
-    }
-
-    public void testRecoverySuppressed() throws Exception {
-        // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
-
-        // schedule some jobs ...
-        _txm.begin();
-        try {
-            _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
-        } finally {
-            _txm.commit();
-        } 
-
-        // but don't start the scheduler.... 
-        
-        // create a second node for the scheduler. 
-        SimpleScheduler scheduler = newScheduler("n3");
-        scheduler.setNearFutureInterval(200);
-        scheduler.setImmediateInterval(100);
-        scheduler.setStaleInterval(50);
-        scheduler.start();
-        for (int i = 0; i < 40; ++i) {
-            scheduler.updateHeartBeat("n1");
-            Thread.sleep(10);
-        }
-
-        scheduler.stop();
-
-        assertTrue(_jobs.size() <= 1);
-        if (_jobs.size() == 1)
-            assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo"));
-    }
-
-    public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
-        synchronized (_jobs) {
-            _jobs.add(jobInfo);
-        }
-    }
-
-    Scheduler.JobDetails newDetail(String x) {
-        Scheduler.JobDetails jd = new Scheduler.JobDetails();
-        jd.getDetailsExt().put("foo", x);
-        return jd;
-    }
-
-    private SimpleScheduler newScheduler(String nodeId) {
-        SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties());
-        scheduler.setJobProcessor(this);
-        scheduler.setTransactionManager(_txm);
-        return scheduler;
-    }
-    
+  SimpleScheduler _scheduler;
+  ArrayList<JobInfo> _jobs;
+
+  public void setUp() throws Exception {
+    super.setUp();
+    _scheduler = newScheduler("n1");
+    _jobs = new ArrayList<JobInfo>(100);
+  }
+
+  public void tearDown() throws Exception {
+    _scheduler.shutdown();
+    super.tearDown();
+  }
+
+  public void testConcurrentExec() throws Exception {
+    _scheduler.start();
+    _txm.begin();
+    String jobId;
+    try {
+      jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
+      Thread.sleep(200);
+      // Make sure we don't schedule until commit.
+      assertEquals(0, _jobs.size());
+    } finally {
+      _txm.commit();
+    }
+    // Wait for the job to be execed.
+    Thread.sleep(100);
+    // Should execute job,
+    assertEquals(1, _jobs.size());
+
+  }
+
+  public void testImmediateScheduling() throws Exception {
+    _scheduler.start();
+    _txm.begin();
+    try {
+      _scheduler.schedulePersistedJob(newDetail("123"), new Date());
+      Thread.sleep(100);
+      // Make sure we don't schedule until commit.
+      assertEquals(0, _jobs.size());
+    } finally {
+      _txm.commit();
+    }
+    Thread.sleep(100);
+    assertEquals(1, _jobs.size());
+  }
+
+  public void testStartStop() throws Exception {
+    _scheduler.start();
+    _txm.begin();
+    try {
+      for (int i = 0; i < 10; ++i) {
+        _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100)));
+      }
+    } finally {
+      _txm.commit();
+    }
+    Thread.sleep(100);
+    _scheduler.stop();
+    int jobs = _jobs.size();
+    assertTrue(jobs > 0);
+    assertTrue(jobs < 10);
+    Thread.sleep(200);
+    assertEquals(jobs, _jobs.size());
+    _scheduler.start();
+    Thread.sleep(1000);
+    assertEquals(10, _jobs.size());
+  }
+
+  public void testNearFutureScheduling() throws Exception {
+    // speed things up a bit to hit the right code paths
+    _scheduler.setNearFutureInterval(1000);
+    _scheduler.setImmediateInterval(500);
+    _scheduler.start();
+
+    _txm.begin();
+    try {
+      _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+    } finally {
+      _txm.commit();
+    }
+
+    Thread.sleep(850);
+    assertEquals(1, _jobs.size());
+  }
+
+  public void testFarFutureScheduling() throws Exception {
+    // speed things up a bit to hit the right code paths
+    _scheduler.setNearFutureInterval(700);
+    _scheduler.setImmediateInterval(300);
+    _scheduler.start();
+
+    _txm.begin();
+    try {
+      _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+    } finally {
+      _txm.commit();
+    }
+
+    Thread.sleep(850);
+    assertEquals(1, _jobs.size());
+  }
+
+  public void testRecovery() throws Exception {
+    // speed things up a bit to hit the right code paths
+    _scheduler.setNearFutureInterval(200);
+    _scheduler.setImmediateInterval(100);
+    _scheduler.setStaleInterval(50);
+
+    _txm.begin();
+    try {
+      _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
+      _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
+      _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+    } finally {
+      _txm.commit();
+    }
+
+    _scheduler = newScheduler("n3");
+    _scheduler.setNearFutureInterval(200);
+    _scheduler.setImmediateInterval(100);
+    _scheduler.setStaleInterval(50);
+    _scheduler.start();
+    Thread.sleep(400);
+    assertEquals(3, _jobs.size());
+  }
+
+  public void testRecoverySuppressed() throws Exception {
+    // speed things up a bit to hit the right code paths
+    _scheduler.setNearFutureInterval(200);
+    _scheduler.setImmediateInterval(100);
+    _scheduler.setStaleInterval(50);
+
+    // schedule some jobs ...
+    _txm.begin();
+    try {
+      _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
+      _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
+      _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+    } finally {
+      _txm.commit();
+    }
+
+    // but don't start the scheduler....
+
+    // create a second node for the scheduler.
+    SimpleScheduler scheduler = newScheduler("n3");
+    scheduler.setNearFutureInterval(200);
+    scheduler.setImmediateInterval(100);
+    scheduler.setStaleInterval(50);
+    scheduler.start();
+    for (int i = 0; i < 40; ++i) {
+      scheduler.updateHeartBeat("n1");
+      Thread.sleep(10);
+    }
+
+    scheduler.stop();
+
+    assertTrue(_jobs.size() <= 1);
+    if (_jobs.size() == 1) {
+      assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo"));
+    }
+  }
+
+  public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
+    synchronized (_jobs) {
+      _jobs.add(jobInfo);
+    }
+  }
+
+  Scheduler.JobDetails newDetail(String x) {
+    Scheduler.JobDetails jd = new Scheduler.JobDetails();
+    jd.getDetailsExt().put("foo", x);
+    return jd;
+  }
+
+  private SimpleScheduler newScheduler(String nodeId) {
+    SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties());
+    scheduler.setJobProcessor(this);
+    return scheduler;
+  }
 }
+

Modified: ode/trunk/scheduler-simple/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/log4j.properties?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/resources/log4j.properties (original)
+++ ode/trunk/scheduler-simple/src/test/resources/log4j.properties Sun May  2 17:02:51 2010
@@ -16,9 +16,10 @@
 #
 
 # Set root logger level to WARN and its only appender to CONSOLE
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=WARN, CONSOLE, FILE
 
 # log4j properties to work with commandline tools.
+log4j.category.org.apache.ode=ERROR
 log4j.category.org.apache.ode.scheduler.simple.RetriesTest=INFO
 log4j.category.org.apache.ode.bpel.engine=INFO
 
@@ -26,3 +27,10 @@ log4j.category.org.apache.ode.bpel.engin
 log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File=target/scheduler-test.log
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.ConversionPattern=%d{MM-dd@HH:mm:ss} %-5p (%13F:%L) %3x - %m%n
+log4j.appender.FILE.append=false
+

Added: ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql (added)
+++ ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql Sun May  2 17:02:51 2010
@@ -0,0 +1,22 @@
+CREATE TABLE ode_job (
+  jobid varchar(64)  NOT NULL DEFAULT '',
+  ts BIGINT  NOT NULL DEFAULT 0,
+  nodeid varchar(64),
+  scheduled int  NOT NULL DEFAULT 0,
+  transacted int  NOT NULL DEFAULT 0,
+
+  instanceId BIGINT,
+  mexId varchar(255),
+  processId varchar(255),
+  type varchar(255),
+  channel varchar(255),
+  correlatorId varchar(255),
+  correlationKey varchar(255),
+  retryCount int,
+  inMem int,
+  detailsExt blob(4096),
+
+  PRIMARY KEY(jobid));
+
+CREATE INDEX IDX_ODE_JOB_TS ON ode_job(ts);
+CREATE INDEX IDX_ODE_JOB_NODEID ON ode_job(nodeid);
\ No newline at end of file