You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/02/06 23:17:57 UTC
svn commit: r504332 [2/2] - in /incubator/ode/trunk:
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/
bpel-obj/src/main/java/org/apache/ode/bpel/o/ bpel...
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Feb 6 14:17:56 2007
@@ -24,21 +24,19 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.explang.ConfigurationException;
import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.o.OExpressionLanguage;
import org.apache.ode.bpel.o.OProcess;
-import org.apache.ode.bpel.o.Serializer;
-import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.utils.msg.MessageBundle;
import javax.xml.namespace.QName;
-import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -46,7 +44,7 @@
* <p>
* The BPEL server implementation.
* </p>
- *
+ *
* <p>
* This implementation is intended to be thread safe. The key concurrency
* mechanism is a "management" read/write lock that synchronizes all management
@@ -55,55 +53,55 @@
* to the lock is scoped to the method, while read access is scoped to a
* transaction.
* </p>
- *
+ *
* @author Maciej Szefler <mszefler at gmail dot com>
- * @author mriou <mriou at apache dot org>
+ * @author Matthieu Riou <mriou at apache dot org>
*/
-public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
+public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor, ProcessLifecycleCallback {
private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
-
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
+ private final List<BpelProcess> _runningProcesses =
+ Collections.synchronizedList(new ArrayList<BpelProcess>());
+
+ private State _state = State.SHUTDOWN;
+ private Contexts _contexts = new Contexts();
+ private DehydrationPolicy _dehydrationPolicy;
+ BpelEngineImpl _engine;
+ BpelDatabase _db;
+
+ /**
+ * Management lock for synchronizing management operations and preventing
+ * processing (transactions) from occuring while management operations are
+ * in progress.
+ */
+ private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
+
static {
// TODO Clean this up and factorize engine configuration
try {
- String processMaxAge = System.getenv("ODE_DEF_MAX_AGE");
+ String processMaxAge = System.getProperty("ode.process.maxage");
if (processMaxAge != null && processMaxAge.length() > 0) {
__processMaxAge = Long.valueOf(processMaxAge);
__log.info("Process definition max age adjusted. Max age = " + __processMaxAge + "ms.");
}
} catch (Throwable t) {
if (__log.isDebugEnabled()) {
- __log.debug("Could not parse ODE_DEF_MAX_AGE environment variable.", t);
+ __log.debug("Could not parse ode.process.maxage environment variable.", t);
} else {
- __log.info("Could not parse ODE_DEF_MAX_AGE environment variable; reaping disabled.");
+ __log.info("Could not parse ode.process.maxage environment variable; reaping disabled.");
}
}
}
- /**
- * Management lock for synchronizing management operations and preventing
- * processing (transactions) from occuring while management operations are
- * in progress.
- */
- private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
-
private enum State {
SHUTDOWN, INIT, RUNNING
}
- private State _state = State.SHUTDOWN;
-
- private Contexts _contexts = new Contexts();
-
- BpelEngineImpl _engine;
-
- BpelDatabase _db;
-
public void start() {
_mngmtLock.writeLock().lock();
try {
@@ -117,6 +115,7 @@
_contexts.scheduler.start();
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
+ if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
} finally {
_mngmtLock.writeLock().unlock();
}
@@ -125,7 +124,6 @@
/**
* Register a global listener to receive {@link BpelEvent}s froom all
* processes.
- *
* @param listener
*/
public void registerBpelEventListener(BpelEventListener listener) {
@@ -136,7 +134,6 @@
/**
* Unregister a global listener from receive {@link BpelEvent}s from all
* processes.
- *
* @param listener
*/
public void unregisterBpelEventListener(BpelEventListener listener) {
@@ -163,37 +160,6 @@
}
}
- public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
- _contexts.mexContext = mexContext;
- }
-
- public void setScheduler(Scheduler scheduler) throws BpelEngineException {
- _contexts.scheduler = scheduler;
- }
-
- public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
- _contexts.eprContext = eprContext;
- }
-
- /**
- * Set the DAO connection factory. The DAO is used by the BPEL engine to
- * persist information about active processes.
- *
- * @param daoCF
- * {@link BpelDAOConnectionFactory} implementation.
- */
- public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
- _contexts.dao = daoCF;
- }
-
- public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
- _contexts.inMemDao = daoCF;
- }
-
- public void setBindingContext(BindingContext bc) {
- _contexts.bindingContext = bc;
- }
-
public void init() throws BpelEngineException {
_mngmtLock.writeLock().lock();
try {
@@ -226,7 +192,6 @@
}
public BpelEngine getEngine() {
-
boolean registered = false;
_mngmtLock.readLock().lock();
try {
@@ -237,7 +202,6 @@
public void beforeCompletion() {
_mngmtLock.readLock().unlock();
}
-
});
registered = true;
} finally {
@@ -255,16 +219,6 @@
__log.debug("register: " + conf.getProcessId());
- // Load the compiled process.
- OProcess compiledProcess;
- try {
- compiledProcess = deserializeCompiledProcess(conf.getCBPInputStream());
- } catch (Exception e) {
- String errmsg = __msgs.msgProcessLoadError(conf.getProcessId());
- __log.error(errmsg, e);
- throw new BpelEngineException(errmsg, e);
- }
-
// Ok, IO out of the way, we will mod the server state, so need to get a
// lock.
try {
@@ -283,24 +237,10 @@
__log.debug("Registering process " + conf.getProcessId() + " with server.");
- // Create an expression language registry for this process
- ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry();
- for (OExpressionLanguage elang : compiledProcess.expressionLanguages) {
- try {
- elangRegistry.registerRuntime(elang);
- } catch (ConfigurationException e) {
- String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
- __log.error(msg, e);
- throw new BpelEngineException(msg, e);
- }
- }
-
- // Create the processDAO if necessary.
- createProcessDAO(conf.getProcessId(), conf.getVersion(), compiledProcess);
-
- BpelProcess process = new BpelProcess(conf, compiledProcess, null, elangRegistry);
+ BpelProcess process = new BpelProcess(conf, null, this);
_engine.registerProcess(process);
+ _runningProcesses.add(process);
__log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
} finally {
@@ -320,8 +260,11 @@
}
try {
- if (_engine != null)
+ BpelProcess p = null;
+ if (_engine != null) {
_engine.unregisterProcess(pid);
+ _runningProcesses.remove(p);
+ }
__log.info(__msgs.msgProcessUnregistered(pid));
@@ -337,76 +280,38 @@
* If necessary, create an object in the data store to represent the
* process. We'll re-use an existing object if it already exists and matches
* the GUID.
- *
- * @param pid
- * @param oprocess
*/
- private void createProcessDAO(final QName pid, final long version, final OProcess oprocess) {
+ private void bounceProcessDAO(BpelDAOConnection conn, final QName pid,
+ final long version, final OProcess oprocess) {
__log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
try {
- boolean create = _db.exec(new BpelDatabase.Callable<Boolean>() {
- public Boolean run(BpelDAOConnection conn) throws Exception {
- ProcessDAO old = conn.getProcess(pid);
- if (old == null) {
- // we couldnt find the process, clearly we need to
- // create it
- return true;
- }
-
- __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
-
- if (oprocess.guid == null) {
- // No guid, old version assume its good
- return false;
- }
-
+ boolean create = true;
+ ProcessDAO old = conn.getProcess(pid);
+ if (old != null) {
+ __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
+ if (oprocess.guid == null) {
+ // No guid, old version assume its good
+ create = false;
+ } else {
if (old.getGuid().equals(oprocess.guid)) {
// Guids match, no need to create
- return false;
+ create = false;
+ } else {
+ // GUIDS dont match, delete and create new
+ String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match "
+ + oprocess.guid + "; replacing.";
+ __log.debug(errmsg);
+ old.delete();
}
-
- // GUIDS dont match, delete and create new
- String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid
- + "; replacing.";
- __log.debug(errmsg);
- old.delete();
-
- return true;
-
}
- });
-
- if (create)
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) throws Exception {
- ProcessDAO old = conn.getProcess(pid);
- if (old != null) {
- __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
-
- if (oprocess.guid != null) {
- if (!old.getGuid().equals(oprocess.guid)) {
- // TODO: Versioning will need to handle this
- // differently.
- String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match "
- + oprocess.guid + "; replacing.";
- __log.warn(errmsg);
- old.delete();
- } else {
- return null;
- }
- } else {
- // no guid, consider compatible.
- return null;
- }
- }
+ }
- ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, version);
- for (String correlator : oprocess.getCorrelators()) {
- newDao.addCorrelator(correlator);
- }
- return null;
- }
- });
+ if (create) {
+ ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, version);
+ for (String correlator : oprocess.getCorrelators()) {
+ newDao.addCorrelator(correlator);
+ }
+ }
} catch (BpelEngineException ex) {
throw ex;
} catch (Exception dce) {
@@ -417,9 +322,7 @@
/**
* Register a global message exchange interceptor.
- *
- * @param interceptor
- * message-exchange interceptor
+ * @param interceptor message-exchange interceptor
*/
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -428,9 +331,7 @@
/**
* Unregister a global message exchange interceptor.
- *
- * @param interceptor
- * message-exchange interceptor
+ * @param interceptor message-exchange interceptor
*/
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -443,32 +344,13 @@
private final boolean checkState(State i, State j) {
if (_state == i)
return true;
-
if (_state == j)
return false;
-
throw new IllegalStateException("Unexpected state: " + i);
-
- }
-
- /**
- * De-serialize the compiled process representation from a stream.
- *
- * @param is
- * input stream
- * @return process information from configuration database
- */
- static OProcess deserializeCompiledProcess(InputStream is) throws Exception {
-
- OProcess compiledProcess;
- Serializer ofh = new Serializer(is);
- compiledProcess = ofh.readOProcess();
- return compiledProcess;
}
/* TODO: We need to have a method of cleaning up old deployment data. */
private boolean deleteProcessDAO(final QName pid) {
-
try {
// Delete it from the database.
return _db.exec(new BpelDatabase.Callable<Boolean>() {
@@ -481,51 +363,92 @@
return false;
}
});
-
} catch (Exception ex) {
String errmsg = "DbError";
__log.error(errmsg, ex);
throw new BpelEngineException(errmsg, ex);
}
-
}
public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
- getEngine().onScheduledJob(jobInfo);
+ getEngine().onScheduledJob(jobInfo);
+ }
+
+ /**
+  * Callback that receives a process together with its (re)loaded definition; presumably
+  * invoked by {@link BpelProcess} through the lifecycle callback passed to its
+  * constructor -- confirm against BpelProcess.
+  *
+  * @param process the process whose DAO is refreshed and which is tracked as running
+  */
+ public void hydrated(final BpelProcess process) {
+     // Recreating the process DAO if the definition has changed, shouldn't do anything
+     // except after a redeploy
+     bounceProcessDAO(_contexts.dao.getConnection(), process.getPID(),
+             process._pconf.getVersion(), process.getOProcess());
+
+     // register() already puts the process into _runningProcesses, so adding it again
+     // unconditionally would leave duplicate entries: the reaper would count the process
+     // twice and dehydrate it more than once. The check-then-add must be atomic, hence
+     // the explicit lock on the synchronized list.
+     synchronized (_runningProcesses) {
+         if (!_runningProcesses.contains(process)) {
+             _runningProcesses.add(process);
+         }
+     }
+ }
+
+ /**
+  * Background task that wakes up every ten seconds, asks the configured
+  * {@link DehydrationPolicy} which running processes should go, and dehydrates them
+  * while holding the management write lock. The loop ends when the thread is interrupted.
+  */
+ private class ProcessDefReaper implements Runnable {
+     public void run() {
+         __log.debug("Starting process definition reaper thread.");
+         long pollingTime = 10000;
+         try {
+             while (true) {
+                 Thread.sleep(pollingTime);
+                 _mngmtLock.writeLock().lock();
+                 try {
+                     __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
+                     // Copying the running process list to avoid synchronization
+                     // problems and a potential mess if a policy modifies the list
+                     List<BpelProcess> runningProcesses;
+                     synchronized (_runningProcesses) {
+                         runningProcesses = new ArrayList<BpelProcess>(_runningProcesses);
+                     }
+                     // And the happy winners are...
+                     List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(runningProcesses);
+                     // Bye bye
+                     for (BpelProcess process : ripped) {
+                         __log.debug("Dehydrating process " + process.getPID());
+                         process.dehydrate();
+                         _runningProcesses.remove(process);
+                     }
+                 } finally {
+                     _mngmtLock.writeLock().unlock();
+                 }
+             }
+         } catch (InterruptedException e) {
+             __log.info(e);
+             // Catching InterruptedException clears the interrupt flag; set it again so
+             // whoever owns this thread can still observe that it was interrupted.
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+  * Sets the policy that decides which running processes get dehydrated. The reaper
+  * thread is only launched by {@code start()} when a policy has been set.
+  *
+  * @param dehydrationPolicy policy to consult
+  */
+ public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
+     this._dehydrationPolicy = dehydrationPolicy;
+ }
+
+ /**
+  * Stores the message exchange context in the server's shared contexts.
+  *
+  * @param mexContext message exchange context implementation
+  */
+ public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
+     this._contexts.mexContext = mexContext;
+ }
+
+ /**
+  * Stores the scheduler in the server's shared contexts; {@code start()} starts it.
+  *
+  * @param scheduler scheduler implementation
+  */
+ public void setScheduler(Scheduler scheduler) throws BpelEngineException {
+     this._contexts.scheduler = scheduler;
+ }
+
+ /**
+  * Stores the endpoint reference context in the server's shared contexts.
+  *
+  * @param eprContext endpoint reference context implementation
+  */
+ public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
+     this._contexts.eprContext = eprContext;
+ }
+
+ /**
+  * Set the DAO connection factory. The DAO is used by the BPEL engine to
+  * persist information about active processes.
+  *
+  * @param daoCF {@link BpelDAOConnectionFactory} implementation.
+  */
+ public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
+     this._contexts.dao = daoCF;
+ }
+
+ /**
+  * Stores the connection factory kept in the contexts' {@code inMemDao} slot.
+  *
+  * @param daoCF {@link BpelDAOConnectionFactory} implementation.
+  */
+ public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
+     this._contexts.inMemDao = daoCF;
+ }
+
+ /**
+  * Stores the binding context in the server's shared contexts.
+  *
+  * @param bc binding context implementation
+  */
+ public void setBindingContext(BindingContext bc) {
+     this._contexts.bindingContext = bc;
+ }
- //
- // I've moved this code out of BpelEngineImpl, it should be here not there.
- // -Maciej 12/22/06
- // /**
- //
- // */
- // private class ProcessDefReaper implements Runnable {
- // public void run() {
- // try {
- // while (true) {
- // Thread.sleep(10000);
- // _mngmtLock.writeLock().lock();
- // try {
- // for (BpelProcess process : _activeProcesses.values()) {
- // Long lru;
- // synchronized(_processesLRU) {
- // lru = _processesLRU.get(process._pid);
- // }
- // if (lru != null && process._oprocess != null
- // && System.currentTimeMillis() - lru > _processMaxAge) {
- // process._oprocess = null;
- // __log.debug("Process definition reaper cleaning " + process._pid);
- // }
- // Thread.sleep(10);
- // }
- // } finally {
- // _mngmtLock.writeLock().unlock();
- // }
- // }
- // } catch (InterruptedException e) {
- // __log.info(e);
- // }
- // }
- // }
}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,56 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Dehydration policy combining an age limit with a count limit: processes that have not
+ * been used for longer than the maximum age are always elected, and if more than the
+ * maximum count would remain, the least recently used of the remainder are elected too.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public class CountLRUDehydrationPolicy implements DehydrationPolicy {
+
+    /** Maximum age of a process before it is quiesced */
+    private long _processMaxAge = 20 * 60 * 1000;
+    /** Maximum process count before oldest ones get quiesced */
+    private int _processMaxCount = 1000;
+
+    /**
+     * Elects processes for dehydration. The supplied list is left untouched.
+     *
+     * @param runningProcesses all running (currently hydrated) processes
+     * @return processes elected for dehydration, age-based picks first
+     */
+    public List<BpelProcess> markForDehydration(List<BpelProcess> runningProcesses) {
+        ArrayList<BpelProcess> ripped = new ArrayList<BpelProcess>();
+
+        if (_processMaxAge > 0) {
+            // The oldies have to go first
+            long now = System.currentTimeMillis();
+            for (BpelProcess process : runningProcesses) {
+                if (now - process.getLastUsed() > _processMaxAge) {
+                    ripped.add(process);
+                }
+            }
+        }
+
+        // If it's not enough, other ones must be put to the axe
+        if (runningProcesses.size() - ripped.size() > _processMaxCount) {
+            // Work on a private copy so the caller's list is neither shrunk nor reordered.
+            List<BpelProcess> survivors = new ArrayList<BpelProcess>(runningProcesses);
+            survivors.removeAll(ripped);
+            // Most recently used first, so that everything past the allowed count is the
+            // least recently used tail. (Sorting oldest-first here would evict the
+            // freshest processes instead of the stale ones.)
+            Collections.sort(survivors, new Comparator<BpelProcess>() {
+                public int compare(BpelProcess p1, BpelProcess p2) {
+                    if (p1.getLastUsed() > p2.getLastUsed()) return -1;
+                    if (p1.getLastUsed() < p2.getLastUsed()) return 1;
+                    return 0;
+                }
+            });
+            // A negative limit is treated as "keep none" rather than indexing below zero.
+            for (int m = Math.max(0, _processMaxCount); m < survivors.size(); m++) {
+                ripped.add(survivors.get(m));
+            }
+        }
+
+        return ripped;
+    }
+
+    /**
+     * @param processMaxAge maximum idle time in milliseconds; a value of zero or less
+     *        disables the age check
+     */
+    public void setProcessMaxAge(long processMaxAge) {
+        _processMaxAge = processMaxAge;
+    }
+
+    /**
+     * @param processMaxCount maximum number of processes left hydrated after the age check
+     */
+    public void setProcessMaxCount(int processMaxCount) {
+        _processMaxCount = processMaxCount;
+    }
+}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,20 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.List;
+
+/**
+ * Strategy for choosing which running processes should be dehydrated, for example
+ * because too many are hydrated at once or because some have been idle for a while.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public interface DehydrationPolicy {
+
+    /**
+     * Inspects the currently running processes and elects the ones that should be
+     * dehydrated under this policy's configuration. The engine dehydrates every
+     * process that is returned.
+     *
+     * @param runningProcesses all running (currently hydrated) processes
+     * @return processes elected for dehydration
+     */
+    List<BpelProcess> markForDehydration(List<BpelProcess> runningProcesses);
+}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,304 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.common.InvalidMessageException;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageRouteDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.evt.CorrelationMatchEvent;
+import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
+import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.intercept.InterceptorInvoker;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.OScope;
+import org.apache.ode.bpel.runtime.InvalidProcessException;
+import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.utils.ArrayUtils;
+import org.apache.ode.utils.ObjectPrinter;
+import org.apache.ode.utils.msg.MessageBundle;
+import org.w3c.dom.Element;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+import java.util.*;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
+ private static final Log __log = LogFactory.getLog(BpelProcess.class);
+ private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
+
+ /** The local endpoint for this "myrole". */
+ public Endpoint _endpoint;
+
+ /**
+  * Creates the "myrole" side of a partner link.
+  *
+  * @param process process owning the partner link (handed to the superclass)
+  * @param plink partner link definition (handed to the superclass)
+  * @param endpoint local endpoint for this "myrole"
+  */
+ PartnerLinkMyRoleImpl(BpelProcess process, OPartnerLink plink, Endpoint endpoint) {
+     super(process, plink);
+     this._endpoint = endpoint;
+ }
+
+ /**
+  * Renders this role as {@code {PartnerLinkRole-<name>.<myRoleName> on <endpoint>}}.
+  */
+ public String toString() {
+     return new StringBuilder("{PartnerLinkRole-")
+             .append(_plinkDef.name)
+             .append('.')
+             .append(_plinkDef.myRoleName)
+             .append(" on ")
+             .append(_endpoint)
+             .append('}')
+             .toString();
+ }
+
+ /**
+ * Called when an input message has been received: resolves the operation, computes the correlation keys and then either creates a new instance, routes to a waiting instance, or fails/queues the exchange.
+ * Throws InvalidProcessException when a new instance would have to be created but the process is RETIRED.
+ * @param mex
+ * exchange to which the message is related; its failure, correlation status and status are updated in place
+ */
+ public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
+ if (__log.isTraceEnabled()) {
+ __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
+ "messageExchange", mex }));
+ }
+
+ Operation operation = getMyRoleOperation(mex.getOperationName());
+ if (operation == null) {
+ __log.error(__msgs.msgUnknownOperation(mex.getOperationName(), _plinkDef.myRolePortType.getQName()));
+ mex.setFailure(MessageExchange.FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
+ return;
+ }
+
+ mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
+ mex.setPortOp(_plinkDef.myRolePortType, operation);
+ mex.setPattern(operation.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
+ : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE);
+
+ // Is this a /possible/ createInstance Operation?
+ boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
+
+ // Now the tricks begin: when a message arrives we have to see if
+ // there is anyone waiting for it.
+ // Get the correlator for this partner link and operation: it is a
+ // persisted communication-reduction data structure that supports
+ // matching incoming messages against correlation keys (see the
+ // findRoute / enqueueMessage calls below).
+ String correlatorId = BpelProcess.genCorrelatorId(_plinkDef, operation.getName());
+
+ CorrelatorDAO correlator = _process.getProcessDAO().getCorrelator(correlatorId);
+
+ CorrelationKey[] keys;
+ MessageRouteDAO messageRoute = null;
+
+ // We need to compute the correlation keys. Based on the operation we
+ // can infer which correlation keys to compute: this is merely the set
+ // consisting of one correlation key for each correlation set
+ // that is ever referenced in a <receive>/<onMessage> on this
+ // partnerlink/operation.
+ try {
+ keys = computeCorrelationKeys(mex);
+ } catch (InvalidMessageException ime) {
+ // We'd like to do a graceful exit here; there is no sense in rolling back due to
+ // a message format problem.
+ __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
+ mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
+ return;
+ }
+
+ String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+ String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
+ + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
+ + " partnerSessionId=" + partnerSessionId);
+ }
+
+ CorrelationKey matchedKey = null;
+
+ // Try to find a route for one of our keys; the first key that yields a route wins.
+ for (CorrelationKey key : keys) {
+ messageRoute = correlator.findRoute(key);
+ if (messageRoute != null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+ }
+ matchedKey = key;
+ break;
+ }
+ }
+
+ // TODO - ODE-58
+
+ // If no luck, and this operation qualifies for create-instance
+ // treatment, then create a new process
+ // instance.
+ if (messageRoute == null && isCreateInstnace) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+ }
+ ProcessDAO processDAO = _process.getProcessDAO();
+
+ if (_process._pconf.getState() == ProcessState.RETIRED) {
+ throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+ }
+
+ if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+ __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+ return;
+ }
+
+ ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
+
+ BpelRuntimeContextImpl instance = _process
+ .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
+
+ // send process instance event
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace,
+ _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+ evt.setPortType(mex.getPortType().getQName());
+ evt.setOperation(operation.getName());
+ evt.setMexId(mex.getMessageExchangeId());
+ _process._debugger.onEvent(evt);
+ _process.saveEvent(evt, newInstance);
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE);
+ mex.getDAO().setInstance(newInstance);
+
+ instance.execute();
+ } else if (messageRoute != null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
+ + messageRoute.getTargetInstance().getInstanceId());
+ }
+
+ ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
+
+ // Reload process instance for DAO.
+ BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
+ instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
+
+ // Kill the route so some new message does not get routed to
+ // same process instance.
+ correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
+
+ // send process instance event
+ CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace,
+ _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(),
+ instanceDao.getInstanceId(), matchedKey);
+ evt.setPortType(mex.getPortType().getQName());
+ evt.setOperation(operation.getName());
+ evt.setMexId(mex.getMessageExchangeId());
+
+ _process._debugger.onEvent(evt);
+ // store event
+ _process.saveEvent(evt, instanceDao);
+
+ // EXPERIMENTAL -- LOCK
+ // instanceDao.lock();
+
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+ mex.getDAO().setInstance(messageRoute.getTargetInstance());
+ instance.execute();
+ } else {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+ }
+
+ if (!mex.isAsynchronous()) {
+ mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+
+ } else {
+ // send event
+ CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+ .getOperation().getName(), mex.getMessageExchangeId(), keys);
+
+ evt.setProcessId(_process.getProcessDAO().getProcessId());
+ evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+ _process._debugger.onEvent(evt);
+
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+
+ // No match, means we add message exchange to the queue.
+ correlator.enqueueMessage(mex.getDAO(), keys);
+
+ }
+ }
+
+ // Now we have to update our message exchange status. If the <reply>
+ // was not hit during the invocation,
+ // then we will still be in the "REQUEST" phase, which means
+ // that either this was a one-way
+ // or a two-way that needs to deliver the reply asynchronously.
+ if (mex.getStatus() == MessageExchange.Status.REQUEST) {
+ mex.setStatus(MessageExchange.Status.ASYNC);
+ }
+ }
+
+ /**
+  * Looks up a "myrole" operation on the partner link definition by name.
+  *
+  * @param operationName name of the operation
+  * @return whatever the partner link definition returns for that name; the caller
+  *         treats {@code null} as an unknown operation
+  */
+ @SuppressWarnings("unchecked")
+ private Operation getMyRoleOperation(String operationName) {
+     return _plinkDef.getMyRoleOperation(operationName);
+ }
+
+ /**
+  * Computes every correlation key that applies to the request of the given exchange:
+  * one key per correlation set declared for the operation, plus a key built from the
+  * "myrole" session id when the exchange carries one.
+  *
+  * @param mex exchange whose operation and request message are inspected
+  * @return the computed keys, correlation-set keys first
+  */
+ private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) {
+     Operation op = mex.getOperation();
+     Element requestMsg = mex.getRequest().getMessage();
+     javax.wsdl.Message inputDescription = op.getInput().getMessage();
+     List<CorrelationKey> result = new ArrayList<CorrelationKey>();
+
+     Set<OScope.CorrelationSet> correlationSets = _plinkDef.getCorrelationSetsForOperation(op);
+     for (OScope.CorrelationSet correlationSet : correlationSets) {
+         result.add(computeCorrelationKey(correlationSet,
+                 _process.getOProcess().messageTypes.get(inputDescription.getQName()), requestMsg));
+     }
+
+     // Let's create a key based on the sessionId
+     String sessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+     if (sessionId != null) {
+         result.add(new CorrelationKey(-1, new String[] { sessionId }));
+     }
+
+     return result.toArray(new CorrelationKey[result.size()]);
+ }
+
+ /**
+  * Builds the correlation key of a single correlation set by extracting the value of
+  * each of the set's properties from the message.
+  *
+  * @param cset correlation set whose properties make up the key
+  * @param messagetype message type used to find each property's alias
+  * @param msg message the property values are extracted from
+  * @return key carrying the set's id and the extracted values, in property order
+  * @throws IllegalArgumentException if a property has no alias for the message type
+  * @throws InvalidMessageException if a property value cannot be extracted
+  */
+ private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
+         Element msg) {
+     String[] propertyValues = new String[cset.properties.size()];
+
+     Iterator propertyIter = cset.properties.iterator();
+     int slot = 0;
+     while (propertyIter.hasNext()) {
+         OProcess.OProperty property = (OProcess.OProperty) propertyIter.next();
+         OProcess.OPropertyAlias alias = property.getAlias(messagetype);
+
+         if (alias == null) {
+             // TODO: Throw a real exception! And catch this at compile
+             // time.
+             throw new IllegalArgumentException("No alias matching property '" + property.name
+                     + "' with message type '" + messagetype + "'");
+         }
+
+         try {
+             propertyValues[slot] = _process.extractProperty(msg, alias, msg.toString());
+         } catch (FaultException fe) {
+             String emsg = __msgs.msgPropertyAliasDerefFailedOnMessage(alias.getDescription(), fe.getMessage());
+             __log.error(emsg, fe);
+             throw new InvalidMessageException(emsg, fe);
+         }
+         slot++;
+     }
+
+     return new CorrelationKey(cset.getId(), propertyValues);
+ }
+
+}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,46 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+/**
+ * Partner-role side of a partner link: holds the initial partner endpoint and
+ * the channel for that role, and feeds partner responses back into the
+ * process instance associated with the message exchange.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
+ // Log under this class's own category rather than BpelProcess's.
+ static final Log __log = LogFactory.getLog(PartnerLinkPartnerRoleImpl.class);
+
+ Endpoint _initialPartner;
+
+ public PartnerRoleChannel _channel;
+
+ PartnerLinkPartnerRoleImpl(BpelProcess process, OPartnerLink plink, Endpoint initialPartner) {
+ super(process, plink);
+ _initialPartner = initialPartner;
+ }
+
+ /**
+ * Builds a runtime context for the instance referenced by the exchange's DAO,
+ * passes it the partner's response and then executes it.
+ *
+ * @param messageExchange the partner-role exchange carrying the response
+ */
+ public void processPartnerResponse(PartnerRoleMessageExchangeImpl messageExchange) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("Processing partner's response for partnerLink: " + messageExchange);
+ }
+
+ BpelRuntimeContextImpl processInstance =
+ _process.createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
+ processInstance.invocationResponse(messageExchange);
+ processInstance.execute();
+ }
+
+}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,31 @@
+package org.apache.ode.bpel.engine;
+
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.o.OPartnerLink;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+abstract class PartnerLinkRoleImpl {
+ protected OPartnerLink _plinkDef;
+ protected EndpointReference _initialEPR;
+ protected BpelProcess _process;
+
+ PartnerLinkRoleImpl(BpelProcess process, OPartnerLink plink) {
+ _plinkDef = plink;
+ _process = process;
+ }
+ String getPartnerLinkName() {
+ return _plinkDef.name;
+ }
+ /**
+ * Get the initial value of this role's EPR. This value is obtained from
+ * the integration layer when the process is enabled on the server.
+ *
+ * @return initial epr
+ */
+ EndpointReference getInitialEPR() {
+ return _initialEPR;
+ }
+
+}
Added: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java?view=auto&rev=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java (added)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java Tue Feb 6 14:17:56 2007
@@ -0,0 +1,9 @@
+package org.apache.ode.bpel.engine;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public interface ProcessLifecycleCallback {
+
+ void hydrated(BpelProcess process);
+}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java Tue Feb 6 14:17:56 2007
@@ -18,39 +18,66 @@
*/
package org.apache.ode.bpel.engine;
-import org.apache.ode.jacob.soup.ReplacementMap;
import org.apache.ode.bpel.o.OBase;
import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.jacob.soup.ReplacementMap;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
/**
* A JACOB {@link ReplacementMap} implementation that eliminates unnecessary serialization
* of the (constant) compiled process model.
*/
class ReplacementMapImpl implements ReplacementMap {
- private OProcess _oprocess;
+ private OProcess _oprocess;
- ReplacementMapImpl(OProcess oprocess) {
- _oprocess = oprocess;
- }
-
- public boolean isReplacement(Object obj) {
- return obj instanceof BpelProcess.OBaseReplacementImpl;
- }
-
- public Object getOriginal(Object replacement) throws IllegalArgumentException {
- if (!(replacement instanceof BpelProcess.OBaseReplacementImpl))
- throw new IllegalArgumentException("Not OBaseReplacementObject!");
- return _oprocess.getChild(((BpelProcess.OBaseReplacementImpl)replacement)._id);
- }
-
- public Object getReplacement(Object original) throws IllegalArgumentException {
- if (!(original instanceof OBase))
- throw new IllegalArgumentException("Not OBase!");
- return new BpelProcess.OBaseReplacementImpl(((OBase)original).getId());
- }
-
- public boolean isReplaceable(Object obj) {
- return obj instanceof OBase;
- }
+ ReplacementMapImpl(OProcess oprocess) {
+ _oprocess = oprocess;
+ }
+
+ public boolean isReplacement(Object obj) {
+ return obj instanceof OBaseReplacementImpl;
+ }
+
+ public Object getOriginal(Object replacement) throws IllegalArgumentException {
+ if (!(replacement instanceof OBaseReplacementImpl))
+ throw new IllegalArgumentException("Not OBaseReplacementObject!");
+ return _oprocess.getChild(((OBaseReplacementImpl)replacement)._id);
+ }
+
+ public Object getReplacement(Object original) throws IllegalArgumentException {
+ if (!(original instanceof OBase))
+ throw new IllegalArgumentException("Not OBase!");
+ return new OBaseReplacementImpl(((OBase)original).getId());
+ }
+
+ public boolean isReplaceable(Object obj) {
+ return obj instanceof OBase;
+ }
+
+ /**
+ * Replacement object for serialization of the {@link OBase} (compiled
+ * BPEL) objects in the JACOB VPU.
+ */
+ public static final class OBaseReplacementImpl implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ int _id;
+
+ public OBaseReplacementImpl() {
+ }
+ public OBaseReplacementImpl(int id) {
+ _id = id;
+ }
+ public void readExternal(ObjectInput in) throws IOException {
+ _id = in.readInt();
+ }
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(_id);
+ }
+ }
}
Modified: incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Feb 6 14:17:56 2007
@@ -18,35 +18,9 @@
*/
package org.apache.ode.bpel.runtime;
-import java.io.File;
-import java.sql.DriverManager;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl;
import org.apache.ode.store.ProcessStoreImpl;
@@ -58,6 +32,20 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.sql.DriverManager;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
class MockBpelServer {
@@ -242,7 +230,7 @@
protected BindingContext createBindingContext() {
_bindContext = new BindingContext() {
- public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint, PortType portType) {
+ public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
final Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
EndpointReference.SERVICE_REF_QNAME.getLocalPart());
Modified: incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java (original)
+++ incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java Tue Feb 6 14:17:56 2007
@@ -37,9 +37,6 @@
*/
package org.apache.ode.test;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
import org.apache.ode.bpel.iapi.BindingContext;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
@@ -48,11 +45,13 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
public class BindingContextImpl implements BindingContext {
- public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint,
- PortType portType) {
+ public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
final Document doc = DOMUtils.newDocument();
Element serviceref = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
EndpointReference.SERVICE_REF_QNAME.getLocalPart());
Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java Tue Feb 6 14:17:56 2007
@@ -42,10 +42,9 @@
_ode = ode;
}
- public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint,
- PortType portType) {
+ public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
try {
- return _ode.activateEndpoint(processId, myRoleEndpoint, portType.getQName());
+ return _ode.activateEndpoint(processId, myRoleEndpoint);
} catch (Exception ex) {
throw new ContextException("Could not activate endpoint " + myRoleEndpoint + " for process " + processId,
ex);
Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java Tue Feb 6 14:17:56 2007
@@ -24,7 +24,6 @@
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.util.WSDLFlattener;
@@ -160,7 +159,7 @@
return (TransactionManager) getContext().getTransactionManager();
}
- public MyEndpointReference activateEndpoint(QName pid, Endpoint endpoint, QName portType) throws Exception {
+ public MyEndpointReference activateEndpoint(QName pid, Endpoint endpoint) throws Exception {
if (__log.isDebugEnabled()) {
__log.debug("Activate endpoint: " + endpoint);
}
@@ -168,8 +167,8 @@
OdeService service = new OdeService(this, endpoint);
try {
ProcessConf pc = _store.getProcessConfiguration(pid);
- Definition def = pc.getDefinitionForPortType(portType);
- def = new WSDLFlattener(def).getDefinition(portType);
+ Definition def = pc.getDefinitionForService(endpoint.serviceName);
+ def = new WSDLFlattener(def).getDefinition(endpoint);
Document doc = WSDLFactory.newInstance().newWSDLWriter().getDocument(def);
addEndpointDoc(endpoint.serviceName, doc);
} catch (Exception e) {
Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java Tue Feb 6 14:17:56 2007
@@ -16,32 +16,18 @@
*/
package org.apache.ode.jbi.util;
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import com.ibm.wsdl.extensions.schema.SchemaImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
-import javax.wsdl.Definition;
-import javax.wsdl.Fault;
-import javax.wsdl.Import;
-import javax.wsdl.Input;
-import javax.wsdl.Message;
-import javax.wsdl.Operation;
-import javax.wsdl.Output;
-import javax.wsdl.Part;
-import javax.wsdl.PortType;
-import javax.wsdl.Types;
+import javax.wsdl.*;
import javax.wsdl.extensions.ExtensibilityElement;
import javax.wsdl.extensions.schema.SchemaImport;
import javax.wsdl.factory.WSDLFactory;
import javax.xml.namespace.QName;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.wsdl.extensions.schema.SchemaImpl;
+import java.net.URI;
+import java.util.*;
public class WSDLFlattener {
@@ -49,7 +35,7 @@
private Definition definition;
private SchemaCollection schemas;
- private Map flattened;
+ private Map<Endpoint,Definition> flattened;
private boolean initialized;
public WSDLFlattener() {
@@ -82,15 +68,16 @@
/**
* Retrieve a flattened definition for a given port type name.
- * @param portType the port type to create a flat definition for
+ * @param endpoint the endpoint whose port type to create a flat definition for
* @return a flat definition for the port type
* @throws Exception if an error occurs
*/
- public Definition getDefinition(QName portType) throws Exception {
- Definition def = (Definition) flattened.get(portType);
+ public Definition getDefinition(Endpoint endpoint) throws Exception {
+ Definition def = (Definition) flattened.get(endpoint);
if (def == null) {
- def = flattenDefinition(portType);
- flattened.put(portType, def);
+ PortType pt = definition.getService(endpoint.serviceName).getPort(endpoint.portName).getBinding().getPortType();
+ def = flattenDefinition(pt.getQName());
+ flattened.put(endpoint, def);
}
return def;
}