You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/09/03 19:32:52 UTC

svn commit: r691693 - in /ode/trunk: axis2/src/main/java/org/apache/ode/axis2/soapbinding/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/

Author: mriou
Date: Wed Sep  3 10:32:51 2008
New Revision: 691693

URL: http://svn.apache.org/viewvc?rev=691693&view=rev
Log:
Canceling invocation check on reply.

Modified:
    ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java Wed Sep  3 10:32:51 2008
@@ -265,7 +265,6 @@
             __log.error(emsg, e);
 
         }
-
     }
 
     private void reply(final PartnerRoleMessageExchange odeMex, final Operation operation, final MessageContext reply, final boolean isFault) {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Sep  3 10:32:51 2008
@@ -46,20 +46,7 @@
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -520,7 +507,6 @@
                 // doing any work on its behalf, therefore we will reschedule the
                 // events for some time in the future (1 minute).
                 _contexts.execTransaction(new Callable<Void>() {
-
                     public Void call() throws Exception {
                         _contexts.scheduler.jobCompleted(jobInfo.jobName);
                         Date future = new Date(System.currentTimeMillis() + (60 * 1000));
@@ -532,6 +518,19 @@
                 });
                 return;
             }
+            
+            if (we.getType().equals(WorkEvent.Type.INVOKE_CHECK)) {
+                if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + we.getMexId());
+
+                PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(we.getMexId());
+                if (mex.getStatus() == MessageExchange.Status.ASYNC || mex.getStatus() == MessageExchange.Status.ACK) {
+                    String msg = "Dangling invocation (mexId=" + we.getMexId() + "), forcing it into a failed state.";
+                    if (__log.isDebugEnabled()) __log.debug(msg);
+                    mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg, null);
+                }
+                return;
+            }
+
             process.handleWorkEvent(jobInfo);
         } catch (Exception ex) {
             throw new JobProcessorException(ex, jobInfo.jobDetail.get("inmem") == null);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Wed Sep  3 10:32:51 2008
@@ -421,6 +421,9 @@
 
         OdeRTInstance rti = _runtime.newInstance(getState(mexdao.getInstance()));
         BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), rti);
+        // Canceling invoke check
+        String jobId = mexdao.getProperty("invokeCheckJobId");
+        _contexts.scheduler.cancelJob(jobId);        
 
         brc.injectPartnerResponse(mexdao.getMessageExchangeId(), mexdao.getChannel());
         brc.execute();
@@ -498,7 +501,6 @@
                 _contexts.scheduler.jobCompleted(jobInfo.jobName);
                 execInstanceEvent(we);
             }
-
         });
 
     }
@@ -1192,11 +1194,11 @@
 
     }
 
-    public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+    public String scheduleWorkEvent(WorkEvent we, Date timeToFire) {
         // if (isInMemory())
         // throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
 
-        _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+        return _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire); // pass the scheduler's job id back so the caller can cancel the job later
     }
 
     void invokePartner(MessageExchangeDAO mexdao) {
@@ -1233,6 +1235,9 @@
                 }
             } else {
                 partnerRole.invokeIL(mexdao);
+                // Scheduling a verification to see if the invoke has really been processed. Otherwise
+                // we put it in activity recovery mode (case of a server crash during invocation).
+                scheduleInvokeCheck(mexdao);
             }
         } finally {
             if (mexdao.getStatus() != Status.ACK)
@@ -1243,6 +1248,21 @@
         assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
     }
 
+    private void scheduleInvokeCheck(MessageExchangeDAO mex) { // schedules a delayed INVOKE_CHECK work event for the given exchange
+        boolean isTwoWay = mex.getPattern() ==
+                org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+        if (!isInMemory() && isTwoWay) { // nothing is scheduled for in-memory processes or non request-response exchanges
+            if (__log.isDebugEnabled()) __log.debug("Creating invocation check event for mexid " + mex.getMessageExchangeId());
+            WorkEvent event = new WorkEvent();
+            event.setMexId(mex.getMessageExchangeId());
+            event.setProcessId(getPID());
+            event.setType(WorkEvent.Type.INVOKE_CHECK);
+            Date future = new Date(System.currentTimeMillis() + (180 * 1000)); // fire 180 seconds from now
+            String jobId = scheduleWorkEvent(event, future);
+            mex.setProperty("invokeCheckJobId", jobId); // keep the job id on the exchange; the same key is read back when the check is cancelled
+        }
+    }
+
     /**
      * Invoke a partner process directly (via the engine), bypassing the Integration Layer. Obviously this can only be used when an
      * process is partners with another process hosted on the same engine.

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Sep  3 10:32:51 2008
@@ -76,7 +76,9 @@
         /** Invoke a "my role" operation (i.e. implemented by the process). */
         MYROLE_INVOKE, 
         
