You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/09/10 21:07:05 UTC
svn commit: r693931 [9/12] - in /ode/trunk:
bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/v1/
bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/v1/xpath10/
bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/v1/xpath20/
runtimes/...
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/REPLY.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/REPLY.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/REPLY.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/REPLY.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.rtrep.v1.OReply;
+import org.apache.ode.bpel.rtrep.v1.OScope;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+class REPLY extends ACTIVITY {
+ private static final long serialVersionUID = 3040651951885161304L;
+ private static final Log __log = LogFactory.getLog(REPLY.class);
+
+ REPLY(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ }
+
+ public void run() {
+ final OReply oreply = (OReply)_self.o;
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("<reply> partnerLink=" + oreply.partnerLink + ", operation=" + oreply.operation);
+ }
+ FaultData fault = null;
+
+ // TODO: Check for fault without message.
+
+ try {
+ sendVariableReadEvent(_scopeFrame.resolve(oreply.variable));
+ Node msg = fetchVariableData(_scopeFrame.resolve(oreply.variable), false);
+
+ assert msg instanceof Element;
+
+ for (Iterator<OScope.CorrelationSet> i = oreply.initCorrelations.iterator(); i.hasNext(); ) {
+ OScope.CorrelationSet cset = i.next();
+ initializeCorrelation(_scopeFrame.resolve(cset),
+ _scopeFrame.resolve(oreply.variable));
+ }
+
+ // send reply
+ getBpelRuntime()
+ .reply(_scopeFrame.resolve(oreply.partnerLink), oreply.operation.getName(),
+ oreply.messageExchangeId, (Element)msg,
+ (oreply.fault != null) ? oreply.fault : null);
+ } catch (FaultException e) {
+ __log.error(e);
+ fault = createFault(e.getQName(), oreply);
+ }
+
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ }
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RETHROW.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RETHROW.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RETHROW.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RETHROW.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+import org.apache.ode.bpel.rapi.InvalidProcessException;
+
+
+/**
+ * FaultActivity
+ */
+class RETHROW extends ACTIVITY {
+ private static final long serialVersionUID = -6433171659586530126L;
+ private static final Log __log = LogFactory.getLog(RETHROW.class);
+
+ RETHROW(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ }
+
+ public void run() {
+ // find the faultData in the scope stack
+ FaultData fault = _scopeFrame.getFault();
+ if(fault == null){
+ String msg = "Attempting to execute 'rethrow' activity with no visible fault in scope.";
+ __log.error(msg);
+ throw new InvalidProcessException(msg);
+ }
+
+ _self.parent.completed(fault,CompensationHandler.emptySet());
+ }
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ReplacementMapImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ReplacementMapImpl.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ReplacementMapImpl.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ReplacementMapImpl.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.bpel.rtrep.v1.OBase;
+import org.apache.ode.bpel.rtrep.v1.OProcess;
+import org.apache.ode.jacob.soup.ReplacementMap;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * A JACOB {@link ReplacementMap} implementation that eliminates unnecessary serialization
+ * of the (constant) compiled process model.
+ */
+class ReplacementMapImpl implements ReplacementMap {
+ private OProcess _oprocess;
+
+ ReplacementMapImpl(OProcess oprocess) {
+ _oprocess = oprocess;
+ }
+
+ public boolean isReplacement(Object obj) {
+ return obj instanceof OBaseReplacementImpl;
+ }
+
+ public Object getOriginal(Object replacement) throws IllegalArgumentException {
+ if (!(replacement instanceof OBaseReplacementImpl))
+ throw new IllegalArgumentException("Not OBaseReplacementObject!");
+ return _oprocess.getChild(((OBaseReplacementImpl)replacement)._id);
+ }
+
+ public Object getReplacement(Object original) throws IllegalArgumentException {
+ if (!(original instanceof OBase))
+ throw new IllegalArgumentException("Not OBase!");
+ return new OBaseReplacementImpl(((OBase)original).getId());
+ }
+
+ public boolean isReplaceable(Object obj) {
+ return obj instanceof OBase;
+ }
+
+ /**
+ * Replacement object for serializtation of the {@link OBase} (compiled
+ * BPEL) objects in the JACOB VPU.
+ */
+ public static final class OBaseReplacementImpl implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ int _id;
+
+ public OBaseReplacementImpl() {
+ }
+ public OBaseReplacementImpl(int id) {
+ _id = id;
+ }
+ public void readExternal(ObjectInput in) throws IOException {
+ _id = in.readInt();
+ }
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(_id);
+ }
+ }
+
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ResumeWork.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ResumeWork.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ResumeWork.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ResumeWork.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.utils.ObjectPrinter;
+
+import java.io.Serializable;
+
+/**
+ * Resumes execution of a bpel process
+ *
+ */
+public class ResumeWork implements Serializable{
+ static final long serialVersionUID = 1;
+
+ private Long _pid;
+
+ public ResumeWork(Long pid) {
+ _pid = pid;
+ }
+
+ public Long getPID(){
+ return _pid;
+ }
+
+ public String toString() {
+ return ObjectPrinter.toString(this, new Object[]{"pid", _pid});
+ }
+
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeImpl.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeImpl.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeImpl.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,161 @@
+package org.apache.ode.bpel.rtrep.v1;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.rapi.OdeRuntime;
+import org.apache.ode.bpel.rapi.ProcessModel;
+import org.apache.ode.bpel.rapi.PropertyAliasModel;
+import org.apache.ode.bpel.rapi.OdeRTInstance;
+import org.apache.ode.bpel.rtrep.common.ConfigurationException;
+import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
+import org.apache.ode.jacob.soup.ReplacementMap;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.ode.utils.msg.MessageBundle;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+import javax.xml.namespace.QName;
+
+public class RuntimeImpl implements OdeRuntime {
+ private static final Log __log = LogFactory.getLog(RuntimeImpl.class);
+ private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
+
+ ProcessConf _pconf;
+ OProcess _oprocess;
+ Set<String> _mustUnderstandExtensions;
+ ReplacementMap _replacementMap;
+ ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
+ Map<String, ExtensionBundleRuntime> _extensionRegistry;
+
+ /**
+ * Initialize according to process configuration.
+ */
+ public void init(ProcessConf pconf) {
+ _pconf = pconf;
+ try {
+ _oprocess = deserializeCompiledProcess(_pconf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = "Error reloading compiled process " + _pconf.getProcessId() + "; the file appears to be corrupted.";
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg, e);
+ }
+
+ _replacementMap = new ReplacementMapImpl(_oprocess);
+
+ // Create an expression language registry for this process
+ ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry();
+ for (OExpressionLanguage elang : _oprocess.expressionLanguages) {
+ try {
+ elangRegistry.registerRuntime(elang);
+ } catch (ConfigurationException e) {
+ String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
+ __log.error(msg, e);
+ throw new BpelEngineException(msg, e);
+ }
+ }
+ _expLangRuntimeRegistry = elangRegistry;
+
+ // Checking for registered extension bundles, throw an exception when
+ // a "mustUnderstand" extension is not available
+ _mustUnderstandExtensions = new HashSet<String>();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ode.bpel.engine.rapi.OdeRuntime#createInstance(org.apache.ode.bpel.engine.rapi.OdeRTInstanceContext)
+ */
+ public OdeRTInstance newInstance(Object state) {
+ return new RuntimeInstanceImpl(this, (ExecutionQueueImpl) state);
+ }
+
+ public ReplacementMap getReplacementMap(QName processName) {
+ if (_pconf.getProcessId().equals(processName))
+ return new ReplacementMapImpl(_oprocess);
+ else throw new UnsupportedOperationException("Implement the creation of replacement map for other version.");
+ }
+
+ public ProcessModel getModel() {
+ return _oprocess;
+ }
+
+ /**
+ * Extract the value of a BPEL property from a BPEL messsage variable.
+ *
+ * @param msgData message variable data
+ * @param aliasModel alias to apply
+ * @param target description of the data (for error logging only)
+ * @return value of the property
+ * @throws org.apache.ode.bpel.common.FaultException
+ */
+ public String extractProperty(Element msgData, PropertyAliasModel aliasModel, String target) throws FaultException {
+ OProcess.OPropertyAlias alias = (OProcess.OPropertyAlias) aliasModel;
+ PropertyAliasEvaluationContext ectx = new PropertyAliasEvaluationContext(msgData, alias);
+ Node lValue = ectx.getRootNode();
+
+ if (alias.location != null)
+ lValue = _expLangRuntimeRegistry.evaluateNode(alias.location, ectx);
+
+ if (lValue == null) {
+ String errmsg = __msgs.msgPropertyAliasReturnedNullSet(alias.getDescription(), target);
+ if (__log.isErrorEnabled()) __log.error(errmsg);
+ throw new FaultException(_oprocess.constants.qnSelectionFailure, errmsg);
+ }
+
+ if (lValue.getNodeType() == Node.ELEMENT_NODE) {
+ // This is a bit hokey, we concatenate all the children's values; we really should be
+ // checking to make sure that we are only dealing with text and attribute nodes.
+ StringBuffer val = new StringBuffer();
+ NodeList nl = lValue.getChildNodes();
+ for (int i = 0; i < nl.getLength(); ++i) {
+ Node n = nl.item(i);
+ val.append(n.getNodeValue());
+ }
+ return val.toString();
+ } else if (lValue.getNodeType() == Node.TEXT_NODE) {
+ return ((Text) lValue).getWholeText();
+ } else
+ return null;
+ }
+
+ public void clear() {
+ _pconf = null;
+ _oprocess = null;
+ _mustUnderstandExtensions = null;
+ _replacementMap = null;
+ _expLangRuntimeRegistry = null;
+ _extensionRegistry = null;
+ }
+
+ public void setExtensionRegistry(Map<String, ExtensionBundleRuntime> extensionRegistry) {
+ _extensionRegistry = extensionRegistry;
+ }
+
+ /**
+ * Read an {@link org.apache.ode.bpel.rtrep.v1.OProcess} representation from a stream.
+ * @param is input stream
+ * @return deserialized process representation
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ */
+ private OProcess deserializeCompiledProcess(InputStream is) throws IOException, ClassNotFoundException {
+ OProcess compiledProcess;
+ Serializer ofh = new Serializer(is);
+ compiledProcess = ofh.readOProcess();
+ return compiledProcess;
+ }
+
+
+}
\ No newline at end of file
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/RuntimeInstanceImpl.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,729 @@
+package org.apache.ode.bpel.rtrep.v1;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Date;
+import java.net.URI;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.evt.ProcessInstanceStartedEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.rapi.*;
+import org.apache.ode.bpel.extension.ExtensionOperation;
+import org.apache.ode.bpel.evar.ExternalVariableModuleException;
+import org.apache.ode.bpel.evar.IncompleteKeyException;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
+import org.apache.ode.bpel.rtrep.v1.channels.*;
+import org.apache.ode.jacob.JacobRunnable;
+import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
+import org.apache.ode.jacob.vpu.JacobVPU;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.msg.MessageBundle;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+/**
+ * Implementation of {@link org.apache.ode.bpel.rtrep.v1.OdeInternalInstance} for the "modern" runtime. This class also serves as a repository for kitchen sink type
+ * methods that the activities all use. A lot of these methods are simply deferals to similar methods on
+ * {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext}; however here these methods use representation-specific classes (e.g.
+ * {@link org.apache.ode.bpel.rtrep.v1.OPartnerLink) while the {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext} methods use only the general (non-representation specific) interfaces
+ * (e.g. {@link org.apache.ode.bpel.rapi.PartnerLink}.
+ *
+ * @author Maciej Szefler
+ *
+ */
+public class RuntimeInstanceImpl implements OdeInternalInstance, OdeRTInstance {
+ private static final Log __log = LogFactory.getLog(RuntimeInstanceImpl.class);
+
+ private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
+
+ private OdeRTInstanceContext _brc;
+
+ /** JACOB VPU */
+ protected JacobVPU _vpu;
+
+ /** JACOB ExecutionQueue (state) */
+ protected ExecutionQueueImpl _soup;
+
+ private RuntimeImpl _runtime;
+
+ public RuntimeInstanceImpl(RuntimeImpl runtime, ExecutionQueueImpl soup) {
+ _runtime = runtime;
+ _vpu = new JacobVPU();
+ _vpu.registerExtension(OdeRTInstanceContext.class, this);
+ if (soup == null) {
+ _soup = new ExecutionQueueImpl(getClass().getClassLoader());
+ _soup.setGlobalData(new OutstandingRequestManager());
+ } else {
+ _soup = soup;
+ }
+
+ _soup.setReplacementMap(_runtime._replacementMap);
+ _vpu.setContext(_soup);
+ }
+
+ public ProcessModel getProcessModel() {
+ return _runtime._oprocess;
+ }
+
+ public boolean isCorrelationInitialized(CorrelationSetInstance correlationSet) {
+ return _brc.isCorrelationInitialized(correlationSet);
+ }
+
+ public boolean isVariableInitialized(VariableInstance var) {
+ return _brc.isVariableInitialized(var);
+ }
+
+ public boolean isPartnerRoleEndpointInitialized(PartnerLinkInstance pLink) {
+ return _brc.isPartnerRoleEndpointInitialized(pLink);
+ }
+
+ public void completedFault(FaultData faultData) {
+ cleanupOutstandingMyRoleExchanges(faultData);
+ _brc.completedFault(faultData);
+ }
+
+ public void completedOk() {
+ cleanupOutstandingMyRoleExchanges(null);
+ _brc.completedOk();
+ }
+
+ public Long createScopeInstance(Long parentScopeId, String name, int modelId) {
+ return _brc.createScopeInstance(parentScopeId, name, modelId);
+ }
+
+ public void initializePartnerLinks(Long parentScopeId, Collection<OPartnerLink> partnerLinks) {
+ _brc.initializePartnerLinks(parentScopeId, partnerLinks);
+
+ }
+
+ public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
+ throws FaultException {
+
+ final String pickResponseChannelStr = pickResponseChannel.export();
+
+ int conflict = getORM().findConflict(selectors);
+ if (conflict != -1)
+ throw new FaultException(_runtime._oprocess.constants.qnConflictingReceive, selectors[conflict].toString());
+
+ getORM().register(pickResponseChannelStr, selectors);
+
+ _brc.select(pickResponseChannelStr, timeout, selectors);
+ }
+
+ public CorrelationKey readCorrelation(CorrelationSetInstance cset) {
+ return _brc.readCorrelation(cset);
+ }
+
+ public Node fetchVariableData(VariableInstance variable, ScopeFrame scopeFrame, boolean forWriting) throws FaultException {
+ if (variable.declaration.extVar != null) {
+ // Note, that when using external variables, the database will not contain the value of the
+ // variable, instead we need to go the external variable subsystems.
+ Element reference = (Element) _brc.fetchVariableData(scopeFrame.resolve(variable.declaration.extVar.related), false);
+ try {
+ Node ret = _brc.readExtVar(variable, reference);
+ if (ret == null) {
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedVariable,
+ "The external variable \"" + variable.declaration.name + "\" has not been initialized.");
+ }
+ return ret;
+ } catch (IncompleteKeyException ike) {
+ // This indicates that the external variable needed to be written do, put has not been.
+ __log.error("External variable could not be read due to incomplete key; the following key " +
+ "components were missing: " + ike.getMissing());
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedVariable,
+ "The extenral variable \"" + variable.declaration.name + "\" has not been properly initialized;" +
+ "the following key compoenents were missing:" + ike.getMissing());
+ } catch (ExternalVariableModuleException e) {
+ throw new BpelEngineException(e);
+ }
+ } else /* not external */ {
+ Node data = _brc.fetchVariableData(variable, forWriting);
+ if (data == null) {
+ // Special case of messageType variables with no part
+ if (variable.declaration.type instanceof OMessageVarType) {
+ OMessageVarType msgType = (OMessageVarType) variable.declaration.type;
+ if (msgType.parts.size() == 0) {
+ Document doc = DOMUtils.newDocument();
+ Element root = doc.createElement("message");
+ doc.appendChild(root);
+ return root;
+ }
+ }
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedVariable,
+ "The variable " + variable.declaration.name + " isn't properly initialized.");
+ }
+ return data;
+ }
+ }
+
+ public Node fetchVariableData(VariableInstance var, ScopeFrame scopeFrame,
+ OMessageVarType.Part part, boolean forWriting) throws FaultException {
+ Node val = fetchVariableData(var, scopeFrame, forWriting);
+ if (part != null) return getPartData((Element) val, part);
+ return val;
+ }
+
+ public void writeCorrelation(CorrelationSetInstance cset, CorrelationKey ckeyVal) {
+ OScope.CorrelationSet csetdef = cset.declaration;
+ QName[] propNames = new QName[csetdef.properties.size()];
+ for (int m = 0; m < csetdef.properties.size(); m++) {
+ OProcess.OProperty oProperty = csetdef.properties.get(m);
+ propNames[m] = oProperty.name;
+ }
+
+ _brc.writeCorrelation(cset, propNames, ckeyVal);
+
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#sendEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent)}.
+ *
+ * @param event
+ */
+ public void sendEvent(ScopeEvent event) {
+ _brc.sendEvent(event);
+ }
+
+ public void unregisterActivityForRecovery(ActivityRecoveryChannel recoveryChannel) {
+ _brc.unregisterActivityForRecovery(recoveryChannel.export());
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.RecoveryContext#registerActivityForRecovery(String, long, String, java.util.Date, org.w3c.dom.Element, String[], int)}.
+ */
+ public void registerActivityForRecovery(ActivityRecoveryChannel recoveryChannel, long id, String reason, Date dateTime,
+ Element details, String[] actions, int retryCount) {
+ _brc.registerActivityForRecovery(recoveryChannel.export(), id, reason, dateTime, details, actions, retryCount);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#registerTimer(String, java.util.Date)} .
+ */
+ public void registerTimer(TimerResponseChannel timerChannel, Date future) {
+ _brc.registerTimer(timerChannel.export(), future);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.VariableContext#readVariableProperty(org.apache.ode.bpel.rapi.Variable, javax.xml.namespace.QName)}.
+ */
+ public String readProperty(VariableInstance variable, OProcess.OProperty property) throws FaultException {
+ try {
+ return _brc.readVariableProperty(variable, property.name);
+ } catch (UninitializedVariableException e) {
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedVariable);
+ }
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#genId() }.
+ */
+ public long genId() {
+ return _brc.genId();
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#initializeVariable(org.apache.ode.bpel.rapi.Variable, org.w3c.dom.Node)} then write properties.
+ */
+ public Node initializeVariable(VariableInstance var, ScopeFrame scopeFrame, Node val) throws ExternalVariableModuleException {
+ try {
+ if (var.declaration.extVar != null) /* external variable */ {
+ if (__log.isDebugEnabled())
+ __log.debug("Initialize external variable: name=" + var.declaration + " value="+DOMUtils.domToString(val));
+ Node reference = null;
+ try {
+ reference = fetchVariableData(var, scopeFrame, true);
+ } catch (FaultException fe) {
+ // In this context this is not necessarily a problem, since the assignment may re-init the related var
+ }
+ if (reference != null) val = _brc.readExtVar(var, reference);
+ return val;
+ } else /* normal variable */ {
+ if (__log.isDebugEnabled()) __log.debug("Initialize variable: name=" + var.declaration +
+ " value=" + DOMUtils.domToString(val));
+ return _brc.initializeVariable(var, val);
+ }
+ } finally {
+ writeProperties(var, val);
+ }
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.VariableContext#fetchMyRoleEndpointReferenceData(org.apache.ode.bpel.rapi.PartnerLink)}.
+ */
+ public Node fetchMyRoleEndpointReferenceData(PartnerLinkInstance link) {
+ return _brc.fetchMyRoleEndpointReferenceData(link);
+ }
+
+ public Node fetchPartnerRoleEndpointReferenceData(PartnerLinkInstance link) throws FaultException {
+ Element epr = _brc.fetchPartnerRoleEndpointReferenceData(link);
+ if (epr == null) {
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedPartnerRole);
+ }
+
+ return epr;
+
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#convertEndpointReference(org.w3c.dom.Element, org.w3c.dom.Node) }.
+ */
+ public Node convertEndpointReference(Element epr, Node lvaluePtr) {
+ return _brc.convertEndpointReference(epr, lvaluePtr);
+ }
+
+ public void commitChanges(VariableInstance var, ScopeFrame scopeFrame, Node value) throws ExternalVariableModuleException {
+ if (var.declaration.extVar != null) /* external variable */ {
+ __log.debug("Write external variable: name="+var.declaration + " value="+DOMUtils.domToString(value));
+ VariableInstance related = scopeFrame.resolve(var.declaration.extVar.related);
+ Node reference = null;
+ try {
+ reference = fetchVariableData(var, scopeFrame, true);
+ } catch (FaultException fe) {
+ // In this context this is not necessarily a problem, since the assignment may re-init the related var
+ }
+ VariableContext.ValueReferencePair vrp = _brc.writeExtVar(var, reference, value);
+ commitChanges(related, scopeFrame, vrp.reference);
+ } else /* normal variable */ {
+ __log.debug("Write variable: name="+var.declaration + " value="+DOMUtils.domToString(value));
+ _brc.commitChanges(var, value);
+ }
+ writeProperties(var, value);
+ }
+
+
+ /**
+ * Proxy to {@link BpelRuntimeContext# }.
+ */
+ public void writeEndpointReference(PartnerLinkInstance plval, Element element) {
+ _brc.writeEndpointReference(plval, element);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#createScopeInstance(Long, String, int)}.
+ */
+ public Long createScopeInstance(Long scopeInstanceId, OScope scopedef) {
+ return _brc.createScopeInstance(scopeInstanceId, scopedef.name, scopedef.getId());
+ }
+
+ /**
+ * Proxy to {@link BpelRuntimeContext# }.
+ */
+ public String fetchMySessionId(PartnerLinkInstance linkInstance) {
+ return _brc.fetchMySessionId(linkInstance);
+ }
+
+ /**
+ * Proxy to {@link BpelRuntimeContext# }.
+ */
+ public void cancel(PickResponseChannel responseChannel) {
+ final String id = responseChannel.export();
+ _brc.cancelSelect(id);
+
+ getORM().cancel(id);
+
+ _vpu.inject(new JacobRunnable() {
+ private static final long serialVersionUID = 6157913683737696396L;
+
+ public void run() {
+ TimerResponseChannel responseChannel = importChannel(id, TimerResponseChannel.class);
+ responseChannel.onCancel();
+ }
+ });
+ }
+
+ /**
+ * Proxy to {@link BpelRuntimeContext# }.
+ */
+ public Element getMyRequest(String mexId) {
+ return _brc.getMyRequest(mexId);
+ }
+
+ /**
+ * Proxy to {@link BpelRuntimeContext# }.
+ */
+ public void initializePartnersSessionId(PartnerLinkInstance instance, String partnersSessionId) {
+ _brc.initializePartnersSessionId(instance, partnersSessionId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#getSourceSessionId(String) }.
+ */
+ public String getSourceSessionId(String mexId) {
+ return _brc.getSourceSessionId(mexId);
+ }
+
+ public Node getSourceEPR(String mexId) {
+ return _brc.getSourceEPR(mexId);
+ }
+
+ public ExtensionOperation createExtensionActivityImplementation(QName name) {
+ if (name == null) return null;
+ ExtensionBundleRuntime bundle = _runtime._extensionRegistry.get(name.getNamespaceURI());
+ if (bundle == null) {
+ return null;
+ } else {
+ try {
+ return bundle.getExtensionOperationInstance(name.getLocalPart());
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.ProcessControlContext# }.
+ */
+ public Long getPid() {
+ return _brc.getPid();
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#getPartnerResponse(String)}.
+ */
+ public Element getPartnerResponse(String mexId) {
+ return _brc.getPartnerResponse(mexId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#releasePartnerMex(String) }.
+ */
+ public void releasePartnerMex(String mexId) {
+ _brc.releasePartnerMex(mexId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#getPartnerFault(String) }.
+ */
+ public QName getPartnerFault(String mexId) {
+ return _brc.getPartnerFault(mexId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#getPartnerResponseType(String) }.
+ */
+ public QName getPartnerResponseType(String mexId) {
+ return _brc.getPartnerResponseType(mexId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#getPartnerFaultExplanation(String) }.
+ */
+ public String getPartnerFaultExplanation(String mexId) {
+ return _brc.getPartnerFaultExplanation(mexId);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.OdeRTInstanceContext#sendEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent) }.
+ */
+ public void sendEvent(ProcessInstanceStartedEvent evt) {
+ _brc.sendEvent(evt);
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.IOContext#reply(org.apache.ode.bpel.rapi.PartnerLink, String, String, org.w3c.dom.Element, javax.xml.namespace.QName) }.
+ */
+ public void reply(PartnerLinkInstance plink, String opName, String bpelmex, Element element, QName fault) throws FaultException {
+ String mexid = getORM().release(plink, opName, bpelmex);
+ if (mexid == null)
+ throw new FaultException(_runtime._oprocess.constants.qnMissingRequest);
+
+ try {
+ _brc.reply(mexid, plink, opName, element, fault);
+ } catch (NoSuchOperationException e) {
+ // reply to operation that is either not defined or one-way. Perhaps this should be detected at compile time?
+ throw new FaultException(_runtime._oprocess.constants.qnMissingRequest,
+ "Undefined two-way operation \"" + opName + "\".");
+ }
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.ProcessControlContext#forceFlush() }.
+ */
+ public void forceFlush() {
+ _brc.forceFlush();
+ }
+
+ /**
+ * Proxy to {@link org.apache.ode.bpel.rapi.ProcessControlContext#terminate()}.
+ */
+ public void terminate() {
+ cleanupOutstandingMyRoleExchanges(null);
+ _brc.terminate();
+ }
+
+ /**
+ * Record all values of properties of a 'MessageType' variable for efficient lookup.
+ */
+ private void writeProperties(VariableInstance variable, Node value) {
+ if (variable.declaration.type instanceof OMessageVarType) {
+ for (OProcess.OProperty property : variable.declaration.getOwner().properties) {
+ OProcess.OPropertyAlias alias = property.getAlias(variable.declaration.type);
+ if (alias != null) {
+ try {
+ String val = extractProperty((Element) value, alias, variable.declaration.getDescription());
+ if (val != null)
+ _brc.writeVariableProperty(variable, property.name, val);
+ } catch (UninitializedVariableException uve) {
+ // This really should not happen, since we are writing to a variable that we just modified.
+ __log.fatal("Couldn't extract property '" + property.toString() + "' in property pre-extraction: " + uve);
+ throw new RuntimeException(uve);
+ } catch (FaultException e) {
+ // This will fail as we're basically trying to extract properties on all received messages
+ // for optimization purposes.
+ if (__log.isDebugEnabled())
+ __log.debug("Couldn't extract property '" + property.toString() + "' in property pre-extraction: "
+ + e.toString());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Extract the value of a BPEL property from a BPEL messsage variable.
+ *
+ * @param msgData
+ * message variable data
+ * @param alias
+ * alias to apply
+ * @param target
+ * description of the data (for error logging only)
+ * @return value of the property
+ * @throws org.apache.ode.bpel.common.FaultException
+ */
+ String extractProperty(Element msgData, OProcess.OPropertyAlias alias, String target) throws FaultException {
+ PropertyAliasEvaluationContext ectx = new PropertyAliasEvaluationContext(msgData, alias);
+ Node lValue = ectx.getRootNode();
+
+ if (alias.location != null)
+ lValue = _runtime._expLangRuntimeRegistry.evaluateNode(alias.location, ectx);
+
+ if (lValue == null) {
+ String errmsg = __msgs.msgPropertyAliasReturnedNullSet(alias.getDescription(), target);
+ if (__log.isErrorEnabled()) {
+ __log.error(errmsg);
+ }
+ throw new FaultException(_runtime._oprocess.constants.qnSelectionFailure, errmsg);
+ }
+
+ if (lValue.getNodeType() == Node.ELEMENT_NODE) {
+ // This is a bit hokey, we concatenate all the children's values; we really should be checking
+ // to make sure that we are only dealing with text and attribute nodes.
+ StringBuffer val = new StringBuffer();
+ NodeList nl = lValue.getChildNodes();
+ for (int i = 0; i < nl.getLength(); ++i) {
+ Node n = nl.item(i);
+ val.append(n.getNodeValue());
+ }
+ return val.toString();
+ } else if (lValue.getNodeType() == Node.TEXT_NODE) {
+ return ((Text) lValue).getWholeText();
+ } else
+ return null;
+ }
+
+ public Node getPartData(Element message, OMessageVarType.Part part) {
+ // borrowed from ASSIGN.evalQuery()
+ Node ret = DOMUtils.findChildByName(message, new QName(null, part.name));
+ if (part.type instanceof OElementVarType) {
+ QName elName = ((OElementVarType) part.type).elementType;
+ ret = DOMUtils.findChildByName((Element) ret, elName);
+ } else if (part.type == null) {
+ // Special case of header parts never referenced in the WSDL def
+ if (ret != null && ret.getNodeType() == Node.ELEMENT_NODE
+ && ((Element)ret).getAttribute("headerPart") != null
+ && DOMUtils.getTextContent(ret) == null)
+ ret = DOMUtils.getFirstChildElement((Element) ret);
+ // The needed part isn't there, dynamically creating it
+ if (ret == null) {
+ ret = message.getOwnerDocument().createElementNS(null, part.name);
+ ((Element)ret).setAttribute("headerPart", "true");
+ message.appendChild(ret);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * @param instance
+ * @param operation
+ * @param outboundMsg
+ * @param object
+ */
+ public String invoke(String invokeId, PartnerLinkInstance instance, Operation operation, Element outboundMsg, Object object)
+ throws FaultException {
+ try {
+ return _brc.invoke(invokeId, instance, operation, outboundMsg);
+ } catch (UninitializedPartnerEPR e) {
+ throw new FaultException(_runtime._oprocess.constants.qnUninitializedPartnerRole);
+ }
+ }
+
+ /**
+ * @return
+ */
+ public ExpressionLanguageRuntimeRegistry getExpLangRuntime() {
+ return _runtime._expLangRuntimeRegistry;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#onMyRoleMessageExchange(java.lang.String, java.lang.String)
+ */
+ public void onSelectEvent(final String selectId, final String messageExchangeId, final int selectorIdx) {
+ getORM().associate(selectId, messageExchangeId);
+
+ _vpu.inject(new JacobRunnable() {
+ private static final long serialVersionUID = 3168964409165899533L;
+
+ public void run() {
+ // NOTE: we chose the selectId to be the exported representation of the pick response channel!
+ PickResponseChannel responseChannel = importChannel(selectId, PickResponseChannel.class);
+ responseChannel.onRequestRcvd(selectorIdx, messageExchangeId);
+ }
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#onTimerEvent(java.lang.String)
+ */
+ public void onTimerEvent(final String timerId) {
+ getORM().cancel(timerId);
+
+ _vpu.inject(new JacobRunnable() {
+ private static final long serialVersionUID = -7767141033611036745L;
+
+ public void run() {
+ // NOTE: note short cut, we chose timer id to be the same as the exported channel representation.
+ TimerResponseChannel responseChannel = importChannel(timerId, TimerResponseChannel.class);
+ responseChannel.onTimeout();
+ }
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#execute()
+ */
+ public boolean execute() {
+ return _vpu.execute();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#onInvokeResponse(java.lang.String, java.lang.String)
+ */
+ public void onInvokeResponse(final String invokeId, InvokeResponseType irt, final String mexid) {
+ // NOTE: do the switch outside the inject, since we don't want to end up serializing InvokeResponseType objects!
+ switch (irt) {
+ case REPLY:
+ _vpu.inject(new BpelJacobRunnable() {
+ private static final long serialVersionUID = -1095444335740879981L;
+
+ public void run() {
+ importChannel(invokeId, InvokeResponseChannel.class).onResponse();
+ }
+ });
+ break;
+ case FAULT:
+ _vpu.inject(new BpelJacobRunnable() {
+ private static final long serialVersionUID = -1095444335740879981L;
+
+ public void run() {
+ importChannel(invokeId, InvokeResponseChannel.class).onFault();
+ }
+ });
+ break;
+ case FAILURE:
+ _vpu.inject(new BpelJacobRunnable() {
+ private static final long serialVersionUID = -1095444335740879981L;
+
+ public void run() {
+ importChannel(invokeId, InvokeResponseChannel.class).onFailure();
+ }
+ });
+ break;
+ }
+ }
+
+ public void recoverActivity(final String channel, final long activityId, final String action, FaultInfo fault) {
+ // TODO: better translation here?
+ final FaultData fdata = (fault != null) ? new FaultData(fault.getFaultName(), null, fault.getExplanation()) : null;
+
+ _vpu.inject(new JacobRunnable() {
+ private static final long serialVersionUID = 3168964409165899533L;
+
+ public void run() {
+ ActivityRecoveryChannel recovery = importChannel(channel, ActivityRecoveryChannel.class);
+ __log.info("ActivityRecovery: Recovering activity " + activityId +
+ " with action " + action + " on channel " + recovery);
+ if (recovery != null) {
+ if ("cancel".equals(action)) recovery.cancel();
+ else if ("retry".equals(action)) recovery.retry();
+ else if ("fault".equals(action)) recovery.fault(fdata);
+ }
+ }
+ });
+ }
+
+ private OutstandingRequestManager getORM() {
+ return (OutstandingRequestManager) _soup.getGlobalData();
+ }
+
+ /**
+ * Called when the process completes to clean up any outstanding message exchanges.
+ *
+ */
+ private void cleanupOutstandingMyRoleExchanges(FaultInfo optionalFaultData) {
+ // TODO: all this should be moved into the engine. We don't really need the ORM to find
+ // these mexs, we can just scan the database
+ String[] mexRefs = getORM().releaseAll();
+ for (String mexId : mexRefs) {
+ _brc.noreply(mexId, optionalFaultData);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#saveState()
+ */
+ public Object saveState(OutputStream bos) throws IOException {
+ _soup.write(bos);
+ return _soup;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#createInstance(java.lang.String)
+ */
+ public void onCreateInstance(String messageExchangeId) {
+ _vpu.inject(new PROCESS(_runtime._oprocess));
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.ode.bpel.engine.rapi.OdeInternalInstance#setContext(org.apache.ode.bpel.engine.rapi.OdeRTInstanceContext)
+ */
+ public void setContext(OdeRTInstanceContext ctx) {
+ _brc = ctx;
+ }
+
+ public URI getBaseResourceURI() {
+ return _runtime._pconf.getBaseURI();
+ }
+}
\ No newline at end of file
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPE.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPE.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPE.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPE.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.evt.ScopeFaultEvent;
+import org.apache.ode.bpel.evt.ScopeStartEvent;
+import org.apache.ode.bpel.evt.VariableModificationEvent;
+import org.apache.ode.bpel.rtrep.v1.OBase;
+import org.apache.ode.bpel.rtrep.v1.OCatch;
+import org.apache.ode.bpel.rtrep.v1.OElementVarType;
+import org.apache.ode.bpel.rtrep.v1.OEventHandler;
+import org.apache.ode.bpel.rtrep.v1.OFailureHandling;
+import org.apache.ode.bpel.rtrep.v1.OFaultHandler;
+import org.apache.ode.bpel.rtrep.v1.OLink;
+import org.apache.ode.bpel.rtrep.v1.OMessageVarType;
+import org.apache.ode.bpel.rtrep.v1.OScope;
+import org.apache.ode.bpel.rtrep.v1.OVarType;
+import org.apache.ode.bpel.rtrep.v1.channels.CompensationChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.EventHandlerControlChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+import org.apache.ode.bpel.rtrep.v1.channels.ParentScopeChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.ParentScopeChannelListener;
+import org.apache.ode.bpel.rtrep.v1.channels.TerminationChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.TerminationChannelListener;
+import org.apache.ode.bpel.rapi.InvalidProcessException;
+import org.apache.ode.jacob.ChannelListener;
+import org.apache.ode.jacob.SynchChannel;
+import org.w3c.dom.Element;
+
+/**
+ * An active scope.
+ */
+class SCOPE extends ACTIVITY {
+ private static final long serialVersionUID = 6111903798996023525L;
+
+ private static final Log __log = LogFactory.getLog(SCOPE.class);
+
+ private OScope _oscope;
+ private ActivityInfo _child;
+ private Set<EventHandlerInfo> _eventHandlers = new HashSet<EventHandlerInfo>();
+
+ /** Constructor. */
+ public SCOPE(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
+ super(self, frame, linkFrame);
+ _oscope = (OScope) self.o;
+ assert _oscope.activity != null;
+ }
+
+ public void run() {
+
+ // Start the child activity.
+ _child = new ActivityInfo(genMonotonic(),
+ _oscope.activity,
+ newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+ instance(createChild(_child, _scopeFrame, _linkFrame));
+
+ if (_oscope.eventHandler != null) {
+ for (OEventHandler.OAlarm alarm : _oscope.eventHandler.onAlarms) {
+ EventHandlerInfo ehi = new EventHandlerInfo(alarm,
+ newChannel(EventHandlerControlChannel.class),
+ newChannel(ParentScopeChannel.class),
+ newChannel(TerminationChannel.class));
+ _eventHandlers.add(ehi);
+ instance(new EH_ALARM(ehi.psc, ehi.tc, ehi.cc, alarm, _scopeFrame));
+ }
+
+ for (OEventHandler.OEvent event : _oscope.eventHandler.onMessages) {
+ EventHandlerInfo ehi = new EventHandlerInfo(event,
+ newChannel(EventHandlerControlChannel.class),
+ newChannel(ParentScopeChannel.class),
+ newChannel(TerminationChannel.class));
+ _eventHandlers.add(ehi);
+ instance(new EH_EVENT(ehi.psc, ehi.tc, ehi.cc, event, _scopeFrame));
+ }
+ }
+
+ getBpelRuntime().initializePartnerLinks(_scopeFrame.scopeInstanceId,
+ _oscope.partnerLinks.values());
+
+ sendEvent(new ScopeStartEvent());
+ instance(new ACTIVE());
+ }
+
+ private List<CompensationHandler> findCompensationData(OScope scope) {
+ List<CompensationHandler> out = new ArrayList<CompensationHandler>();
+ for (Iterator<CompensationHandler> i = _scopeFrame.availableCompensations.iterator(); i.hasNext(); ) {
+ CompensationHandler ch = i.next();
+ if (null == scope || ch.compensated.oscope.equals(scope))
+ out.add(ch);
+ }
+ // TODO: sort out in terms of completion order
+ return out;
+ }
+
+ class ACTIVE extends ACTIVITY {
+ private static final long serialVersionUID = -5876892592071965346L;
+ /** Links collected. */
+ private boolean _terminated;
+ private FaultData _fault;
+ private long _startTime;
+ private final HashSet<CompensationHandler> _compensations = new HashSet<CompensationHandler>();
+ private boolean _childTermRequested;
+
+ ACTIVE() {
+ super(SCOPE.this._self, SCOPE.this._scopeFrame, SCOPE.this._linkFrame);
+ _startTime = System.currentTimeMillis();
+ }
+
+ public void run() {
+ if (_child != null || !_eventHandlers.isEmpty()) {
+ HashSet<ChannelListener> mlSet = new HashSet<ChannelListener>();
+
+ // Listen to messages from our parent.
+ mlSet.add(new TerminationChannelListener(_self.self) {
+ private static final long serialVersionUID = 1913414844895865116L;
+
+ public void terminate() {
+ _terminated = true;
+
+ // Forward the termination request to the nested activity.
+ if (_child != null && !_childTermRequested) {
+ replication(_child.self).terminate();
+ _childTermRequested = true;
+ }
+
+ // Forward the termination request to our event handlers.
+ terminateEventHandlers();
+
+ instance(ACTIVE.this);
+ }
+ });
+
+ // Handle messages from the child if it is still alive
+ if (_child != null) {
+ mlSet.add(new ParentScopeChannelListener(_child.parent) {
+ private static final long serialVersionUID = -6934246487304813033L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ // If this scope does not have available compensations, defer to
+ // parent scope, otherwise do compensation.
+ if (_scopeFrame.availableCompensations == null)
+ _self.parent.compensate(scope, ret);
+ else {
+ // TODO: Check if we are doing duplicate compensation
+ List<CompensationHandler> compensations = findCompensationData(scope);
+ _scopeFrame.availableCompensations.removeAll(compensations);
+ instance(new ORDEREDCOMPENSATOR(compensations, ret));
+ }
+ instance(ACTIVE.this);
+ }
+
+ public void completed(FaultData flt, Set<CompensationHandler> compensations) {
+ // Set the fault to the activity's choice, if and only if no previous fault
+ // has been detected (first fault wins).
+ if (flt != null && _fault == null)
+ _fault = flt;
+ _child = null;
+ _compensations.addAll(compensations);
+
+ if (flt == null)
+ stopEventHandlers();
+ else
+ terminateEventHandlers();
+
+ instance(ACTIVE.this);
+ }
+
+ public void cancelled() {
+ // Implicit scope holds links of the enclosed activity,
+ // they only get cancelled when we propagate upwards.
+ if (_oscope.implicitScope)
+ _self.parent.cancelled();
+ else
+ completed(null, CompensationHandler.emptySet());
+ }
+
+ public void failure(String reason, Element data) {
+ completed(createFault(OFailureHandling.FAILURE_FAULT_NAME, _self.o, null),
+ CompensationHandler.emptySet());
+ }
+
+ });
+ }
+
+ // Similarly, handle messages from the event handler, if one exists
+ // and if it has not completed.
+ for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext();) {
+ final EventHandlerInfo ehi = i.next();
+
+ mlSet.add(new ParentScopeChannelListener(ehi.psc) {
+ private static final long serialVersionUID = -4694721357537858221L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ // ACTIVE scopes do not compensate, send request up to parent.
+ _self.parent.compensate(scope, ret);
+ instance(ACTIVE.this);
+ }
+
+ public void completed(FaultData flt, Set<CompensationHandler> compenstations) {
+ // Set the fault to the activity's choice, if and only if no previous fault
+ // has been detected (first fault wins).
+ if (flt != null && _fault == null)
+ _fault = flt;
+ _eventHandlers.remove(ehi);
+ _compensations.addAll(compenstations);
+
+ if (flt != null) {
+ // Terminate child if we get a fault from the event handler.
+ if (_child != null && !_childTermRequested) {
+ replication(_child.self).terminate();
+ _childTermRequested = true;
+ }
+ terminateEventHandlers();
+ } else
+ stopEventHandlers();
+
+ instance(ACTIVE.this);
+ }
+
+ public void cancelled() { completed(null, CompensationHandler.emptySet()); }
+ public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
+ });
+ }
+ object(false, mlSet);
+ } else /* nothing to wait for... */ {
+ // Any compensation handlers that were available but not activated will be forgotten.
+ Set<CompensationHandler> unreachableCompensationHandlers = _scopeFrame.availableCompensations;
+ if (unreachableCompensationHandlers != null)
+ for (Iterator<CompensationHandler> i = unreachableCompensationHandlers.iterator(); i.hasNext(); ) {
+ CompensationHandler ch = i.next();
+ ch.compChannel.forget();
+ }
+ _scopeFrame.availableCompensations = null;
+
+ // Maintain a set of links needing dead-path elimination.
+ Set<OLink> linksNeedingDPE = new HashSet<OLink>();
+ if (_oscope.faultHandler != null)
+ for (Iterator<OCatch> i = _oscope.faultHandler.catchBlocks.iterator(); i.hasNext(); )
+ linksNeedingDPE.addAll(i.next().outgoingLinks);
+
+ // We're done with the main work, if we were terminated, we will
+ // need to load the termination handler:
+ if (_terminated) {
+ __log.debug("Scope: " + _oscope + " was terminated.");
+ // ??? Should we forward
+ _self.parent.completed(null,_compensations);
+ } else if (_fault != null) {
+
+ sendEvent(new ScopeFaultEvent(_fault.getFaultName(), _fault.getFaultLineNo(),_fault.getExplanation()));
+
+ // Find a fault handler for our fault.
+ OCatch catchBlock = _oscope.faultHandler == null ? null : findCatch(_oscope.faultHandler, _fault.getFaultName(), _fault.getFaultType());
+
+ // Collect all the compensation data for completed child scopes.
+ assert !!_eventHandlers.isEmpty();
+ assert _child == null;
+ if (catchBlock == null) {
+ // If we cannot find a catch block for this fault, then we simply propagate the fault
+ // to the parent. NOTE: the "default" fault handler as described in the BPEL spec
+ // must be generated by the compiler.
+ if (__log.isDebugEnabled())
+ __log.debug(_self + ": has no fault handler for "
+ + _fault.getFaultName() + "; scope will propagate FAULT!");
+
+
+ _self.parent.completed(_fault, _compensations);
+ } else /* catchBlock != null */ {
+ if (__log.isDebugEnabled())
+ __log.debug(_self + ": has a fault handler for "
+ + _fault.getFaultName() + ": "+ catchBlock);
+
+ linksNeedingDPE.removeAll(catchBlock.outgoingLinks);
+
+ // We have to create a scope for the catch block.
+ OdeInternalInstance ntive = getBpelRuntime();
+
+ ActivityInfo faultHandlerActivity = new ActivityInfo(genMonotonic(), catchBlock,
+ newChannel(TerminationChannel.class,"FH"), newChannel(ParentScopeChannel.class,"FH"));
+
+ ScopeFrame faultHandlerScopeFrame = new ScopeFrame(catchBlock,
+ ntive.createScopeInstance(_scopeFrame.scopeInstanceId, catchBlock),
+ _scopeFrame, _compensations, _fault);
+ if (catchBlock.faultVariable != null) {
+ try {
+ VariableInstance vinst = faultHandlerScopeFrame.resolve(catchBlock.faultVariable);
+ initializeVariable(vinst, _fault.getFaultMessage());
+
+ // Generating event
+ VariableModificationEvent se = new VariableModificationEvent(vinst.declaration.name);
+ se.setNewValue(_fault.getFaultMessage());
+ if (_oscope.debugInfo != null)
+ se.setLineNo(_oscope.debugInfo.startLine);
+ sendEvent(se);
+ } catch (Exception ex) {
+ __log.fatal(ex);
+ throw new InvalidProcessException(ex);
+ }
+ }
+
+ // Create the fault handler scope.
+ instance(new SCOPE(faultHandlerActivity,faultHandlerScopeFrame, SCOPE.this._linkFrame));
+
+ object(new ParentScopeChannelListener(faultHandlerActivity.parent) {
+ private static final long serialVersionUID = -6009078124717125270L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ // This should never happen.
+ throw new AssertionError("received compensate request!");
+ }
+
+ public void completed(FaultData fault, Set<CompensationHandler> compensations) {
+ // The compensations that have been registered here, will never be activated,
+ // so we'll forget them as soon as possible.
+ for (CompensationHandler compensation : compensations)
+ compensation.compChannel.forget();
+
+ _self.parent.completed(fault, CompensationHandler.emptySet());
+ }
+
+ public void cancelled() { completed(null, CompensationHandler.emptySet()); }
+ public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
+ });
+ }
+ } else /* completed ok */ {
+ if (_oscope.compensationHandler != null) {
+ CompensationHandler compensationHandler = new CompensationHandler(
+ _scopeFrame,
+ newChannel(CompensationChannel.class),
+ _startTime,
+ System.currentTimeMillis());
+ _self.parent.completed(null, Collections.singleton(compensationHandler));
+ instance(new COMPENSATIONHANDLER_(compensationHandler, _compensations));
+ } else /* no compensation handler */ {
+ _self.parent.completed(null, _compensations);
+ }
+ }
+
+ // DPE links needing DPE (i.e. the unselected catch blocks).
+ dpe(linksNeedingDPE);
+ }
+ }
+
+ private void terminateEventHandlers() {
+ for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext(); ) {
+ EventHandlerInfo ehi = i.next();
+ if (!ehi.terminateRequested && !ehi.stopRequested) {
+ replication(ehi.tc).terminate();
+ ehi.terminateRequested = true;
+ }
+ }
+ }
+
+ private void stopEventHandlers() {
+ for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext();) {
+ EventHandlerInfo ehi = i.next();
+ if (!ehi.stopRequested && !ehi.terminateRequested) {
+ ehi.cc.stop();
+ ehi.stopRequested = true;
+ }
+ }
+ }
+
+ }
+
+
+ private static OCatch findCatch(OFaultHandler fh, QName faultName, OVarType faultType) {
+ OCatch bestMatch = null;
+ for (OCatch c : fh.catchBlocks) {
+ // First we try to eliminate this catch block based on fault-name mismatches:
+ if (c.faultName != null) {
+ if (faultName == null)
+ continue;
+ if (!faultName.equals(c.faultName))
+ continue;
+ }
+
+ // Then we try to eliminate this catch based on type incompatibility:
+ if (c.faultVariable != null) {
+ if (faultType == null)
+ continue;
+ else if (c.faultVariable.type instanceof OMessageVarType) {
+ if (faultType instanceof OMessageVarType
+ && ((OMessageVarType) faultType).equals(c.faultVariable.type)) {
+ // Don't eliminate.
+ } else if (faultType instanceof OElementVarType
+ && ((OMessageVarType) c.faultVariable.type).docLitType != null
+ && !((OMessageVarType) c.faultVariable.type).docLitType.equals(faultType)) {
+ // Don't eliminate.
+ } else {
+ continue; // Eliminate.
+ }
+ } else if (c.faultVariable.type instanceof OElementVarType) {
+ if (faultType instanceof OElementVarType && faultType.equals(c.faultVariable.type)) {
+ // Don't eliminate
+ } else if (faultType instanceof OMessageVarType
+ && ((OMessageVarType) faultType).docLitType != null
+ && ((OMessageVarType) faultType).docLitType.equals(c.faultVariable.type)) {
+ // Don't eliminate
+ } else {
+ continue; // eliminate
+ }
+ } else {
+ continue; // Eliminate
+ }
+ }
+
+ // If we got to this point we did not eliminate this catch block. However, we don't just
+ // use the first non-eliminated catch, we instead try to find the best match.
+ if (bestMatch == null) {
+ // Obviously something is better then nothing.
+ bestMatch = c;
+ } else {
+ // Otherwise we prefer name and variable matches but prefer name-only matches to
+ // variable-only matches.
+ int existingScore = (bestMatch.faultName == null ? 0 : 2) + (bestMatch.faultVariable == null ? 0 : 1);
+ int currentScore = (c.faultName == null ? 0 : 2) + (c.faultVariable == null ? 0 : 1);
+ if (currentScore > existingScore) {
+ bestMatch = c;
+ }
+ }
+ }
+ return bestMatch;
+ }
+
+ static final class EventHandlerInfo implements Serializable {
+ private static final long serialVersionUID = -9046603073542446478L;
+ final OBase o;
+ final EventHandlerControlChannel cc;
+ final ParentScopeChannel psc;
+ final TerminationChannel tc;
+ boolean terminateRequested;
+ boolean stopRequested;
+
+ EventHandlerInfo(OBase o, EventHandlerControlChannel cc, ParentScopeChannel psc, TerminationChannel tc) {
+ this.o = o;
+ this.cc = cc;
+ this.psc = psc;
+ this.tc = tc;
+ }
+ }
+
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPEACT.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPEACT.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPEACT.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SCOPEACT.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.bpel.rtrep.v1.OScope;
+
+/**
+ * A scope activity. The scope activity creates a new scope frame and proceeeds
+ * using the {@link SCOPE} template.
+ */
+public class SCOPEACT extends ACTIVITY {
+ private static final long serialVersionUID = -4593029783757994939L;
+
+ public SCOPEACT(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ }
+
+ public void run() {
+ ScopeFrame newFrame = new ScopeFrame(
+ (OScope) _self.o,getBpelRuntime().createScopeInstance(_scopeFrame.scopeInstanceId,(OScope) _self.o),
+ _scopeFrame,
+ null);
+ instance(new SCOPE(_self,newFrame, _linkFrame));
+ }
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SEQUENCE.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SEQUENCE.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SEQUENCE.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SEQUENCE.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.bpel.rtrep.v1.OActivity;
+import org.apache.ode.bpel.rtrep.v1.OScope;
+import org.apache.ode.bpel.rtrep.v1.OSequence;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+import org.apache.ode.bpel.rtrep.v1.channels.ParentScopeChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.ParentScopeChannelListener;
+import org.apache.ode.bpel.rtrep.v1.channels.TerminationChannel;
+import org.apache.ode.bpel.rtrep.v1.channels.TerminationChannelListener;
+import org.apache.ode.jacob.SynchChannel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.w3c.dom.Element;
+
+/**
+ * Implementation of the BPEL <sequence> activity.
+ */
+class SEQUENCE extends ACTIVITY {
+ private static final long serialVersionUID = 1L;
+ private final List<OActivity> _remaining;
+ private final Set<CompensationHandler> _compensations;
+
+ SEQUENCE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ this(self, scopeFrame, linkFrame, ((OSequence)self.o).sequence, CompensationHandler.emptySet());
+ }
+
+ SEQUENCE(ActivityInfo self,
+ ScopeFrame scopeFrame,
+ LinkFrame linkFrame,
+ List<OActivity> remaining,
+ Set<CompensationHandler> compensations) {
+ super(self, scopeFrame, linkFrame);
+ _remaining = Collections.unmodifiableList(remaining);
+ _compensations =Collections.unmodifiableSet(compensations);
+ }
+
+ public void run() {
+ final ActivityInfo child = new ActivityInfo(genMonotonic(),
+ _remaining.get(0),
+ newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
+ instance(createChild(child, _scopeFrame, _linkFrame));
+ instance(new ACTIVE(child));
+ }
+
+ private class ACTIVE extends BpelJacobRunnable {
+ private static final long serialVersionUID = -2663862698981385732L;
+ private ActivityInfo _child;
+ private boolean _terminateRequested = false;
+
+ ACTIVE(ActivityInfo child) {
+ _child = child;
+ }
+
+ public void run() {
+ object(false, new TerminationChannelListener(_self.self) {
+ private static final long serialVersionUID = -2680515407515637639L;
+
+ public void terminate() {
+ replication(_child.self).terminate();
+
+ // Don't do any of the remaining activiites, DPE instead.
+ ArrayList<OActivity> remaining = new ArrayList<OActivity>(_remaining);
+ remaining.remove(0);
+ deadPathRemaining(remaining);
+
+ _terminateRequested = true;
+ instance(ACTIVE.this);
+ }
+ }.or(new ParentScopeChannelListener(_child.parent) {
+ private static final long serialVersionUID = 7195562310281985971L;
+
+ public void compensate(OScope scope, SynchChannel ret) {
+ _self.parent.compensate(scope,ret);
+ instance(ACTIVE.this);
+ }
+
+ public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
+ HashSet<CompensationHandler> comps = new HashSet<CompensationHandler>(_compensations);
+ comps.addAll(compensations);
+ if (faultData != null || _terminateRequested || _remaining.size() <= 1) {
+ deadPathRemaining(_remaining);
+ _self.parent.completed(faultData, comps);
+ } else /* !fault && ! terminateRequested && !remaining.isEmpty */ {
+ ArrayList<OActivity> remaining = new ArrayList<OActivity>(_remaining);
+ remaining.remove(0);
+ instance(new SEQUENCE(_self, _scopeFrame, _linkFrame, remaining, comps));
+ }
+ }
+
+ public void cancelled() { completed(null, CompensationHandler.emptySet()); }
+ public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
+ }));
+ }
+
+ private void deadPathRemaining(List<OActivity> remaining) {
+ for (Iterator<OActivity> i = remaining.iterator();i.hasNext();)
+ dpe(i.next());
+ }
+
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("SEQUENCE(self=");
+ buf.append(_self);
+ buf.append(", linkframe=");
+ buf.append(_linkFrame);
+ buf.append(", remaining=");
+ buf.append(_remaining);
+ buf.append(')');
+ return buf.toString();
+ }
+}
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SWITCH.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SWITCH.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SWITCH.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/SWITCH.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.ode.bpel.common.FaultException;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Runtime implementation of the <code><switch></code> activity.
+ */
+class SWITCH extends ACTIVITY {
+ private static final long serialVersionUID = 1L;
+ private static final Log __log = LogFactory.getLog(SWITCH.class);
+
+ public SWITCH(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
+ super(self, scopeFrame, linkFrame);
+ }
+
+ public final void run() {
+ OSwitch oswitch = (OSwitch)_self.o;
+ OSwitch.OCase matchedOCase = null;
+ FaultData faultData = null;
+
+ EvaluationContext evalCtx = getEvaluationContext();
+ for (Object o : oswitch.getCases()) {
+ OSwitch.OCase ocase = (OSwitch.OCase) o;
+ try {
+ if (getBpelRuntime().getExpLangRuntime().evaluateAsBoolean(ocase.expression, evalCtx)) {
+ matchedOCase = ocase;
+ break;
+ }
+ } catch (FaultException e) {
+ __log.error(e.getMessage(), e);
+ faultData = createFault(e.getQName(), ocase);
+ _self.parent.completed(faultData, CompensationHandler.emptySet());
+
+ // Dead path all the child activiites:
+ for (OSwitch.OCase oCase : oswitch.getCases()) dpe(oCase.activity);
+ return;
+ }
+ }
+
+ // Dead path cases not chosen
+ for (OSwitch.OCase cs : oswitch.getCases()) {
+ if (cs != matchedOCase)
+ dpe(cs.activity);
+ }
+
+ // no conditions satisfied, we're done.
+ if (matchedOCase == null) {
+ _self.parent.completed(null, CompensationHandler.emptySet());
+ } else /* matched case */ {
+ // Re-use our current channels.
+ ActivityInfo child = new ActivityInfo(genMonotonic(),matchedOCase.activity, _self.self, _self.parent);
+ instance(createChild(child,_scopeFrame,_linkFrame));
+ }
+ }
+}
\ No newline at end of file
Added: ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ScopeFrame.java
URL: http://svn.apache.org/viewvc/ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ScopeFrame.java?rev=693931&view=auto
==============================================================================
--- ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ScopeFrame.java (added)
+++ ode/trunk/runtimes/src/main/java/org/apache/ode/bpel/rtrep/v1/ScopeFrame.java Wed Sep 10 12:06:59 2008
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.rtrep.v1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.rtrep.v1.channels.FaultData;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+
+/**
+ * N-tuple representing a scope "frame" (as in stack frame).
+ */
+class ScopeFrame implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Log __log = LogFactory.getLog(ScopeFrame.class);
+
+ /** The compiled scope representation. */
+ final OScope oscope;
+
+ /** The parent scope frame. */
+ final ScopeFrame parent;
+
+ /** Database scope instance identifier. */
+ final Long scopeInstanceId;
+
+ Set<CompensationHandler> availableCompensations;
+
+ /** The fault context for this scope. */
+ private FaultData _faultData;
+
+ /** Constructor used to create "fault" scopes. */
+ ScopeFrame( OScope scopeDef,
+ Long scopeInstanceId,
+ ScopeFrame parent,
+ Set<CompensationHandler> visibleCompensationHandlers,
+ FaultData fault) {
+ this(scopeDef,scopeInstanceId,parent,visibleCompensationHandlers);
+ _faultData = fault;
+
+ }
+
+ public ScopeFrame( OScope scopeDef,
+ Long scopeInstanceId,
+ ScopeFrame parent,
+ Set<CompensationHandler> visibleCompensationHandlers) {
+ this.oscope = scopeDef;
+ this.scopeInstanceId = scopeInstanceId;
+ this.parent = parent;
+ this.availableCompensations = visibleCompensationHandlers;
+ }
+
+
+ public ScopeFrame find(OScope scope) {
+ if (oscope.name.equals(scope.name)) {
+ return this;
+ }
+
+ return (parent != null)
+ ? parent.find(scope)
+ : null;
+ }
+
+ public VariableInstance resolve(OScope.Variable variable) {
+ ScopeFrame scopeFrame = find(variable.declaringScope);
+ if (scopeFrame == null) return null;
+ return new VariableInstance(scopeFrame.scopeInstanceId, variable);
+ }
+
+ public CorrelationSetInstance resolve(OScope.CorrelationSet cset) {
+ return new CorrelationSetInstance(find(cset.declaringScope).scopeInstanceId, cset);
+ }
+
+ public PartnerLinkInstance resolve(OPartnerLink partnerLink) {
+ return new PartnerLinkInstance(find(partnerLink.declaringScope).scopeInstanceId, partnerLink);
+ }
+
+ public String toString() {
+ StringBuffer buf= new StringBuffer("{ScopeFrame: o=");
+ buf.append(oscope);
+ buf.append(", id=");
+ buf.append(scopeInstanceId);
+ if (availableCompensations != null) {
+ buf.append(", avComps=");
+ buf.append(availableCompensations);
+ }
+ if (_faultData != null) {
+ buf.append(", fault=");
+ buf.append(_faultData);
+ }
+ buf.append('}');
+ return buf.toString();
+ }
+
+ public FaultData getFault() {
+ if (_faultData != null)
+ return _faultData;
+ if (parent != null)
+ return parent.getFault();
+ return null;
+ }
+
+ public void fillEventInfo(ScopeEvent event) {
+ ScopeFrame currentScope = this;
+ ArrayList<String> parentNames = new ArrayList<String>();
+ while (currentScope != null) {
+ parentNames.add(currentScope.oscope.name);
+ currentScope = currentScope.parent;
+ }
+ event.setParentScopesNames(parentNames);
+ if (parent != null)
+ event.setParentScopeId(parent.scopeInstanceId);
+ event.setScopeId(scopeInstanceId);
+ event.setScopeName(oscope.name);
+ event.setScopeDeclerationId(oscope.getId());
+ if (event.getLineNo() == -1 && oscope.debugInfo != null)
+ event.setLineNo(oscope.debugInfo.startLine);
+ }
+
+}