You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/09 21:55:31 UTC

svn commit: r564361 [2/2] - in /ode/branches/bart: bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ bpel-runtime/src/main/java/org/apache/ode/bpel/me...

Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?view=diff&rev=564361&r1=564360&r2=564361
==============================================================================
--- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original)
+++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Thu Aug  9 12:55:19 2007
@@ -25,7 +25,10 @@
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.uuid.UUID;
 import org.w3c.dom.Element;
@@ -112,18 +115,17 @@
     @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="CORR_ID")
     private CorrelatorDAOImpl _correlator;
     
-    
     @Basic @Column(name="ISTYLE")
     private String _istyle;
 
-    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="P2P_PIPE_PEER")
-    private MessageExchangeDAO _pipedMex;
-    
     @Basic @Column(name="TIMEOUT")
     private long _timeout;
     
     @Basic @Column(name="FAILURE_TYPE")
     private String _failureType;
+    
+    @Basic @Column(name="PIPED_PID")
+    private String _pipedPid;
 
     public MessageExchangeDAOImpl() {}
     
@@ -240,8 +242,8 @@
 		return _response;
 	}
 
-	public String getStatus() {
-		return _status;
+	public Status getStatus() {
+		return _status == null ? null : Status.valueOf(_status);
 	}
 
 	public void setCallee(QName callee) {
@@ -313,8 +315,8 @@
 		_response = (MessageDAOImpl)msg;
 	}
 
-	public void setStatus(String status) {
-		_status = status;
+	public void setStatus(Status status) {
+		_status = status == null ?  null : status.toString();
 	}
 
     public String getPipedMessageExchangeId() {
@@ -357,32 +359,25 @@
         _correlator = correlator;
     }
 
-    public String getInvocationStyle() {
-        return _istyle;
+    public InvocationStyle getInvocationStyle() {
+        return _istyle == null ? null : InvocationStyle.valueOf(_istyle);
     }
 
-    public MessageExchangeDAO getPipedMessageExchange() {
-        return _pipedMex;
-    }
 
     public long getTimeout() {
         return _timeout;
     }
 
-    public void setFailureType(String failureType) {
-        _failureType = failureType;
+    public void setFailureType(FailureType failureType) {
+        _failureType = failureType == null ? null :failureType.toString();
     }
     
-    public String getFailureType() {
-        return _failureType;
-    }
-
-    public void setInvocationStyle(String invocationStyle) {
-        _istyle = invocationStyle;
+    public FailureType getFailureType() {
+        return _failureType == null ? null : FailureType.valueOf(_failureType);
     }
 
-    public void setPipedMessageExchange(MessageExchangeDAO mex) {
-        _pipedMex = mex;
+    public void setInvocationStyle(InvocationStyle invocationStyle) {
+        _istyle = invocationStyle == null ? null : invocationStyle.toString();
     }
 
     public void setTimeout(long timeout) {
@@ -395,5 +390,13 @@
 
     public void setAckType(AckType ackType) {
         _ackType = ackType == null ? null :ackType.toString();
+    }
+
+    public QName getPipedPID() {
+        return _pipedPid == null ? null : QName.valueOf(_pipedPid);
+    }
+
+    public void setPipedPID(QName pipedPid) {
+        _pipedPid = pipedPid == null ? null : pipedPid.toString();
     }
 }

Modified: ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?view=diff&rev=564361&r1=564360&r2=564361
==============================================================================
--- ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original)
+++ ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Thu Aug  9 12:55:19 2007
@@ -43,7 +43,7 @@
         _ds = new DelegateSupport();
         _del = _ds.delegate();
     }
-    
+     
     
     public void testGetNodeIds() throws Exception {
         // should have no node ids in the db, empty list (not null)

Modified: ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?view=diff&rev=564361&r1=564360&r2=564361
==============================================================================
--- ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Thu Aug  9 12:55:19 2007
@@ -24,25 +24,20 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
 import junit.framework.TestCase;
 
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 
 public class SimpleSchedulerTest extends TestCase implements JobProcessor {
 
     DelegateSupport _ds;
     SimpleScheduler _scheduler;
     ArrayList<JobInfo> _jobs;
-    ArrayList<JobInfo> _commit;
     TransactionManager _txm;
 
 
@@ -52,7 +47,6 @@
 
         _scheduler = newScheduler("n1");
         _jobs = new ArrayList<JobInfo>(100);
-        _commit = new ArrayList<JobInfo>(100);
     }
 
     public void tearDown() throws Exception {
@@ -64,21 +58,18 @@
         _txm.begin();
         String jobId;
         try {
-            jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 200));
-            Thread.sleep(100);
+            jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
+            Thread.sleep(200);
             // Make sure we don't schedule until commit.
             assertEquals(0, _jobs.size());
         } finally {
             _txm.commit();
         }
-        // Delete from DB
-        assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
         // Wait for the job to be execed.
-        Thread.sleep(250);
+        Thread.sleep(100);
         // Should execute job,
         assertEquals(1, _jobs.size());
-        // But should not commit.
-        assertEquals(0, _commit.size());
+
     }
     
     public void testImmediateScheduling() throws Exception {
@@ -119,123 +110,102 @@
 
     public void testNearFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(10000);
-        _scheduler.setImmediateInterval(5000);
+        _scheduler.setNearFutureInterval(1000);
+        _scheduler.setImmediateInterval(500);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
+            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(8500);
+        Thread.sleep(850);
         assertEquals(1, _jobs.size());
     }
 
     public void testFarFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(7000);
-        _scheduler.setImmediateInterval(3000);
+        _scheduler.setNearFutureInterval(700);
+        _scheduler.setImmediateInterval(300);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
+            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(8500);
+        Thread.sleep(850);
         assertEquals(1, _jobs.size());
     }
 
     public void testRecovery() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(500);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
 
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500));
+            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
+            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
         } finally {
             _txm.commit();
         }
 
         _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(1000);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
         _scheduler.start();