-        MYROLE_INVOKE_ASYNC_RESPONSE
+        MYROLE_INVOKE_ASYNC_RESPONSE,
+
+        INVOKE_CHECK
     }
 
     public String getChannel() {

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Wed Sep  3 10:32:51 2008
@@ -25,7 +25,7 @@
 
 /**
  * Like a task, but a little bit better.
- * 
+ *
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  */
 class Job extends Task {
@@ -37,7 +37,7 @@
     public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
         this(when, new GUID().toString(),transacted,jobDetail);
     }
-    
+
     public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) {
         super(when);
         this.jobId = jobId;
@@ -45,5 +45,14 @@
         this.transacted = transacted;
     }
 
-    
+    @Override
+    public int hashCode() { // derived from jobId only, so it stays consistent with equals()
+        return jobId == null ? 0 : jobId.hashCode(); // a null id hashes to 0 instead of throwing
+    }
+
+    @Override
+    public boolean equals(Object obj) { // jobs are equal when their ids are equal (two null ids count as equal)
+        return obj instanceof Job && (jobId == null ? ((Job) obj).jobId == null : jobId.equals(((Job) obj).jobId));
+    }
+
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Wed Sep  3 10:32:51 2008
@@ -31,152 +31,168 @@
 
 /**
  * Implements the "todo" queue and prioritized scheduling mechanism. 
- * 
+ *
  * @author mszefler
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- * 
+ *
  */
 class SchedulerThread implements Runnable {
 
-	private static final Log __log = LogFactory.getLog(SchedulerThread.class);
+    private static final Log __log = LogFactory.getLog(SchedulerThread.class);
 
-	private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+    private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
 
-	/** Jobs ready for immediate execution. */
-	private PriorityBlockingQueue<Task> _todo;
+    /** Jobs ready for immediate execution. */
+    private PriorityBlockingQueue<Task> _todo;
 
-	/** Lock for managing the queue */
-	private ReentrantLock _lock = new ReentrantLock();
-
-	private Condition _activity = _lock.newCondition();
-
-	private volatile boolean _done;
-
-	private TaskRunner _taskrunner;
-
-	private Thread _thread;
-
-	SchedulerThread(TaskRunner runner) {
-		_todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
-				new JobComparatorByDate());
-		_taskrunner = runner;
-	}
-
-	void start() {
-		if (_thread != null)
-			return;
-
-		_done = false;
-		_thread = new Thread(this, "OdeScheduler");
-		_thread.start();
-	}
-
-	/**
-	 * Shutdown the thread.
-	 */
-	void stop() {
-		if (_thread == null)
-			return;
-
-		_done = true;
-		_lock.lock();
-		try {
-			_activity.signal();
-		} finally {
-			_lock.unlock();
-
-		}
-
-		while (_thread != null)
-			try {
-				_thread.join();
-				_thread = null;
-			} catch (InterruptedException e) {
-				;
-			}
-
-	}
-
-	/**
-	 * Add a job to the todo queue.
-	 * 
-	 * @param job
-	 */
-	void enqueue(Task task) {
-		_lock.lock();
-		try {
-			_todo.add(task);
-			_activity.signal();
-		} finally {
-			_lock.unlock();
-		}
-	}
-
-	/**
-	 * Get the size of the todo queue.
-	 * 
-	 * @return
-	 */
-	public int size() {
-		return _todo.size();
-	}
-
-	/**
-	 * Pop items off the todo queue, and send them to the task runner for processing.
-	 */
-	public void run() {
-		while (!_done) {
-			_lock.lock();
-			try {
-				long nextjob;
-				while ((nextjob = nextJobTime()) > 0 && !_done)
-					_activity.await(nextjob, TimeUnit.MILLISECONDS);
-
-				if (!_done && nextjob == 0) {
-					Task task = _todo.take();
-					_taskrunner.runTask(task);
-
-				}
-			} catch (InterruptedException ex) {
-				; // ignore
-			} finally {
-				_lock.unlock();
-			}
-		}
-	}
-
-	/**
-	 * Calculate the time until the next available job.
-	 * 
-	 * @return time until next job, 0 if one is one is scheduled to go, and some
-	 *         really large number if there are no jobs to speak of
-	 */
-	private long nextJobTime() {
-		assert _lock.isLocked();
-
-		Task job = _todo.peek();
-		if (job == null)
-			return Long.MAX_VALUE;
-
-		return Math.max(0, job.schedDate - System.currentTimeMillis());
-	}
-
-	/**
-	 * Remove the tasks of a given type from the list. 
-	 * @param tasktype type of task
-	 */
-	public void clearTasks(final Class<? extends Task> tasktype) {
-		_lock.lock();
-		try {
-			CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
-				@Override
-				public boolean isMember(Task o) {
-					return tasktype.isAssignableFrom(o.getClass());
-				}
-
-			});
-		} finally {
-			_lock.unlock();
-		}
-	}
+    /** Lock for managing the queue */
+    private ReentrantLock _lock = new ReentrantLock();
+
+    private Condition _activity = _lock.newCondition();
+
+    private volatile boolean _done;
+
+    private TaskRunner _taskrunner;
+
+    private Thread _thread;
+
+    SchedulerThread(TaskRunner runner) { // runner is the object that run() hands each due task to
+        _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+                new JobComparatorByDate()); // queue ordering is decided by JobComparatorByDate
+        _taskrunner = runner;
+    }
+
+    void start() { // creates and starts the "OdeScheduler" thread unless one already exists
+        if (_thread != null)
+            return;
+
+        _done = false; // reset the stop flag before the run() loop begins
+        _thread = new Thread(this, "OdeScheduler");
+        _thread.start();
+    }
+
+    /**
+     * Shut down the scheduler thread and wait until it has terminated; does nothing if no thread is running.
+     */
+    void stop() {
+        if (_thread == null)
+            return;
+
+        _done = true;
+        _lock.lock();
+        try {
+            _activity.signal(); // wake run() if it is waiting so that it can observe _done
+        } finally {
+            _lock.unlock();
+
+        }
+
+        while (_thread != null)
+            try {
+                _thread.join();
+                _thread = null;
+            } catch (InterruptedException e) {
+                ; // interrupted while joining: the loop retries the join
+            }
+
+    }
+
+    /**
+     * Add a task to the todo queue and signal the scheduler thread.
+     *
+     * @param task the task to add
+     */
+    void enqueue(Task task) {
+        _lock.lock();
+        try {
+            _todo.add(task);
+            _activity.signal(); // lets run() recompute its wait time, since the new task may be due first
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Remove a task from the todo queue and signal the scheduler thread.
+     *
+     * @param task the task to remove
+     */
+    void dequeue(Task task) {
+        _lock.lock();
+        try {
+            _todo.remove(task);
+            _activity.signal(); // lets run() recompute its wait time, since the head of the queue may have changed
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+
+    /**
+     * Get the size of the todo queue.
+     *
+     * @return the number of tasks currently in the todo queue
+     */
+    public int size() {
+        return _todo.size(); // read without taking _lock
+    }
+
+    /**
+     * Pop due items off the todo queue and send them to the task runner, until {@code _done} is set.
+     */
+    public void run() {
+        while (!_done) {
+            _lock.lock();
+            try {
+                long nextjob;
+                while ((nextjob = nextJobTime()) > 0 && !_done)
+                    _activity.await(nextjob, TimeUnit.MILLISECONDS); // returns early when enqueue/dequeue/stop signal _activity
+
+                if (!_done && nextjob == 0) { // nextjob == 0 means the head of the queue is due now
+                    Task task = _todo.take();
+                    _taskrunner.runTask(task); // NOTE(review): invoked while _lock is still held, so enqueue/dequeue wait until it returns
+
+                }
+            } catch (InterruptedException ex) {
+                ; // ignore: the outer loop re-checks _done
+            } finally {
+                _lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Calculate the time until the next available job.
+     *
+     * @return milliseconds until the next job, 0 if one is already due, and
+     *         {@code Long.MAX_VALUE} if the queue is empty
+     */
+    private long nextJobTime() {
+        assert _lock.isLocked();
+
+        Task job = _todo.peek();
+        if (job == null)
+            return Long.MAX_VALUE;
+
+        return Math.max(0, job.schedDate - System.currentTimeMillis()); // clamp overdue jobs to 0
+    }
+
+    /**
+     * Remove the tasks of a given type (including its subclasses) from the todo queue.
+     * @param tasktype type of task
+     */
+    public void clearTasks(final Class<? extends Task> tasktype) {
+        _lock.lock();
+        try {
+            CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+                @Override
+                public boolean isMember(Task o) {
+                    return tasktype.isAssignableFrom(o.getClass()); // true when o's class is tasktype or a subtype of it
+                }
+
+            });
+        } finally {
+            _lock.unlock();
+        }
+    }
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Sep  3 10:32:51 2008
@@ -132,7 +132,13 @@
     }
 
     public void cancelJob(String jobId) throws ContextException {
-        // TODO: maybe later, not really necessary.
+        _todo.dequeue(new Job(0, jobId, false, null)); // throwaway Job used only as a lookup key: Job equality is based on jobId
+        try {
+            _db.deleteJob(jobId, _nodeId); // NOTE(review): jobId is not null-checked here; confirm callers never pass null
+        } catch (DatabaseException e) {
+            __log.debug("Job removal failed.", e);
+            throw new ContextException("Job removal failed.", e);
+        }
     }
 
     public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {