You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/26 19:28:22 UTC
svn commit: r559893 - in /incubator/ode/branches/bart: ./
bpel-epr/src/main/java/org/apache/ode/il/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
bpel-runtime/src/test/java/org/apache/ode/bpel/engine/
bpel-runtime/src/test/java/org/apache/ode/...
Author: mszefler
Date: Thu Jul 26 10:28:21 2007
New Revision: 559893
URL: http://svn.apache.org/viewvc?view=rev&rev=559893
Log:
BART: test case fixings.
Removed:
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/InstanceLockManagerTest.java
incubator/ode/branches/bart/jbi/src/test/resources/log4j.properties
Modified:
incubator/ode/branches/bart/Rakefile
incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MemBackedMessageImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java
Modified: incubator/ode/branches/bart/Rakefile
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/Rakefile?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/Rakefile (original)
+++ incubator/ode/branches/bart/Rakefile Thu Jul 26 10:28:21 2007
@@ -246,7 +246,7 @@
compile.from apt
compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-obj", "bpel-schemas",
"bpel-store", "jacob", "jacob-ap", "utils"),
- COMMONS.logging, COMMONS.collections, JAXEN, JAVAX.persistence, JAVAX.stream, SAXON, WSDL4J, XMLBEANS
+ COMMONS.logging, COMMONS.collections, JAXEN, JAVAX.persistence, JAVAX.stream, JAVAX.transaction, SAXON, WSDL4J, XMLBEANS
test.compile.with projects("scheduler-simple", "dao-jpa", "dao-hibernate", "bpel-epr"),
BACKPORT, COMMONS.pool, COMMONS.lang, DERBY, JAVAX.connector, JAVAX.transaction,
@@ -293,7 +293,7 @@
define "bpel-test" do
compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-runtime",
"bpel-store", "utils", "bpel-epr", "dao-jpa"),
- DERBY, Java::JUNIT_REQUIRES, JAVAX.persistence, OPENJPA, WSDL4J
+ DERBY, Java::JUNIT_REQUIRES, JAVAX.persistence, OPENJPA, WSDL4J, JAVAX.transaction
test.with projects("bpel-obj", "jacob", "bpel-schemas",
"bpel-scripts", "scheduler-simple"),
Modified: incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Thu Jul 26 10:28:21 2007
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -54,6 +55,7 @@
public MockScheduler(TransactionManager txm) {
_txm = txm;
+ _exec = Executors.newSingleThreadScheduledExecutor();
}
ThreadLocal<List<Synchronization>> _synchros = new ThreadLocal<List<Synchronization>>() {
@@ -63,7 +65,8 @@
}
};
- public String schedulePersistedJob(final Map<String, Object> detail, final Date date) throws ContextException {
+ public String schedulePersistedJob(final Map<String, Object> detail, Date dt) throws ContextException {
+ final Date date = dt == null ? new Date() : dt;
registerSynchronizer(new Synchronization() {
public void afterCompletion(int status) {
long delay = Math.max(0, date.getTime() - System.currentTimeMillis());
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Thu Jul 26 10:28:21 2007
@@ -34,7 +34,7 @@
Future<Status> future = _future != null ? _future : super.invokeAsync();
try {
- future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+ _status = future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
_done = true;
return _status;
} catch (InterruptedException e) {
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Thu Jul 26 10:28:21 2007
@@ -50,6 +50,7 @@
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
@@ -152,6 +153,7 @@
_pid = conf.getProcessId();
_pconf = conf;
_hydrationLatch = new HydrationLatch();
+ _contexts = server._contexts;
_inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.txManager);
// TODO : do this on a per-partnerlink basis, support transacted styles.
@@ -266,17 +268,16 @@
instance.execute();
}
-
-
private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
- iworker.enqueue(new ProcessRunnable(runnable));
+ iworker.enqueue(runnable);
}
private void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
- enqueueInstanceWork(instanceId, new ProcessRunnable(new TransactedRunnable(runnable)));
+ BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+ iworker.enqueue(_server.new TransactedRunnable(runnable));
}
-
+
/**
* Schedule work for a given instance; work will occur if transaction commits.
*
@@ -292,10 +293,6 @@
});
}
-
- private void scheduleInstanceTX(Long instanceId, final Runnable transaction) {
- scheduleInstanceWork(instanceId, new TransactedRunnable(transaction));
- }
private <T> T doInstanceWork(Long instanceId, final Callable<T> callable) {
try {
@@ -315,23 +312,6 @@
return null;
}
- void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
- markused();
- PartnerLinkMyRoleImpl target = null;
- for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
- if (endpoint.serviceName.equals(mex.getServiceName()))
- target = getEndpointToMyRoleMap().get(endpoint);
- }
- if (target != null) {
- Operation op = target._plinkDef.getMyRoleOperation(mex.getOperationName());
- MessageExchange.MessageExchangePattern pattern = op.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
- : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
- mex.init(target._plinkDef.myRolePortType, op, pattern);
- } else {
- __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
- }
- }
-
/**
* Extract the value of a BPEL property from a BPEL messsage variable.
*
@@ -467,18 +447,7 @@
*/
private <T> Future<T> enqueueTransaction(final Callable<T> tx) {
// We have to wrap our transaction to make sure that we are hydrated when the transaction runs.
- return _server.execIsolatedTransaction(new Callable<T>() {
- public T call() throws Exception {
- _hydrationLatch.latch(1);
- try {
- return tx.call();
- } finally {
- _hydrationLatch.release(1);
- }
- }
-
- });
-
+ return _server.enqueueTransaction(new ProcessCallable<T>(tx));
}
private void execInstanceEvent(WorkEvent we) {
@@ -768,6 +737,7 @@
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.myRolePortType;
Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+ mex.load(mexdao);
mex.init(ptype, op, MessageExchangePattern.valueOf(mexdao.getPattern()));
return mex;
} finally {
@@ -809,6 +779,8 @@
throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
}
+
+ mex.load(mexdao);
return mex;
} finally {
_hydrationLatch.release(1);
@@ -1031,39 +1003,15 @@
_server.scheduleRunnable(new ProcessRunnable(runnable));
}
-
- class TransactedRunnable implements Runnable {
- Runnable _work;
-
- TransactedRunnable(Runnable work) {
- _work = work;
- }
-
- public void run() {
- _contexts.execTransaction(_work);
- }
- }
-
- class TransactedCallable<T> implements Callable<T> {
- Callable<T> _work;
-
- TransactedCallable(Callable<T> work) {
- _work = work;
- }
-
- public T call() throws Exception {
- return _contexts.execTransaction(_work);
- }
- }
-
+
class ProcessRunnable implements Runnable {
Runnable _work;
-
+
ProcessRunnable(Runnable work) {
_work = work;
}
-
+
public void run() {
_hydrationLatch.latch(1);
try {
@@ -1071,28 +1019,93 @@
} finally {
_hydrationLatch.release(1);
}
-
+
}
-
+
}
-
+
class ProcessCallable<T> implements Callable<T> {
Callable<T> _work;
-
+
ProcessCallable(Callable<T> work) {
_work = work;
}
-
- public T call () throws Exception {
+
+ public T call() throws Exception {
_hydrationLatch.latch(1);
try {
return _work.call();
} finally {
_hydrationLatch.release(1);
}
+
+ }
+
+ }
+
+ public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService, final String operation, final String clientKey) {
+ _hydrationLatch.latch(1);
+ try {
+ final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
+ if (target == null)
+ throw new BpelEngineException("NoSuchService: " + targetService);
+ final Operation op = target._plinkDef.getMyRoleOperation(operation);
+ if (op == null)
+ throw new BpelEngineException("NoSuchOperation: " + operation);
+
+ final MessageExchangePattern pattern = op.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
+ : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+
+ Callable<MyRoleMessageExchange> createDao = new Callable<MyRoleMessageExchange>() {
+
+ public MyRoleMessageExchange call() throws Exception {
+ MessageExchangeDAO dao = createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ dao.setInvocationStyle(istyle.toString());
+ dao.setCorrelationId(clientKey);
+ dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
+ dao.setPattern(pattern.toString());
+ dao.setCallee(targetService);
+ dao.setStatus(Status.NEW.toString());
+ dao.setOperation(operation);
+ dao.setPartnerLinkModelId(target._plinkDef.getId());
+ dao.setTimeout(30 * 1000); // default timeout is 30 seconds, can be chaged by client.
+ return createMyRoleMex(dao);
+ }
+
+ };
+
+ try {
+ if (isInMemory() || istyle == InvocationStyle.TRANSACTED || istyle == InvocationStyle.RELIABLE)
+ return createDao.call();
+ else
+ return _contexts.execTransaction(createDao);
+
+ } catch (BpelEngineException be) {
+ throw be;
+ } catch (Exception e) {
+ __log.error("Internal Error: could not create message exchange.", e);
+ throw new BpelEngineException("Internal Error", e);
}
+ } finally {
+ _hydrationLatch.release(1);
}
+ }
+
+ /**
+ * Find the partner-link-my-role that corresponds to the given service name.
+ * @param serviceName name of service
+ * @return corresponding {@link PartnerLinkMyRoleImpl}
+ */
+ private PartnerLinkMyRoleImpl getPartnerLinkForService(QName serviceName) {
+ PartnerLinkMyRoleImpl target = null;
+ for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
+ if (endpoint.serviceName.equals(serviceName))
+ target = getEndpointToMyRoleMap().get(endpoint);
+ }
+
+ return target;
+ }
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Thu Jul 26 10:28:21 2007
@@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -162,7 +163,7 @@
if (_exec == null)
_exec = Executors.newCachedThreadPool();
-
+
_contexts.scheduler.start();
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
@@ -431,6 +432,10 @@
}
}
+ public void setTransactionManager(TransactionManager txm) {
+ _contexts.txManager = txm;
+ }
+
public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
_dehydrationPolicy = dehydrationPolicy;
}
@@ -475,70 +480,13 @@
if (target == null)
throw new BpelEngineException("NoSuchService: " + targetService);
- Callable<String> createDao = new Callable<String>() {
-
- public String call() throws Exception {
- MessageExchangeDAO dao = target.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
- dao.setInvocationStyle(istyle.toString());
- dao.setCorrelationId(clientKey);
- dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
- dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
- dao.setCallee(targetService);
- dao.setStatus(Status.NEW.toString());
- dao.setOperation(operation);
- return dao.getMessageExchangeId();
- }
-
- };
-
- MyRoleMessageExchangeImpl mex;
- String mexId;
- switch (istyle) {
- case ASYNC:
- try {
- mexId = _contexts.execTransaction(createDao);
- } catch (Exception e) {
- __log.error("Internal Error: could not execute isolated transaction.", e);
- throw new BpelEngineException("Internal Error", e);
- }
- mex = new AsyncMyRoleMessageExchangeImpl(target, mexId);
- break;
- case BLOCKING:
- try {
- mexId = _contexts.execTransaction(createDao);
- } catch (Exception e) {
- __log.error("Internal Error: could not execute isolated transaction.", e);
- throw new BpelEngineException("Internal Error", e);
- }
- mex = new BlockingMyRoleMessageExchangeImpl(target, mexId);
- break;
-
- case RELIABLE:
+ if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
assertTransaction();
- try {
- mexId = createDao.call();
- } catch (Exception e) {
- __log.error("Internal Error: could not execute DB calls.", e);
- throw new BpelEngineException("Internal Error", e);
- }
- mex = new ReliableMyRoleMessageExchangeImpl(target, mexId);
- break;
- case TRANSACTED:
- assertTransaction();
- try {
- mexId = createDao.call();
- } catch (Exception e) {
- __log.error("Internal Error: could not execute DB calls.", e);
- throw new BpelEngineException("Internal Error", e);
- }
- mex = new TransactedMyRoleMessageExchangeImpl(target, mexId);
- default:
- throw new Error("Internal Error: unknown InvocationStyle: " + istyle);
- }
-
- target.initMyRoleMex(mex);
-
- return mex;
+ else
+ assertNoTransaction();
+
+
+ return target.createNewMyRoleMex(istyle, targetService, operation, clientKey);
} finally {
_mngmtLock.readLock().unlock();
}
@@ -587,11 +535,10 @@
};
try {
- if (inmemdao != null)
+ if (inmemdao != null || _contexts.isTransacted()) // TODO: hmmmmm, catch-22, need to be able to infer if TRANSACTED/RELIABLE just from mex id ? here || istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
return loadMex.call();
-
- // TODO: should we not do this in the current thread if the mex is a transacted/reliable?
- return execIsolatedTransaction(loadMex).get();
+ else
+ return enqueueTransaction(loadMex).get();
} catch (ContextException e) {
throw new BpelEngineException(e);
} catch (Exception e) {
@@ -658,13 +605,8 @@
}
- <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws ContextException {
- return _exec.submit(new Callable<T>() {
- public T call() throws Exception {
-
- return _contexts.execTransaction(transaction);
- }
- });
+ <T> Future<T> enqueueTransaction(final Callable<T> transaction) throws ContextException {
+ return _exec.submit(new ServerCallable<T>(new TransactedCallable<T>(transaction)));
}
/**
@@ -673,7 +615,7 @@
*/
void scheduleRunnable(Runnable runnable) {
assertTransaction();
- _contexts.registerCommitSynchronizer(runnable);
+ _contexts.registerCommitSynchronizer(new ServerRunnable(runnable));
}
protected void assertTransaction() {
@@ -681,6 +623,11 @@
throw new BpelEngineException("Operation must be performed in a transaction!");
}
+ protected void assertNoTransaction() {
+ if (_contexts.isTransacted())
+ throw new BpelEngineException("Operation must be performed outside of a transaction!");
+ }
+
void fireEvent(BpelEvent event) {
// Note that the eventListeners list is a copy-on-write array, so need
// to mess with synchronization.
@@ -761,4 +708,68 @@
}
+
+
+ class ServerRunnable implements Runnable {
+ final Runnable _work;
+ ServerRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _mngmtLock.readLock().lock();
+ try {
+ _work.run();
+ } catch (Throwable ex) {
+ __log.fatal("Internal Error", ex);
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ }
+
+
+
+ class ServerCallable<T> implements Callable<T>{
+ final Callable<T> _work;
+ ServerCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call () throws Exception {
+ _mngmtLock.readLock().lock();
+ try {
+ return _work.call();
+ } finally {
+ _mngmtLock.readLock().unlock();
+ }
+ }
+
+ }
+
+ class TransactedCallable<T> implements Callable<T> {
+ Callable<T> _work;
+
+ TransactedCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call() throws Exception {
+ return _contexts.execTransaction(_work);
+ }
+ }
+
+
+ class TransactedRunnable implements Runnable {
+ Runnable _work;
+
+ TransactedRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _contexts.execTransaction(_work);
+ }
+ }
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Thu Jul 26 10:28:21 2007
@@ -49,24 +49,26 @@
import java.util.Set;
/**
- * Class providing functions used to support debugging funtionality
- * in the BPEL engine. This class serves as the underlying
- * implementation of the {@link BpelManagementFacade} interface, and
- * the various MBean interfaces.
- *
+ * Class providing functions used to support debugging funtionality in the BPEL engine. This class serves as the underlying
+ * implementation of the {@link BpelManagementFacade} interface, and the various MBean interfaces.
+ *
* @todo Need to revisit the whole stepping/suspend/resume mechanism.
*/
class DebuggerSupport {
private static final Log __log = LogFactory.getLog(DebuggerSupport.class);
+
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
static final Breakpoint[] EMPTY_BP = new Breakpoint[0];
private boolean _enabled = true;
+
private Breakpoint[] _globalBreakPoints = EMPTY_BP;
+
private final Set<Long> _step = new HashSet<Long>();
- private final Map<Long, Breakpoint[]>_instanceBreakPoints = new HashMap<Long, Breakpoint[]>();
+
+ private final Map<Long, Breakpoint[]> _instanceBreakPoints = new HashMap<Long, Breakpoint[]>();
/** BPEL process database */
private BpelProcessDatabase _db;
@@ -76,42 +78,40 @@
/**
* Constructor.
- * @param db BPEL process database
+ *
+ * @param db
+ * BPEL process database
*/
DebuggerSupport(BpelProcess process) {
_process = process;
- _db = new BpelProcessDatabase(_process._contexts,
- _process._pid);
+ _db = new BpelProcessDatabase(_process._contexts, _process._pid);
}
- void enable(boolean enabled){
+ void enable(boolean enabled) {
_enabled = enabled;
}
- Breakpoint[] getGlobalBreakpoints(){
+ Breakpoint[] getGlobalBreakpoints() {
return _globalBreakPoints;
}
- Breakpoint[] getBreakpoints(Long pid){
+ Breakpoint[] getBreakpoints(Long pid) {
Breakpoint[] arr = _instanceBreakPoints.get(pid);
- return (arr == null)
- ? EMPTY_BP
- : arr;
+ return (arr == null) ? EMPTY_BP : arr;
}
- void addGlobalBreakpoint(Breakpoint breakpoint){
+ void addGlobalBreakpoint(Breakpoint breakpoint) {
Collection<Breakpoint> c = ArrayUtils.makeCollection(ArrayList.class, _globalBreakPoints);
c.add(breakpoint);
_globalBreakPoints = c.toArray(new Breakpoint[c.size()]);
}
- void addBreakpoint(Long pid, Breakpoint breakpoint){
+ void addBreakpoint(Long pid, Breakpoint breakpoint) {
Breakpoint[] bpArr = _instanceBreakPoints.get(pid);
- if(bpArr == null) {
- bpArr = new Breakpoint[]{breakpoint};
- }
- else{
+ if (bpArr == null) {
+ bpArr = new Breakpoint[] { breakpoint };
+ } else {
Collection<Breakpoint> c = ArrayUtils.makeCollection(ArrayList.class, bpArr);
c.add(breakpoint);
bpArr = c.toArray(new Breakpoint[c.size()]);
@@ -119,22 +119,21 @@
_instanceBreakPoints.put(pid, bpArr);
}
- void removeGlobalBreakpoint(Breakpoint breakpoint){
+ void removeGlobalBreakpoint(Breakpoint breakpoint) {
Collection<Breakpoint> c = ArrayUtils.makeCollection(ArrayList.class, _globalBreakPoints);
c.remove(breakpoint);
_globalBreakPoints = c.toArray(new Breakpoint[c.size()]);
}
- void removeBreakpoint(Long pid, Breakpoint breakpoint){
+ void removeBreakpoint(Long pid, Breakpoint breakpoint) {
Breakpoint[] bpArr = _instanceBreakPoints.get(pid);
- if(bpArr != null){
+ if (bpArr != null) {
Collection<Breakpoint> c = ArrayUtils.makeCollection(ArrayList.class, bpArr);
c.remove(breakpoint);
bpArr = c.toArray(new Breakpoint[c.size()]);
- if(bpArr.length == 0) {
+ if (bpArr.length == 0) {
_instanceBreakPoints.remove(pid);
- }
- else {
+ } else {
_instanceBreakPoints.put(pid, bpArr);
}
}
@@ -150,7 +149,7 @@
if (instance == null)
throw new InstanceNotFoundException("" + iid);
- if(ProcessState.STATE_SUSPENDED == instance.getState()){
+ if (ProcessState.STATE_SUSPENDED == instance.getState()) {
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_SUSPENDED);
@@ -193,22 +192,23 @@
/**
* Process BPEL events WRT debugging.
- * @param event BPEL event
+ *
+ * @param event
+ * BPEL event
*/
public void onEvent(BpelEvent event) {
- if(_enabled && (event instanceof ProcessInstanceEvent) &&
- // I have this excluded since we are recursing here when onEvent()
+ if (_enabled && (event instanceof ProcessInstanceEvent) &&
+ // I have this excluded since we are recursing here when onEvent()
// is called from DebugSupport codepath's which change state
!(event instanceof ProcessInstanceStateChangeEvent)) {
- final ProcessInstanceEvent evt = (ProcessInstanceEvent)event;
+ final ProcessInstanceEvent evt = (ProcessInstanceEvent) event;
//
// prevent leaking of memory
//
- if(evt instanceof ProcessCompletionEvent ||
- evt instanceof ProcessTerminationEvent) {
+ if (evt instanceof ProcessCompletionEvent || evt instanceof ProcessTerminationEvent) {
_step.remove(evt.getProcessInstanceId());
_instanceBreakPoints.remove(evt.getProcessInstanceId());
return;
@@ -218,19 +218,19 @@
if (!suspend) {
suspend = checkBreakPoints(evt, _globalBreakPoints);
}
- if (!suspend){
+ if (!suspend) {
Breakpoint[] bp = _instanceBreakPoints.get(evt.getProcessInstanceId());
- if(bp != null) {
+ if (bp != null) {
suspend = checkBreakPoints(evt, bp);
}
}
- if(suspend){
+ if (suspend) {
_step.remove(evt.getProcessInstanceId());
try {
ProcessDAO process = _db.getProcessDAO();
ProcessInstanceDAO instance = process.getInstance(evt.getProcessInstanceId());
- if(ProcessState.canExecute(instance.getState())){
+ if (ProcessState.canExecute(instance.getState())) {
// send event
ProcessInstanceStateChangeEvent changeEvent = new ProcessInstanceStateChangeEvent();
changeEvent.setOldState(instance.getState());
@@ -251,17 +251,15 @@
}
}
- private boolean checkStep(ProcessInstanceEvent event){
+ private boolean checkStep(ProcessInstanceEvent event) {
Long pid = event.getProcessInstanceId();
- return (_step.contains(pid)
- && (event instanceof ActivityExecStartEvent
- || event instanceof ScopeCompletionEvent));
+ return (_step.contains(pid) && (event instanceof ActivityExecStartEvent || event instanceof ScopeCompletionEvent));
}
- private boolean checkBreakPoints(ProcessInstanceEvent event, Breakpoint[] breakpoints){
+ private boolean checkBreakPoints(ProcessInstanceEvent event, Breakpoint[] breakpoints) {
boolean suspended = false;
- for(int i = 0; i < breakpoints.length; ++i){
- if (((BreakpointImpl)breakpoints[i]).checkBreak(event)){
+ for (int i = 0; i < breakpoints.length; ++i) {
+ if (((BreakpointImpl) breakpoints[i]).checkBreak(event)) {
suspended = true;
break;
}
@@ -279,7 +277,7 @@
if (instance == null)
throw new InstanceNotFoundException("" + iid);
- if(ProcessState.STATE_SUSPENDED == instance.getState()){
+ if (ProcessState.STATE_SUSPENDED == instance.getState()) {
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_SUSPENDED);
@@ -299,7 +297,6 @@
we.setIID(iid);
_process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-
return true;
}
return false;
@@ -310,7 +307,7 @@
throw infe;
} catch (Exception ex) {
__log.error("ProcessingEx", ex);
- throw new ProcessingException(ex.getMessage(),ex);
+ throw new ProcessingException(ex.getMessage(), ex);
}
return doit;
@@ -372,8 +369,7 @@
//
// TerminationEvent (peer of ProcessCompletionEvent)
//
- ProcessTerminationEvent terminationEvent =
- new ProcessTerminationEvent();
+ ProcessTerminationEvent terminationEvent = new ProcessTerminationEvent();
terminationEvent.setProcessInstanceId(iid);
terminationEvent.setProcessName(processName);
terminationEvent.setProcessId(processId);
@@ -391,7 +387,6 @@
__log.error("DbError", e);
throw new RuntimeException(e);
}
-
}
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MemBackedMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MemBackedMessageImpl.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MemBackedMessageImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MemBackedMessageImpl.java Thu Jul 26 10:28:21 2007
@@ -2,8 +2,6 @@
import javax.xml.namespace.QName;
-import org.apache.derby.impl.store.raw.data.SetReservedSpaceOperation;
-import org.apache.ode.bpel.iapi.BpelEngineException;
import org.w3c.dom.Element;
/**
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Thu Jul 26 10:28:21 2007
@@ -144,20 +144,18 @@
}
void load(MessageExchangeDAO dao) {
- if (dao.getMessageExchangeId().equals(_mexId))
+ if (!dao.getMessageExchangeId().equals(_mexId))
throw new IllegalArgumentException("MessageExchangeId mismatch!");
-
- if (_pattern == null)
- _pattern = MessageExchangePattern.valueOf(dao.getPattern());
- if (_opname == null)
- _opname = dao.getOperation();
+ _pattern = MessageExchangePattern.valueOf(dao.getPattern());
+ _opname = dao.getOperation();
+ _timeout = dao.getTimeout();
+
if (_fault == null)
_fault = dao.getFault();
if (_explanation == null)
_explanation = dao.getFaultExplanation();
if (_status == null)
_status = Status.valueOf(dao.getStatus());
- _timeout = dao.getTimeout();
}
public void save(MessageExchangeDAO dao) {
@@ -411,7 +409,7 @@
return action.call(getDAO());
} else {
try {
- return _process._server.execIsolatedTransaction(new Callable<T>() {
+ return _process._server.enqueueTransaction(new Callable<T>() {
public T call() throws Exception {
assertTransaction();
return action.call(getDAO());
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Thu Jul 26 10:28:21 2007
@@ -39,12 +39,9 @@
@Override
void load(MessageExchangeDAO dao) {
super.load(dao);
- if (_cstatus == null)
- _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
- if (_clientId == null)
- _clientId = dao.getCorrelationId();
- if (_callee == null)
- _callee = dao.getCallee();
+ _cstatus = CorrelationStatus.valueOf(dao.getCorrelationStatus());
+ _clientId = dao.getCorrelationId();
+ _callee = dao.getCallee();
}
@Override
Modified: incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Thu Jul 26 10:28:21 2007
@@ -58,20 +58,30 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-
class MockBpelServer {
- BpelServerImpl _server;
- ProcessStoreImpl _store;
- TransactionManager _txManager;
- Database _database;
- DataSource _dataSource;
- SchedulerWrapper _scheduler;
- BpelDAOConnectionFactory _daoCF;
- EndpointReferenceContext _eprContext;
- MessageExchangeContext _mexContext;
- BindingContext _bindContext;
- HashMap<String, QName> _activated = new HashMap<String,QName>();
+ BpelServerImpl _server;
+
+ ProcessStoreImpl _store;
+
+ TransactionManager _txManager;
+
+ Database _database;
+
+ DataSource _dataSource;
+
+ SchedulerWrapper _scheduler;
+
+ BpelDAOConnectionFactory _daoCF;
+
+ EndpointReferenceContext _eprContext;
+
+ MessageExchangeContext _mexContext;
+
+ BindingContext _bindContext;
+
+ HashMap<String, QName> _activated = new HashMap<String, QName>();
+
HashMap<String, EndpointReference> _endpoints = new HashMap<String, EndpointReference>();
public MockBpelServer() {
@@ -86,7 +96,8 @@
_server.setDaoConnectionFactory(_daoCF);
if (_scheduler == null)
throw new RuntimeException("No scheduler");
- _store = new ProcessStoreImpl(_dataSource,"jpa", true);
+ _store = new ProcessStoreImpl(_dataSource, "jpa", true);
+ _server.setTransactionManager(_txManager);
_server.setScheduler(_scheduler);
_server.setEndpointReferenceContext(createEndpointReferenceContext());
_server.setMessageExchangeContext(createMessageExchangeContext());
@@ -102,53 +113,34 @@
public Collection<QName> deploy(File deploymentUnitDirectory) {
Collection<QName> pids = _store.deploy(deploymentUnitDirectory);
- for (QName pid: pids)
+ for (QName pid : pids)
_server.register(_store.getProcessConfiguration(pid));
return pids;
}
public void invoke(QName serviceName, String opName, Element body) throws Exception {
- try {
- String messageId = new GUID().toString();
- MyRoleMessageExchange mex;
+ String messageId = new GUID().toString();
+ MyRoleMessageExchange mex;
+
+ mex = _server.createMessageExchange(InvocationStyle.BLOCKING, serviceName, opName, "" + messageId);
+ if (mex.getOperation() == null)
+ throw new Exception("Did not find operation " + opName + " on service " + serviceName);
+ Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
+ Element wrapper = body.getOwnerDocument().createElementNS("", "main");
+ wrapper.appendChild(body);
+ Element message = body.getOwnerDocument().createElementNS("", "message");
+ message.appendChild(wrapper);
+ request.setMessage(message);
+ mex.setRequest(request);
+ mex.invokeBlocking();
+ mex.complete();
- _txManager.begin();
- mex = _server.createMessageExchange(InvocationStyle.ASYNC,serviceName, opName, "" + messageId);
- if (mex.getOperation() == null)
- throw new Exception("Did not find operation " + opName + " on service " + serviceName);
- Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
- Element wrapper = body.getOwnerDocument().createElementNS("", "main");
- wrapper.appendChild(body);
- Element message = body.getOwnerDocument().createElementNS("", "message");
- message.appendChild(wrapper);
- request.setMessage(message);
- mex.setRequest(request);
- mex.invokeAsync();
- mex.complete();
- _txManager.commit();
- } catch (Exception except) {
- _txManager.rollback();
- throw except;
- }
}
public TransactionManager getTransactionManager() {
return _txManager;
}
- public void waitForBlocking() {
- try {
- long delay = 1000;
- while (true) {
- // Be warned: ugly hack and not safe for slow CPUs.
- long cutoff = System.currentTimeMillis() - delay;
- if (_scheduler._nextSchedule < cutoff)
- break;
- Thread.sleep(delay);
- }
- } catch (InterruptedException except) { }
- }
-
public void shutdown() throws Exception {
_server.stop();
_scheduler.stop();
@@ -204,44 +196,57 @@
_eprContext = new EndpointReferenceContext() {
public EndpointReference resolveEndpointReference(Element element) {
String service = DOMUtils.getChildCharacterData(element);
- return (EndpointReference)_endpoints.get(service);
+ return (EndpointReference) _endpoints.get(service);
+ }
+
+ public EndpointReference convertEndpoint(QName qName, Element element) {
+ return null;
}
- public EndpointReference convertEndpoint(QName qName, Element element) { return null; }
};
return _eprContext;
}
protected MessageExchangeContext createMessageExchangeContext() {
- _mexContext = new MessageExchangeContext() {
- public void invokePartner(PartnerRoleMessageExchange mex) { }
- public void onAsyncReply(MyRoleMessageExchange myRoleMex) { }
+ _mexContext = new MessageExchangeContext() {
+ public void invokePartner(PartnerRoleMessageExchange mex) {
+ }
+
+ public void onAsyncReply(MyRoleMessageExchange myRoleMex) {
+ }
+
public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
+
}
+
public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
// TODO Auto-generated method stub
return null;
}
+
public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
+
}
+
public void invokePartnerBlocking(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
+
}
+
public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
+
}
+
public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
// TODO Auto-generated method stub
-
+
}
+
public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
// TODO Auto-generated method stub
-
+
}
};
return _mexContext;
@@ -252,12 +257,14 @@
public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
final Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
- EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+ EndpointReference.SERVICE_REF_QNAME.getLocalPart());
serviceRef.appendChild(doc.createTextNode(myRoleEndpoint.serviceName.toString()));
doc.appendChild(serviceRef);
_activated.put(myRoleEndpoint.toString(), processId);
return new EndpointReference() {
- public Document toXML() { return doc; }
+ public Document toXML() {
+ return doc;
+ }
};
}
@@ -266,12 +273,12 @@
}
public PartnerRoleChannel createPartnerRoleChannel(QName processId, PortType portType,
- final Endpoint initialPartnerEndpoint) {
+ final Endpoint initialPartnerEndpoint) {
final EndpointReference epr = new EndpointReference() {
public Document toXML() {
Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
- EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+ EndpointReference.SERVICE_REF_QNAME.getLocalPart());
serviceRef.appendChild(doc.createTextNode(initialPartnerEndpoint.serviceName.toString()));
doc.appendChild(serviceRef);
return doc;
@@ -279,28 +286,32 @@
};
_endpoints.put(initialPartnerEndpoint.serviceName.toString(), epr);
return new PartnerRoleChannel() {
- public EndpointReference getInitialEndpointReference() { return epr; }
- public void close() { };
+ public EndpointReference getInitialEndpointReference() {
+ return epr;
+ }
+
+ public void close() {
+ };
};
}
};
return _bindContext;
}
-
private class SchedulerWrapper implements Scheduler {
MockScheduler _scheduler;
- long _nextSchedule;
+
+ long _nextSchedule;
SchedulerWrapper(BpelServerImpl server, TransactionManager txManager, DataSource dataSource) {
_scheduler = new MockScheduler(_txManager);
_scheduler.setJobProcessor(server);
}
- public String schedulePersistedJob(Map<String,Object>jobDetail,Date when) throws ContextException {
+ public String schedulePersistedJob(Map<String, Object> jobDetail, Date when) throws ContextException {
String jobId = _scheduler.schedulePersistedJob(jobDetail, when);
- _nextSchedule = when == null ? System.currentTimeMillis() : when.getTime();
+ _nextSchedule = when == null ? System.currentTimeMillis() : when.getTime();
return jobId;
}
@@ -308,11 +319,17 @@
_scheduler.cancelJob(jobId);
}
+ public void start() {
+ _scheduler.start();
+ }
- public void start() { _scheduler.start(); }
- public void stop() { _scheduler.stop(); }
- public void shutdown() { _scheduler.shutdown(); }
+ public void stop() {
+ _scheduler.stop();
+ }
+ public void shutdown() {
+ _scheduler.shutdown();
+ }
public void setJobProcessor(JobProcessor processor) throws ContextException {
_scheduler.setJobProcessor(processor);
@@ -321,7 +338,7 @@
public void jobCompleted(String jobId) {
_scheduler.jobCompleted(jobId);
-
+
}
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java?view=diff&rev=559893&r1=559892&r2=559893
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java Thu Jul 26 10:28:21 2007
@@ -116,6 +116,5 @@
_processQName = new QName(NAMESPACE, process);
_server.invoke(_processQName, "instantiate",
DOMUtils.newDocument().createElementNS(NAMESPACE, "tns:RequestElement"));
- _server.waitForBlocking();
}
}