-        Thread.sleep(4000);
+        Thread.sleep(400);
         assertEquals(3, _jobs.size());
     }
 
     public void testRecoverySuppressed() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(500);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
 
+        // schedule some jobs ...
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500));
+            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
+            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
         } finally {
             _txm.commit();
-        }
+        } 
 
-        _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(1000);
-        _scheduler.start();
+        // but don't start the scheduler.... 
+        
+        // create a second node for the scheduler. 
+        SimpleScheduler scheduler = newScheduler("n3");
+        scheduler.setNearFutureInterval(200);
+        scheduler.setImmediateInterval(100);
+        scheduler.setStaleInterval(50);
+        scheduler.start();
         for (int i = 0; i < 40; ++i) {
-            _scheduler.updateHeartBeat("n1");
-            Thread.sleep(100);
+            scheduler.updateHeartBeat("n1");
+            Thread.sleep(10);
         }
 
-        _scheduler.stop();
-        Thread.sleep(1000);
+        scheduler.stop();
 
-        assertEquals(0, _jobs.size());
+        assertTrue(_jobs.size() <= 1);
+        if (_jobs.size() == 1)
+            assertEquals("far", _jobs.get(0).jobDetail.get("foo"));
     }
 
     public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
         synchronized (_jobs) {
             _jobs.add(jobInfo);
         }
-        
-        try {
-            _txm.getTransaction().registerSynchronization(new Synchronization() {
-
-                public void afterCompletion(int arg0) {
-                    if (arg0 == Status.STATUS_COMMITTED) 
-                        _commit.add(jobInfo);
-                }
-
-                public void beforeCompletion() {
-                    // TODO Auto-generated method stub
-                    
-                }
-                
-            });
-        } catch (IllegalStateException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (RollbackException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (SystemException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-
     }
 
     Map<String, Object> newDetail(String x) {