You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/09/03 19:32:52 UTC
svn commit: r691693 - in /ode/trunk:
axis2/src/main/java/org/apache/ode/axis2/soapbinding/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/
Author: mriou
Date: Wed Sep 3 10:32:51 2008
New Revision: 691693
URL: http://svn.apache.org/viewvc?rev=691693&view=rev
Log:
Canceling invocation check on reply.
Modified:
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/soapbinding/SoapExternalService.java Wed Sep 3 10:32:51 2008
@@ -265,7 +265,6 @@
__log.error(emsg, e);
}
-
}
private void reply(final PartnerRoleMessageExchange odeMex, final Operation operation, final MessageContext reply, final boolean isFault) {
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Sep 3 10:32:51 2008
@@ -46,20 +46,7 @@
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -520,7 +507,6 @@
// doing any work on its behalf, therefore we will reschedule the
// events for some time in the future (1 minute).
_contexts.execTransaction(new Callable<Void>() {
-
public Void call() throws Exception {
_contexts.scheduler.jobCompleted(jobInfo.jobName);
Date future = new Date(System.currentTimeMillis() + (60 * 1000));
@@ -532,6 +518,19 @@
});
return;
}
+
+ if (we.getType().equals(WorkEvent.Type.INVOKE_CHECK)) {
+ if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + we.getMexId());
+
+ PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(we.getMexId());
+ if (mex.getStatus() == MessageExchange.Status.ASYNC || mex.getStatus() == MessageExchange.Status.ACK) {
+ String msg = "Dangling invocation (mexId=" + we.getMexId() + "), forcing it into a failed state.";
+ if (__log.isDebugEnabled()) __log.debug(msg);
+ mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg, null);
+ }
+ return;
+ }
+
process.handleWorkEvent(jobInfo);
} catch (Exception ex) {
throw new JobProcessorException(ex, jobInfo.jobDetail.get("inmem") == null);
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Wed Sep 3 10:32:51 2008
@@ -421,6 +421,9 @@
OdeRTInstance rti = _runtime.newInstance(getState(mexdao.getInstance()));
BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), rti);
+ // Canceling invoke check
+ String jobId = mexdao.getProperty("invokeCheckJobId");
+ _contexts.scheduler.cancelJob(jobId);
brc.injectPartnerResponse(mexdao.getMessageExchangeId(), mexdao.getChannel());
brc.execute();
@@ -498,7 +501,6 @@
_contexts.scheduler.jobCompleted(jobInfo.jobName);
execInstanceEvent(we);
}
-
});
}
@@ -1192,11 +1194,11 @@
}
- public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+ public String scheduleWorkEvent(WorkEvent we, Date timeToFire) {
// if (isInMemory())
// throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+ return _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
}
void invokePartner(MessageExchangeDAO mexdao) {
@@ -1233,6 +1235,9 @@
}
} else {
partnerRole.invokeIL(mexdao);
+ // Scheduling a verification to see if the invoke has really been processed. Otherwise
+ // we put it in activity recovery mode (case of a server crash during invocation).
+ scheduleInvokeCheck(mexdao);
}
} finally {
if (mexdao.getStatus() != Status.ACK)
@@ -1243,6 +1248,21 @@
assert mexdao.getStatus() == Status.ACK || mexdao.getStatus() == Status.ASYNC;
}
+ private void scheduleInvokeCheck(MessageExchangeDAO mex) {
+ boolean isTwoWay = mex.getPattern() ==
+ org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+ if (!isInMemory() && isTwoWay) {
+ if (__log.isDebugEnabled()) __log.debug("Creating invocation check event for mexid " + mex.getMessageExchangeId());
+ WorkEvent event = new WorkEvent();
+ event.setMexId(mex.getMessageExchangeId());
+ event.setProcessId(getPID());
+ event.setType(WorkEvent.Type.INVOKE_CHECK);
+ Date future = new Date(System.currentTimeMillis() + (180 * 1000));
+ String jobId = scheduleWorkEvent(event, future);
+ mex.setProperty("invokeCheckJobId", jobId);
+ }
+ }
+
/**
* Invoke a partner process directly (via the engine), bypassing the Integration Layer. Obviously this can only be used when an
* process is partners with another process hosted on the same engine.
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Sep 3 10:32:51 2008
@@ -76,7 +76,9 @@
/** Invoke a "my role" operation (i.e. implemented by the process). */
MYROLE_INVOKE,
- MYROLE_INVOKE_ASYNC_RESPONSE
+ MYROLE_INVOKE_ASYNC_RESPONSE,
+
+ INVOKE_CHECK
}
public String getChannel() {
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Wed Sep 3 10:32:51 2008
@@ -25,7 +25,7 @@
/**
* Like a task, but a little bit better.
- *
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*/
class Job extends Task {
@@ -37,7 +37,7 @@
public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
this(when, new GUID().toString(),transacted,jobDetail);
}
-
+
public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) {
super(when);
this.jobId = jobId;
@@ -45,5 +45,14 @@
this.transacted = transacted;
}
-
+ @Override
+ public int hashCode() {
+ return jobId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof Job && jobId.equals(((Job) obj).jobId);
+ }
+
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Wed Sep 3 10:32:51 2008
@@ -31,152 +31,168 @@
/**
* Implements the "todo" queue and prioritized scheduling mechanism.
- *
+ *
* @author mszefler
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ *
*/
class SchedulerThread implements Runnable {
- private static final Log __log = LogFactory.getLog(SchedulerThread.class);
+ private static final Log __log = LogFactory.getLog(SchedulerThread.class);
- private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+ private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
- /** Jobs ready for immediate execution. */
- private PriorityBlockingQueue<Task> _todo;
+ /** Jobs ready for immediate execution. */
+ private PriorityBlockingQueue<Task> _todo;
- /** Lock for managing the queue */
- private ReentrantLock _lock = new ReentrantLock();
-
- private Condition _activity = _lock.newCondition();
-
- private volatile boolean _done;
-
- private TaskRunner _taskrunner;
-
- private Thread _thread;
-
- SchedulerThread(TaskRunner runner) {
- _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
- new JobComparatorByDate());
- _taskrunner = runner;
- }
-
- void start() {
- if (_thread != null)
- return;
-
- _done = false;
- _thread = new Thread(this, "OdeScheduler");
- _thread.start();
- }
-
- /**
- * Shutdown the thread.
- */
- void stop() {
- if (_thread == null)
- return;
-
- _done = true;
- _lock.lock();
- try {
- _activity.signal();
- } finally {
- _lock.unlock();
-
- }
-
- while (_thread != null)
- try {
- _thread.join();
- _thread = null;
- } catch (InterruptedException e) {
- ;
- }
-
- }
-
- /**
- * Add a job to the todo queue.
- *
- * @param job
- */
- void enqueue(Task task) {
- _lock.lock();
- try {
- _todo.add(task);
- _activity.signal();
- } finally {
- _lock.unlock();
- }
- }
-
- /**
- * Get the size of the todo queue.
- *
- * @return
- */
- public int size() {
- return _todo.size();
- }
-
- /**
- * Pop items off the todo queue, and send them to the task runner for processing.
- */
- public void run() {
- while (!_done) {
- _lock.lock();
- try {
- long nextjob;
- while ((nextjob = nextJobTime()) > 0 && !_done)
- _activity.await(nextjob, TimeUnit.MILLISECONDS);
-
- if (!_done && nextjob == 0) {
- Task task = _todo.take();
- _taskrunner.runTask(task);
-
- }
- } catch (InterruptedException ex) {
- ; // ignore
- } finally {
- _lock.unlock();
- }
- }
- }
-
- /**
- * Calculate the time until the next available job.
- *
- * @return time until next job, 0 if one is one is scheduled to go, and some
- * really large number if there are no jobs to speak of
- */
- private long nextJobTime() {
- assert _lock.isLocked();
-
- Task job = _todo.peek();
- if (job == null)
- return Long.MAX_VALUE;
-
- return Math.max(0, job.schedDate - System.currentTimeMillis());
- }
-
- /**
- * Remove the tasks of a given type from the list.
- * @param tasktype type of task
- */
- public void clearTasks(final Class<? extends Task> tasktype) {
- _lock.lock();
- try {
- CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
- @Override
- public boolean isMember(Task o) {
- return tasktype.isAssignableFrom(o.getClass());
- }
-
- });
- } finally {
- _lock.unlock();
- }
- }
+ /** Lock for managing the queue */
+ private ReentrantLock _lock = new ReentrantLock();
+
+ private Condition _activity = _lock.newCondition();
+
+ private volatile boolean _done;
+
+ private TaskRunner _taskrunner;
+
+ private Thread _thread;
+
+ SchedulerThread(TaskRunner runner) {
+ _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+ new JobComparatorByDate());
+ _taskrunner = runner;
+ }
+
+ void start() {
+ if (_thread != null)
+ return;
+
+ _done = false;
+ _thread = new Thread(this, "OdeScheduler");
+ _thread.start();
+ }
+
+ /**
+ * Shutdown the thread.
+ */
+ void stop() {
+ if (_thread == null)
+ return;
+
+ _done = true;
+ _lock.lock();
+ try {
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+
+ }
+
+ while (_thread != null)
+ try {
+ _thread.join();
+ _thread = null;
+ } catch (InterruptedException e) {
+ ;
+ }
+
+ }
+
+ /**
+ * Add a job to the todo queue.
+ *
+ * @param task the task to add to the queue
+ */
+ void enqueue(Task task) {
+ _lock.lock();
+ try {
+ _todo.add(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Remove a job from the todo queue.
+ *
+ * @param task the task to remove from the queue
+ */
+ void dequeue(Task task) {
+ _lock.lock();
+ try {
+ _todo.remove(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+
+ /**
+ * Get the size of the todo queue.
+ *
+ * @return the number of tasks currently in the todo queue
+ */
+ public int size() {
+ return _todo.size();
+ }
+
+ /**
+ * Pop items off the todo queue, and send them to the task runner for processing.
+ */
+ public void run() {
+ while (!_done) {
+ _lock.lock();
+ try {
+ long nextjob;
+ while ((nextjob = nextJobTime()) > 0 && !_done)
+ _activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+ if (!_done && nextjob == 0) {
+ Task task = _todo.take();
+ _taskrunner.runTask(task);
+
+ }
+ } catch (InterruptedException ex) {
+ ; // ignore
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Calculate the time until the next available job.
+ *
+ * @return time until next job, 0 if one is scheduled to go, and some
+ * really large number if there are no jobs to speak of
+ */
+ private long nextJobTime() {
+ assert _lock.isLocked();
+
+ Task job = _todo.peek();
+ if (job == null)
+ return Long.MAX_VALUE;
+
+ return Math.max(0, job.schedDate - System.currentTimeMillis());
+ }
+
+ /**
+ * Remove the tasks of a given type from the list.
+ * @param tasktype type of task
+ */
+ public void clearTasks(final Class<? extends Task> tasktype) {
+ _lock.lock();
+ try {
+ CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+ @Override
+ public boolean isMember(Task o) {
+ return tasktype.isAssignableFrom(o.getClass());
+ }
+
+ });
+ } finally {
+ _lock.unlock();
+ }
+ }
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=691693&r1=691692&r2=691693&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Sep 3 10:32:51 2008
@@ -132,7 +132,13 @@
}
public void cancelJob(String jobId) throws ContextException {
- // TODO: maybe later, not really necessary.
+ _todo.dequeue(new Job(0, jobId, false, null));
+ try {
+ _db.deleteJob(jobId, _nodeId);
+ } catch (DatabaseException e) {
+ __log.debug("Job removal failed.", e);
+ throw new ContextException("Job removal failed.", e);
+ }
}
public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {