You are viewing a plain text version of this content. The canonical link for it is here.
Posted to agila-commits@incubator.apache.org by mr...@apache.org on 2005/04/27 20:28:07 UTC
svn commit: r165042 [17/30] - in /incubator/agila/trunk: ./ config/ modules/
modules/bpel/ modules/bpel/client/ modules/bpel/client/api/
modules/bpel/client/api/src/ modules/bpel/client/api/src/conf/
modules/bpel/client/api/src/java/ modules/bpel/client/api/src/java/org/
modules/bpel/client/api/src/java/org/apache/
modules/bpel/client/api/src/java/org/apache/agila/
modules/bpel/client/api/src/java/org/apache/agila/bpel/
modules/bpel/client/api/src/java/org/apache/agila/bpel/client/
modules/bpel/client/api/src/test/ modules/bpel/client/api/src/test/junit/
modules/bpel/client/api/src/test/junit/org/
modules/bpel/client/api/src/test/junit/org/apache/
modules/bpel/client/api/src/test/junit/org/apache/agila/
modules/bpel/client/api/src/test/junit/org/apache/agila/bpel/
modules/bpel/client/api/src/test/junit/org/apache/agila/bpel/client/
modules/bpel/client/web/ modules/bpel/client/web/src/
modules/bpel/client/web/src/conf/ modules/bpel/client/web/src/java/
modules/bpel/client/web/src/java/org/
modules/bpel/client/web/src/java/org/apache/
modules/bpel/client/web/src/java/org/apache/agila/
modules/bpel/client/web/src/java/org/apache/agila/bpel/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/common/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/deployer/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/deployer/action/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/deployer/form/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/engine/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/engine/action/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/engine/form/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/global/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/global/action/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/security/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/security/action/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/security/form/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/user/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/user/action/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/user/form/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/util/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/worklist/
modules/bpel/client/web/src/java/org/apache/agila/bpel/web/worklist/action/
modules/bpel/client/web/src/webapp/
modules/bpel/client/web/src/webapp/META-INF/
modules/bpel/client/web/src/webapp/WEB-INF/
modules/bpel/client/web/src/webapp/WEB-INF/classes/
modules/bpel/client/web/src/webapp/images/
modules/bpel/client/web/src/webapp/images/audit/
modules/bpel/client/web/src/webapp/style/ modules/bpel/common/
modules/bpel/common/src/ modules/bpel/common/src/aspect/
modules/bpel/common/src/conf/ modules/bpel/common/src/java/
modules/bpel/common/src/java/org/ modules/bpel/common/src/java/org/apache/
modules/bpel/common/src/java/org/apache/agila/
modules/bpel/common/src/java/org/apache/agila/bpel/
modules/bpel/common/src/java/org/apache/agila/bpel/common/
modules/bpel/common/src/java/org/apache/agila/bpel/common/aspect/
modules/bpel/common/src/java/org/apache/agila/bpel/common/configuration/
modules/bpel/common/src/java/org/apache/agila/bpel/common/util/
modules/bpel/common/src/java/org/apache/agila/bpel/common/util/logger/
modules/bpel/common/src/test/ modules/bpel/common/src/test/junit/
modules/bpel/common/src/test/junit/org/
modules/bpel/common/src/test/junit/org/apache/
modules/bpel/common/src/test/junit/org/apache/agila/
modules/bpel/common/src/test/junit/org/apache/agila/bpel/
modules/bpel/common/src/test/junit/org/apache/agila/bpel/common/
modules/bpel/common/src/test/junit/org/apache/agila/bpel/common/util/
modules/bpel/dist/ modules/bpel/dist/src/ modules/bpel/dist/src/java/
modules/bpel/dist/src/java/org/ modules/bpel/dist/src/java/org/apache/
modules/bpel/dist/src/java/org/apache/agila/
modules/bpel/dist/src/java/org/apache/agila/bpel/
modules/bpel/dist/src/java/org/apache/agila/bpel/util/
modules/bpel/dist/src/script/ modules/bpel/engine/
modules/bpel/engine/common/ modules/bpel/engine/common/src/
modules/bpel/engine/common/src/aspect/
modules/bpel/engine/common/src/hibernate/
modules/bpel/engine/common/src/hibernate/org/
modules/bpel/engine/common/src/hibernate/org/apache/
modules/bpel/engine/common/src/hibernate/org/apache/agila/
modules/bpel/engine/common/src/hibernate/org/apache/agila/bpel/
modules/bpel/engine/common/src/hibernate/org/apache/agila/bpel/engine/
modules/bpel/engine/common/src/hibernate/org/apache/agila/bpel/engine/common/
modules/bpel/engine/common/src/java/ modules/bpel/engine/common/src/java/org/
modules/bpel/engine/common/src/java/org/apache/
modules/bpel/engine/common/src/java/org/apache/agila/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/aspect/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/event/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/event/execution/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/event/execution/impl/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/event/lifecycle/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/event/lifecycle/dao/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/lifecycle/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/monitoring/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/persistence/
modules/bpel/engine/common/src/java/org/apache/agila/bpel/engine/common/transaction/
modules/bpel/engine/common/src/test/
modules/bpel/engine/common/src/test/junit/
modules/bpel/engine/common/src/test/junit/org/
modules/bpel/engine/common/src/test/junit/org/apache/
modules/bpel/engine/common/src/test/junit/org/apache/agila/
modules/bpel/engine/common/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/common/src/test/junit/org/apache/agila/bpel/engine/
modules/bpel/engine/common/src/test/junit/org/apache/agila/bpel/engine/common/
modules/bpel/engine/common/src/test/junit/org/apache/agila/bpel/engine/common/event/
modules/bpel/engine/common/src/test/resources/ modules/bpel/engine/core/
modules/bpel/engine/core/src/ modules/bpel/engine/core/src/aspect/
modules/bpel/engine/core/src/hibernate/
modules/bpel/engine/core/src/hibernate/org/
modules/bpel/engine/core/src/hibernate/org/apache/
modules/bpel/engine/core/src/hibernate/org/apache/agila/
modules/bpel/engine/core/src/hibernate/org/apache/agila/bpel/
modules/bpel/engine/core/src/hibernate/org/apache/agila/bpel/engine/
modules/bpel/engine/core/src/java/ modules/bpel/engine/core/src/java/org/
modules/bpel/engine/core/src/java/org/apache/
modules/bpel/engine/core/src/java/org/apache/agila/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/exception/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/definition/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/definition/impl/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/definition/impl/dao/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/dynamic/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/dynamic/impl/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/dynamic/impl/dao/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/core/dynamic/impl/xao/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/expression/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/expression/impl/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/timer/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/timer/dao/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/timer/po/
modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/util/
modules/bpel/engine/core/src/test/ modules/bpel/engine/core/src/test/junit/
modules/bpel/engine/core/src/test/junit/org/
modules/bpel/engine/core/src/test/junit/org/apache/
modules/bpel/engine/core/src/test/junit/org/apache/agila/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/engine/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/engine/core/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/engine/core/definition/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/engine/core/dynamic/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/engine/messaging/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/tools/
modules/bpel/engine/core/src/test/junit/org/apache/agila/bpel/util/
modules/bpel/engine/core/src/test/resources/ modules/bpel/engine/deployer/
modules/bpel/engine/deployer/src/ modules/bpel/engine/deployer/src/aspect/
modules/bpel/engine/deployer/src/java/
modules/bpel/engine/deployer/src/java/org/
modules/bpel/engine/deployer/src/java/org/apache/
modules/bpel/engine/deployer/src/java/org/apache/agila/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/exception/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/context/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/bpel/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/bpel/complex/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/bpel/exclusivity/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/bpel/reference/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/bpel/unicity/
modules/bpel/engine/deployer/src/java/org/apache/agila/bpel/deployer/priv/validate/wsdl/
modules/bpel/engine/deployer/src/test/
modules/bpel/engine/deployer/src/test/junit/
modules/bpel/engine/deployer/src/test/junit/org/
modules/bpel/engine/deployer/src/test/junit/org/apache/
modules/bpel/engine/deployer/src/test/junit/org/apache/agila/
modules/bpel/engine/deployer/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/deployer/src/test/junit/org/apache/agila/bpel/deployer/
modules/bpel/engine/deployer/src/test/resources/ modules/bpel/engine/user/
modules/bpel/engine/user/src/ modules/bpel/engine/user/src/aspect/
modules/bpel/engine/user/src/hibernate/
modules/bpel/engine/user/src/hibernate/org/
modules/bpel/engine/user/src/hibernate/org/apache/
modules/bpel/engine/user/src/hibernate/org/apache/agila/
modules/bpel/engine/user/src/hibernate/org/apache/agila/bpel/
modules/bpel/engine/user/src/hibernate/org/apache/agila/bpel/user/
modules/bpel/engine/user/src/java/ modules/bpel/engine/user/src/java/org/
modules/bpel/engine/user/src/java/org/apache/
modules/bpel/engine/user/src/java/org/apache/agila/
modules/bpel/engine/user/src/java/org/apache/agila/bpel/
modules/bpel/engine/user/src/java/org/apache/agila/bpel/user/
modules/bpel/engine/user/src/java/org/apache/agila/bpel/user/exception/
modules/bpel/engine/user/src/java/org/apache/agila/bpel/user/priv/
modules/bpel/engine/user/src/java/org/apache/agila/bpel/user/priv/dao/
modules/bpel/engine/user/src/test/ modules/bpel/engine/user/src/test/junit/
modules/bpel/engine/user/src/test/junit/org/
modules/bpel/engine/user/src/test/junit/org/apache/
modules/bpel/engine/user/src/test/junit/org/apache/agila/
modules/bpel/engine/user/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/user/src/test/junit/org/apache/agila/bpel/user/
modules/bpel/engine/user/src/test/resource/ modules/bpel/engine/worklist/
modules/bpel/engine/worklist/src/ modules/bpel/engine/worklist/src/aspect/
modules/bpel/engine/worklist/src/hibernate/
modules/bpel/engine/worklist/src/hibernate/org/
modules/bpel/engine/worklist/src/hibernate/org/apache/
modules/bpel/engine/worklist/src/hibernate/org/apache/agila/
modules/bpel/engine/worklist/src/hibernate/org/apache/agila/bpel/
modules/bpel/engine/worklist/src/hibernate/org/apache/agila/bpel/worklist/
modules/bpel/engine/worklist/src/java/
modules/bpel/engine/worklist/src/java/org/
modules/bpel/engine/worklist/src/java/org/apache/
modules/bpel/engine/worklist/src/java/org/apache/agila/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/worklist/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/worklist/exception/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/worklist/priv/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/worklist/priv/dao/
modules/bpel/engine/worklist/src/java/org/apache/agila/bpel/worklist/priv/po/
modules/bpel/engine/worklist/src/test/
modules/bpel/engine/worklist/src/test/junit/
modules/bpel/engine/worklist/src/test/junit/org/
modules/bpel/engine/worklist/src/test/junit/org/apache/
modules/bpel/engine/worklist/src/test/junit/org/apache/agila/
modules/bpel/engine/worklist/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/worklist/src/test/junit/org/apache/agila/bpel/worklist/
modules/bpel/engine/worklist/src/test/resource/ modules/bpel/engine/wsa/
modules/bpel/engine/wsa/src/ modules/bpel/engine/wsa/src/aspect/
modules/bpel/engine/wsa/src/conf/ modules/bpel/engine/wsa/src/hibernate/
modules/bpel/engine/wsa/src/java/ modules/bpel/engine/wsa/src/java/org/
modules/bpel/engine/wsa/src/java/org/apache/
modules/bpel/engine/wsa/src/java/org/apache/agila/
modules/bpel/engine/wsa/src/java/org/apache/agila/bpel/
modules/bpel/engine/wsa/src/java/org/apache/agila/bpel/wsa/
modules/bpel/engine/wsa/src/schema/ modules/bpel/engine/wsa/src/test/
modules/bpel/engine/wsa/src/test/junit/
modules/bpel/engine/wsa/src/test/junit/org/
modules/bpel/engine/wsa/src/test/junit/org/apache/
modules/bpel/engine/wsa/src/test/junit/org/apache/agila/
modules/bpel/engine/wsa/src/test/junit/org/apache/agila/bpel/
modules/bpel/engine/wsa/src/test/junit/org/apache/agila/bpel/worklist/
modules/bpel/engine/wsa/src/test/resources/
modules/bpel/engine/wsa/src/webapp/
modules/bpel/engine/wsa/src/webapp/META-INF/
modules/bpel/engine/wsa/src/webapp/WEB-INF/
modules/bpel/engine/wsa/src/webapp/WEB-INF/classes/ modules/bpel/samples/
modules/bpel/samples/src/ modules/bpel/samples/src/xml/
modules/bpel/samples/src/xml/example1/
modules/bpel/samples/src/xml/example1/messages/
modules/bpel/samples/src/xml/example2/
modules/bpel/samples/src/xml/example2/messages/
modules/bpel/samples/src/xml/example3/
modules/bpel/samples/src/xml/example3/messages/
modules/bpel/samples/src/xml/example4/
modules/bpel/samples/src/xml/example4/messages/ modules/bpm/
modules/bpm/config/ modules/bpm/src/ modules/bpm/src/java/
modules/bpm/src/resources/ modules/bpm/src/test/ modules/bpm/src/webapp/ src/
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/CoreWrappingFactory.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/CoreWrappingFactory.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/CoreWrappingFactory.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/CoreWrappingFactory.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.agila.bpel.engine.common.persistence.DBSessionException;
+import org.apache.agila.bpel.engine.common.persistence.FinderException;
+import org.apache.agila.bpel.engine.priv.core.definition.Activity;
+import org.apache.agila.bpel.engine.priv.core.definition.ActivityFactory;
+import org.apache.agila.bpel.engine.priv.core.definition.Pick;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ExecutionContext;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ExecutionContextFactory;
+import org.apache.agila.bpel.engine.priv.core.dynamic.PickEC;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ProcessInstance;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ReceiveEC;
+
+/**
+ * Factory class used by the MessageController implementation to get activities
+ * and execution contexts from the engine core. This factory includes methods
+ * to find picks and receives and wraps the picks to behave as a receive using
+ * the wrapping class. It basically calls factory methods of the engine core and
+ * creates correct wrappers for picks.
+ *
+ * @see PickAsReceive
+ * @see PickECAsReceiveEC
+ * @see org.apache.agila.bpel.engine.priv.core.definition.ActivityFactory
+ * @see org.apache.agila.bpel.engine.priv.core.dynamic.ExecutionContextFactory
+ */
+public class CoreWrappingFactory {
+
+ /**
+ * Finds the execution context of an activity in a process instance. For a PickAsReceive wrapper
+ * the wrapped Pick is looked up and the PickEC found is returned inside a PickECAsReceiveEC.
+ * @param instance process instance passed to the execution context finder
+ * @param activity activity to look up, possibly a PickAsReceive wrapper
+ * @return the context found; a PickECAsReceiveEC with the wrapper's event position for a PickAsReceive
+ * @throws FinderException declared here; presumably propagated from the finder (not shown in this file)
+ */
+ public static ExecutionContext findECsByActivityAndInstance(ProcessInstance instance, Activity activity) throws FinderException {
+ if (activity instanceof PickAsReceive) {
+ PickAsReceive pickAsReceive = (PickAsReceive) activity;
+ PickEC pickEC = (PickEC) ExecutionContextFactory.findECForActivityInInstance(instance, pickAsReceive.getPick()); // search with the wrapped Pick, not the wrapper
+ return new PickECAsReceiveEC(pickEC, pickAsReceive.getEventPos());
+ } else {
+ return ExecutionContextFactory.findECForActivityInInstance(instance, activity);
+ }
+ }
+
+ /**
+ * Returns the receives found for an invoker followed by the matching pick events wrapped as PickAsReceive.
+ * @param partnerLink partner link passed to both ActivityFactory finders
+ * @param namespace namespace passed to both ActivityFactory finders
+ * @param portType port type passed to both ActivityFactory finders
+ * @param operation operation passed to both ActivityFactory finders
+ * @return the list returned by ActivityFactory.findReceivesByInvoker, with the wrapped picks appended to it
+ * @throws DBSessionException declared here; presumably propagated from the ActivityFactory finders
+ */
+ public static List findReceivesByInvoker(String partnerLink, String namespace, String portType, String operation) throws DBSessionException {
+ List result = ActivityFactory.findReceivesByInvoker(partnerLink, namespace, portType, operation);
+ List pickEvents = ActivityFactory.findPickEventsByInvoker(partnerLink,namespace, portType, operation);
+ result.addAll(wrapPicksAsReceives(pickEvents));
+ return result;
+ }
+
+ /**
+ * Namespace-less variant: returns the receives found for partnerLink, portType and operation,
+ * followed by the matching pick events wrapped as PickAsReceive (appended to the finder's list).
+ */
+ public static List findReceivesByInvoker(String partnerLink, String portType, String operation) throws DBSessionException {
+ List result = ActivityFactory.findReceivesByInvoker(partnerLink, portType, operation);
+ List pickEvents = ActivityFactory.findPickEventsByInvoker(partnerLink, portType, operation);
+ result.addAll(wrapPicksAsReceives(pickEvents));
+ return result;
+ }
+ /**
+ * Looks the execution context up again by the id of receiveEC. A PickECAsReceiveEC is rebuilt
+ * around the PickEC found, keeping its event position; anything else is cast to ReceiveEC.
+ * @return the context found for receiveEC.getId(), wrapped again when receiveEC was a wrapper
+ */
+ public static ReceiveEC reload(ReceiveEC receiveEC) throws DBSessionException, FinderException {
+ if (receiveEC instanceof PickECAsReceiveEC) {
+ PickEC pickEC = (PickEC) ExecutionContextFactory.findECById(receiveEC.getId());
+ return new PickECAsReceiveEC(pickEC, ((PickECAsReceiveEC) receiveEC).getEventPos());
+ } else {
+ return (ReceiveEC) ExecutionContextFactory.findECById(receiveEC.getId());
+ }
+ }
+ /**
+ * Turns pick events into PickAsReceive wrappers. Each element is read as an Object[] holding
+ * the Pick at index 0 and, at index 1, the object whose position in pick.getMessageEvents()
+ * becomes the wrapper's event position. Returns a new list with one wrapper per element.
+ */
+ private static List wrapPicksAsReceives(List pickEvents) {
+ List result = new ArrayList(pickEvents.size());
+ Pick pick;
+ int eventPos;
+ for (int m = 0; m < pickEvents.size(); m++) {
+ Object[] objects = (Object[]) pickEvents.get(m); // [0] is cast to Pick, [1] is located in its message events
+ pick = (Pick) objects[0];
+ eventPos = pick.getMessageEvents().indexOf(objects[1]);
+ PickAsReceive pickAsReceive = new PickAsReceive(pick, eventPos);
+ result.add(pickAsReceive);
+ }
+ return result;
+ }
+
+}
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/DefaultMessageBrokerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/DefaultMessageBrokerImpl.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/DefaultMessageBrokerImpl.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/DefaultMessageBrokerImpl.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import org.dom4j.Document;
+import org.dom4j.DocumentHelper;
+import org.dom4j.Element;
+
+import org.apache.log4j.Logger;
+
+import org.apache.agila.bpel.engine.priv.core.definition.Activity;
+import org.apache.agila.bpel.engine.priv.messaging.MessageBroker;
+
+/**
+ * Default MessageBroker that contacts no partner: every send is only logged at INFO level, and
+ * the synchronous sends answer with a fixed message/reply/status document whose text is "ok".
+ */
+public class DefaultMessageBrokerImpl extends MessageBroker {
+ /** Log4j logger that receives one INFO entry per send call. */
+ private static Logger log = Logger.getLogger(DefaultMessageBrokerImpl.class);
+ /** Logs partner, portType, operation and message.asXML() (or "message null") at INFO; sender is unused, nothing is sent. */
+ public void asyncSend(Activity sender, String partner, String portType, String operation, Document message) {
+ if (message == null) {
+ log.info("Send an asynchronous message with partner = " + partner + ", portType = " +
+ portType + ", operation = " + operation + " and message null.");
+ } else {
+ log.info("Send an asynchronous message with partner = " + partner + ", portType = " +
+ portType + ", operation = " + operation + " and message " + message.asXML());
+ }
+ }
+ /** Namespace-aware asyncSend: logs the same fields with the port type written as {namespace}portType; sender is unused, nothing is sent. */
+ public void asyncSend(Activity sender, String partner, String namespace, String portType, String operation, Document message) {
+ if (message == null) {
+ log.info("Send an asynchronous message with partner = " + partner + ", portType = " +
+ "{" + namespace + "}" + portType + ", operation = " + operation + " and message null.");
+ } else {
+ log.info("Send an asynchronous message with partner = " + partner + ", portType = " +
+ "{" + namespace + "}" + portType + ", operation = " + operation + " and message " + message.asXML());
+ }
+ }
+ /** Logs the call at INFO, then returns a new document: root element message, containing reply, containing status with text ok. */
+ public Document syncSend(Activity sender, String partner, String portType, String operation, Document message) {
+ if (message == null) {
+ log.info("Send a synchronous message with partner = " + partner + ", portType = " +
+ portType + ", operation = " + operation + " and message null.");
+ } else {
+ log.info("Send a synchronous message with partner = " + partner + ", portType = " +
+ portType + ", operation = " + operation + " and message " + message.asXML());
+ }
+ // Canned answer: the incoming message is not used to build the reply.
+ Document doc = DocumentHelper.createDocument();
+ Element elmt = doc.addElement("message");
+ elmt.addElement("reply").addElement("status").setText("ok");
+ return doc;
+ }
+ /** Namespace-aware syncSend: logs the port type as {namespace}portType, then returns the same fixed message/reply/status "ok" document. */
+ public Document syncSend(Activity sender, String partner, String namespace, String portType, String operation, Document message) {
+ if (message == null) {
+ log.info("Send a synchronous message with partner = " + partner + ", portType = " +
+ "{" + namespace + "}" + portType + ", operation = " + operation + " and message null.");
+ } else {
+ log.info("Send a synchronous message with partner = " + partner + ", portType = " +
+ "{" + namespace + "}" + portType + ", operation = " + operation + " and message " + message.asXML());
+ }
+ // Canned answer: the incoming message is not used to build the reply.
+ Document doc = DocumentHelper.createDocument();
+ Element elmt = doc.addElement("message");
+ elmt.addElement("reply").addElement("status").setText("ok");
+ return doc;
+ }
+}
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MessageControllerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MessageControllerImpl.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MessageControllerImpl.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MessageControllerImpl.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,518 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License")
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import org.apache.log4j.Logger;
+import org.dom4j.Document;
+import org.apache.agila.bpel.engine.common.persistence.DBSessionException;
+import org.apache.agila.bpel.engine.common.persistence.FinderException;
+import org.apache.agila.bpel.engine.common.persistence.XMLSessionException;
+import org.apache.agila.bpel.engine.exception.*;
+import org.apache.agila.bpel.engine.priv.core.definition.CorrelationRef;
+import org.apache.agila.bpel.engine.priv.core.definition.Receive;
+import org.apache.agila.bpel.engine.priv.core.definition.Activity;
+import org.apache.agila.bpel.engine.priv.core.definition.Flow;
+import org.apache.agila.bpel.engine.priv.core.definition.impl.StructuredActivityImpl;
+import org.apache.agila.bpel.engine.priv.core.definition.impl.ProcessImpl;
+import org.apache.agila.bpel.engine.priv.core.dynamic.*;
+import org.apache.agila.bpel.engine.priv.core.dynamic.impl.ReceiveECImpl;
+import org.apache.agila.bpel.engine.priv.messaging.MessageController;
+import org.apache.agila.bpel.engine.priv.util.CorrelationExtractor;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the MessageController interface.
+ *
+ * @see org.apache.agila.bpel.engine.priv.messaging.MessageController
+ */
+public class MessageControllerImpl implements MessageController {
+
+ private Logger log = Logger.getLogger(getClass());
+
+ /**
+ * As the acknowledge method requires a partner link it's always targeted at only one process. If
+ * a process includes several receives with createInstance, they must have same correlation.
+ * 1. Checking all receives and selecting their correlations
+ * 2. For each receive, finding their ec from correlation and instance.
+ * 3. If several ecs are found, send conflicting receive failure. Done.
+ * 4. If one ec is found, sending it the message. Done.
+ * 5. If no ecs are found :
+ * 5.1. If no process instance exists for the extracted correlation, just pick one of the receives
+ * and executing it. Done.
+ * 5.2. If a process instance already exists, using this instance to execute a new receive into (case
+ * of a process having several starting receives in parrallel flow branches). Done.
+ * Picks are processed exactly as receives using wrappers.
+ */
+ public ReceiveEC acknowledge(String partnerLink, String namespace, String portType, String operation, Document message) throws CorrelationViolationException, ConflictingReceiveException {
+ // Finding the activity from partnerLink, portType and operation
+ if (message == null || !"message".equals(message.getRootElement().getName())) {
+ throw new IllegalArgumentException("The message must have a 'message' element as root");
+ }
+ List receives = null;
+ try {
+ receives = CoreWrappingFactory.findReceivesByInvoker(partnerLink, namespace, portType, operation);
+ } catch (DBSessionException e) {
+ log.error("Could not acknowledge a message from (" + partnerLink + ", " + "{" + namespace + "}" + portType
+ + ", " + operation + ") : " + message.asXML(), e);
+ throw new EngineRuntimeException("Server error", e);
+ }
+
+
+ ReceiveEC receiveEC = null;
+ Map receiveECCorrels = null;
+ List createInstanceReceive = new ArrayList();
+ List createInstanceReceiveCorrel = new ArrayList();
+
+ log.debug("Found receives: " + receives.size());
+
+ // 1 and 2
+ for (int m = 0; m < receives.size(); m++) {
+ Receive receive = (Receive) receives.get(m);
+ Map selectedCorrelRefs = null;
+
+ // Checking receive correlations to retrieve a value from the message for all properties
+ // of those correlations.
+ try {
+ selectedCorrelRefs = getValuedCorrelations(receive, message);
+ } catch (CorrelationViolationException e) {
+ log.error("Message doesn't include necessary correlation information : " + message.asXML(), e);
+ throw e;
+ } catch (RuntimeException e) {
+ log.error("Could not value correlations for receive " + receive + " and message " + message.asXML());
+ throw e;
+ }
+
+ // Taking profit of this iteration to prepare collections with receives that
+ // have the createInstance attribute to true and their valued correlation.
+ if (receive.isCreateInstance()) {
+ createInstanceReceive.add(receive);
+ createInstanceReceiveCorrel.add(selectedCorrelRefs);
+ }
+
+ // First correlation already initiated will lead us to the right receiveEC
+ CorrelationRef initiatedRef = null;
+ for (Iterator correlIter = selectedCorrelRefs.keySet().iterator(); correlIter.hasNext();) {
+ CorrelationRef setRef = (CorrelationRef) correlIter.next();
+ if (!setRef.isInitiate()) {
+ initiatedRef = setRef;
+ }
+ }
+ // If we cannot find any initiated correlation, no instance can be found so the receive
+ // is ignored
+ if (initiatedRef != null) {
+ if (receiveEC == null) {
+ // The correlation ref helps us find the right instance and with the instance
+ // and the originating activity we can find the right waiting receive ec.
+ receiveEC = getExecutionContext(receive, initiatedRef.getSet(),
+ (Map) selectedCorrelRefs.get(initiatedRef), true);
+ receiveECCorrels = selectedCorrelRefs;
+ } else {
+ ReceiveEC anotherOne = getExecutionContext(receive, initiatedRef.getSet(),
+ (Map) selectedCorrelRefs.get(initiatedRef), true);
+ if (anotherOne != null) {
+ // 3 The above condition just checks that there is also an execution context for
+ // this receive. If not execution exist, it's not a conflict.
+ throw new ConflictingReceiveException("Several receive execution contexts are " +
+ "waiting at the same message at the same time. Message : " + message.asXML(), ((ReceiveECImpl) anotherOne).fetchInstance());
+ }
+ }
+ }
+ }
+
+ // Forwarding the message to the right execution context given activity and correlation
+ // 4
+ if (receiveEC != null) {
+ log.debug("Step 4.");
+ // Adding newly initiated correlations to the process instance
+ ProcessInstance instance = receiveEC.fetchInstance();
+ try {
+ for (Iterator correlIter = receiveECCorrels.keySet().iterator(); correlIter.hasNext();) {
+ CorrelationRef correlationRef = (CorrelationRef) correlIter.next();
+ if (correlationRef.isInitiate()) {
+ if (receiveECCorrels.get(correlationRef) != null) {
+ ProcessInstanceFactory.addCorrelation(instance, correlationRef.getSet(),
+ (Map) receiveECCorrels.get(correlationRef), true);
+ } else {
+ throw new CorrelationViolationException("A correlation " + correlationRef.getSet() +
+ " defined as initiate could not be extracted from received message" +
+ message.asXML());
+ }
+ }
+ }
+ } catch (DBSessionException e) {
+ throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
+ } catch (XMLSessionException e) {
+ throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
+ }
+ return receiveEC;
+ } else {
+ // 5.1
+ log.debug("Step 5.");
+ if (createInstanceReceive.size() > 0) {
+ CorrelationRef correlationRef = (CorrelationRef)
+ ((Map) createInstanceReceiveCorrel.get(0)).keySet().iterator().next();
+ Map correlationValues = (Map) ((Map) createInstanceReceiveCorrel.get(0)).get(correlationRef);
+ ProcessInstance instance = null;
+ try {
+ instance = ProcessInstanceFactory.findInstanceByCorrelation(correlationRef.getSet(), correlationValues);
+ } catch (DBSessionException e) {
+ throw new EngineRuntimeException("Could not find instance from correlation " + correlationRef.getSet(), e);
+ } catch (FinderException e) { // instance is just null, it's ok this way
+ }
+
+ if (instance == null) {
+ log.debug("Step 5.1.");
+ ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map) createInstanceReceiveCorrel.get(0), null);
+ // No need to go further, only activity with createInstance is served.
+ return resultEC;
+ } else {
+ log.debug("Step 5.2.");
+ // 5.2
+ // All receives with createInstance must have same correlation so they all have same instance
+ if (createInstanceReceiveCorrel.size() == 0) {
+ throw new ConflictingReceiveException("Several receive with createInstance are " +
+ "waiting at the same message at the same time. Message : " + message);
+ }
+ for (int m = 0; m < createInstanceReceive.size(); m++) {
+ Receive receive = (Receive) createInstanceReceive.get(m);
+ CorrelationRef receiveCorrelationRef = (CorrelationRef)
+ ((Map) createInstanceReceiveCorrel.get(m)).keySet().iterator().next();
+ Map receiveCorrelationValues = (Map) ((Map) createInstanceReceiveCorrel.get(m)).get(correlationRef);
+ ReceiveEC ec = getExecutionContext(receive, receiveCorrelationRef.getSet(), receiveCorrelationValues, false);
+ if (ec == null) {
+ // Executing first one not executed yet.
+ ReceiveEC resultEC = executeReceive(receive, message,
+ (Map) createInstanceReceiveCorrel.get(m), instance);
+ return resultEC;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * As the acknowledge method requires a partner link it's always targeted at only one process. If
+ * a process includes several receives with createInstance, they must have same correlation.
+ * 1. Checking all receives and selecting their correlations
+ * 2. For each receive, finding their ec from correlation and instance.
+ * 3. If several ecs are found, send conflicting receive failure. Done.
+ * 4. If one ec is found, sending it the message. Done.
+ * 5. If no ecs are found :
+ * 5.1. If no process instance exists for the extracted correlation, just pick one of the receives
+ * and executing it. Done.
+ * 5.2. If a process instance already exists, using this instance to execute a new receive into (case
+ * of a process having several starting receives in parrallel flow branches). Done.
+ * Picks are processed exactly as receives using wrappers.
+ */
+ public ReceiveEC acknowledge(String partnerLink, String portType, String operation, Document message) throws CorrelationViolationException, ConflictingReceiveException {
+ // Finding the activity from partnerLink, portType and operation
+ if (message == null || !"message".equals(message.getRootElement().getName())) {
+ throw new IllegalArgumentException("The message must have a 'message' element as root");
+ }
+ List receives = null;
+ try {
+ receives = CoreWrappingFactory.findReceivesByInvoker(partnerLink, portType, operation);
+ } catch (DBSessionException e) {
+ log.error("Could not acknowledge a message from (" + partnerLink + ", " + portType
+ + ", " + operation + ") : " + message.asXML(), e);
+ throw new EngineRuntimeException("Server error", e);
+ }
+
+ ReceiveEC receiveEC = null;
+ Map receiveECCorrels = null;
+ List createInstanceReceive = new ArrayList();
+ List createInstanceReceiveCorrel = new ArrayList();
+ // 1 and 2
+ for (int m = 0; m < receives.size(); m++) {
+ Receive receive = (Receive) receives.get(m);
+ Map selectedCorrelRefs = null;
+
+ // Checking receive correlations to retrieve a value from the message for all properties
+ // of those correlations.
+ try {
+ selectedCorrelRefs = getValuedCorrelations(receive, message);
+ } catch (CorrelationViolationException e) {
+ log.error("Message doesn't include necessary correlation information : " + message.asXML(), e);
+ throw e;
+ } catch (RuntimeException e) {
+ log.error("Could not value correlations for receive " + receive + " and message " + message.asXML());
+ throw e;
+ }
+
+ // Taking profit of this iteration to prepare collections with receives that
+ // have the createInstance attribute to true and their valued correlation.
+ if (receive.isCreateInstance()) {
+ createInstanceReceive.add(receive);
+ createInstanceReceiveCorrel.add(selectedCorrelRefs);
+ }
+
+ // First correlation already initiated will lead us to the right receiveEC
+ CorrelationRef initiatedRef = null;
+ for (Iterator correlIter = selectedCorrelRefs.keySet().iterator(); correlIter.hasNext();) {
+ CorrelationRef setRef = (CorrelationRef) correlIter.next();
+ if (!setRef.isInitiate()) {
+ initiatedRef = setRef;
+ }
+ }
+ // If we cannot find any initiated correlation, no instance can be found so the receive
+ // is ignored
+ if (initiatedRef != null) {
+ if (receiveEC == null) {
+ // The correlation ref helps us find the right instance and with the instance
+ // and the originating activity we can find the right waiting receive ec.
+ receiveEC = getExecutionContext(receive, initiatedRef.getSet(),
+ (Map) selectedCorrelRefs.get(initiatedRef), true);
+ receiveECCorrels = selectedCorrelRefs;
+ } else if (getExecutionContext(receive, initiatedRef.getSet(),
+ (Map) selectedCorrelRefs.get(initiatedRef), true) != null) {
+ // 3 The above condition just checks that there is also an execution context for
+ // this receive. If not execution exist, it's not a conflict.
+ throw new ConflictingReceiveException("Several receive execution contexts are " +
+ "waiting at the same message at the same time. Message : " + message);
+ }
+ }
+ }
+
+ // Forwarding the message to the right execution context given activity and correlation
+ // 4
+ if (receiveEC != null) {
+ // Adding newly initiated correlations to the process instance
+ ProcessInstance instance = receiveEC.fetchInstance();
+ try {
+ for (Iterator correlIter = receiveECCorrels.keySet().iterator(); correlIter.hasNext();) {
+ CorrelationRef correlationRef = (CorrelationRef) correlIter.next();
+ if (correlationRef.isInitiate()) {
+ if (receiveECCorrels.get(correlationRef) != null) {
+ ProcessInstanceFactory.addCorrelation(instance, correlationRef.getSet(),
+ (Map) receiveECCorrels.get(correlationRef), true);
+ } else {
+ throw new CorrelationViolationException("A correlation " + correlationRef.getSet() +
+ " defined as initiate could not be extracted from received message" +
+ message.asXML());
+ }
+ }
+ }
+ } catch (DBSessionException e) {
+ throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
+ } catch (XMLSessionException e) {
+ throw new EngineRuntimeException("Could not add correlation to receive execution context " + receiveEC, e);
+ }
+ return receiveEC;
+ } else {
+ // 5.1
+ log.debug("Step 5.");
+ if (createInstanceReceive.size() > 0) {
+ ProcessInstance instance = null;
+ CorrelationRef correlationRef = null;
+ if (((Map) createInstanceReceiveCorrel.get(0)).size() > 0) {
+ correlationRef = (CorrelationRef)
+ ((Map) createInstanceReceiveCorrel.get(0)).keySet().iterator().next();
+ Map correlationValues = (Map) ((Map) createInstanceReceiveCorrel.get(0)).get(correlationRef);
+ try {
+ instance = ProcessInstanceFactory.findInstanceByCorrelation(correlationRef.getSet(), correlationValues);
+ } catch (DBSessionException e) {
+ throw new EngineRuntimeException("Could not find instance from correlation " + correlationRef.getSet(), e);
+ } catch (FinderException e) { // instance is just null, it's ok this way
+ }
+ }
+
+ if (instance == null) {
+ log.debug("Step 5.1.");
+ ReceiveEC resultEC = executeReceive((Receive) createInstanceReceive.get(0), message, (Map) createInstanceReceiveCorrel.get(0), null);
+ // No need to go further, only activity with createInstance is served.
+ return resultEC;
+ } else {
+ log.debug("Step 5.2.");
+ // 5.2
+ // All receives with createInstance must have same correlation so they all have same instance
+ if (createInstanceReceiveCorrel.size() == 0) {
+ throw new ConflictingReceiveException("Several receive with createInstance are " +
+ "waiting at the same message at the same time. Message : " + message);
+ }
+ for (int m = 0; m < createInstanceReceive.size(); m++) {
+ Receive receive = (Receive) createInstanceReceive.get(m);
+ CorrelationRef receiveCorrelationRef = (CorrelationRef)
+ ((Map) createInstanceReceiveCorrel.get(m)).keySet().iterator().next();
+ Map receiveCorrelationValues = (Map) ((Map) createInstanceReceiveCorrel.get(m)).get(correlationRef);
+ ReceiveEC ec = getExecutionContext(receive, receiveCorrelationRef.getSet(), receiveCorrelationValues, false);
+ if (ec == null) {
+ // Executing first one not executed yet.
+ ReceiveEC resultEC = executeReceive(receive, message, (Map) createInstanceReceiveCorrel.get(m), instance);
+ return resultEC;
+ }
+ }
+
+ }
+ }
+ }
+ return null;
+ }
+
+ /*
+  * Extracts the correlation values of the given receive from the message. The result maps
+  * each CorrelationRef to a map of property name/value pairs.
+  */
+ private Map getValuedCorrelations(Receive receive, Document message) throws CorrelationViolationException {
+     return CorrelationExtractor.extractCorrelationsValues(
+             receive.fetchProcess(), receive.getNamespace(), receive.getCorrelations(), message);
+ }
+
+ /**
+  * Looks up the execution context of a receive inside the process instance designated by a
+  * correlation. The instance is found from the correlation name and values, then the
+  * execution context is found from that instance and the receive.
+  *
+  * @param receive           receive activity whose execution context is wanted
+  * @param correlationName   name of the correlation set identifying the instance
+  * @param correlationValues property name/value pairs of that correlation
+  * @param active            when true, only an execution context in ACTIVE status is returned
+  * @return the execution context, or null when the instance or the context cannot be found,
+  *         or when <code>active</code> is set and the context is not ACTIVE
+  */
+ private ReceiveEC getExecutionContext(Receive receive, String correlationName, Map correlationValues, boolean active) {
+     ProcessInstance owner;
+     try {
+         owner = ProcessInstanceFactory.findInstanceByCorrelation(correlationName, correlationValues);
+     } catch (DBSessionException e) {
+         // TODO: a persistence failure is currently reported as "not found" without logging.
+         return null;
+     } catch (FinderException e) {
+         // TODO: no instance for this correlation, reported as "not found" without logging.
+         return null;
+     }
+
+     ReceiveEC found = null;
+     try {
+         found = (ReceiveEC) CoreWrappingFactory.findECsByActivityAndInstance(owner, receive);
+     } catch (FinderException e) {
+         // The receive has no execution context in this instance: found stays null.
+     }
+
+     if (found == null) {
+         return null;
+     }
+     boolean rejected = active && found.getStatus() != ExecutionContext.ACTIVE;
+     return rejected ? null : found;
+ }
+
+ /*
+  * Executes a receive and registers its initiating correlations. Without correlations the
+  * receive is simply executed. Otherwise the first initiating correlation creates the
+  * execution context (in a new instance, or inside the provided one) and every following
+  * initiating correlation is added to the instance of that context. Any failure is logged
+  * and rethrown as an EngineRuntimeException. May return null when no correlation is
+  * flagged as initiate.
+  */
+ private ReceiveEC executeReceive(Receive receive, Document message, Map receiveCorrelations, ProcessInstance instance) {
+     ReceiveEC startedEC = null;
+     try {
+         // Executing the activity (creates activity ec and its containers ec up to
+         // the process instance) providing it all correlations (if there are some).
+         if (receiveCorrelations.isEmpty()) {
+             startedEC = (ReceiveEC) receive.execute(null, null);
+         }
+         // With an empty map this loop does not run at all.
+         Iterator refs = receiveCorrelations.keySet().iterator();
+         while (refs.hasNext()) {
+             CorrelationRef ref = (CorrelationRef) refs.next();
+             if (!ref.isInitiate()) {
+                 continue;
+             }
+             Object values = receiveCorrelations.get(ref);
+             if (values == null) {
+                 throw new CorrelationViolationException("A correlation " + ref.getSet() +
+                         " defined as initiate could not be extracted from received message" +
+                         message.asXML());
+             }
+             if (startedEC != null) {
+                 // The context already exists: just attach this additional correlation.
+                 ProcessInstanceFactory.addCorrelation(startedEC.fetchInstance(), ref.getSet(),
+                         (Map) values, true);
+             } else if (instance == null) {
+                 startedEC = (ReceiveEC) receive.execute(ref.getSet(), (Map) values);
+             } else {
+                 // Very specific case when an instance already exists and we have to create
+                 // an ec for another receive that is being also executed in the same process.
+                 // This basically happens when several receives can all start a process execution
+                 // in different branches of a flow activity.
+                 startedEC = (ReceiveEC) executeInInstance(receive, instance);
+                 startedEC.execute();
+             }
+         }
+     } catch (EngineException e) {
+         log.error("Could not execute receive " + receive, e);
+         throw new EngineRuntimeException("Server error", e);
+     } catch (Exception e) {
+         log.error("Could not add correlation to receive execution context " + startedEC, e);
+         throw new EngineRuntimeException("Server error", e);
+     }
+     // Making the ReceiveEC to acknowledge the message.
+ //    startedEC.acknowledgeMessage(message);
+     return startedEC;
+ }
+
+ /**
+  * Instantiates an activity in an instance that has already been started. Walks up the
+  * containers of the activity until a flow is reached, takes the flow execution context
+  * already present in the instance, then creates one execution context per activity on the
+  * way back down. This is necessary when several receive activities can all start a process
+  * execution in different branches of a flow activity. Recursive implementation.
+  *
+  * @param activity activity to instantiate
+  * @param instance already started process instance
+  * @return the execution context for the activity; null when the activity is neither a flow
+  *         nor contained in anything
+  */
+ private ExecutionContext executeInInstance(Activity activity, ProcessInstance instance) {
+     if (activity instanceof Flow) {
+         // The flow is already running: reuse its execution context.
+         return fetchFirstFlowEC(instance.getChildExecutionContext());
+     }
+     if (activity.getContainer() == null) {
+         return null;
+     }
+
+     StructuredEC enclosing = (StructuredEC) executeInInstance(activity.getContainer(), instance);
+     try {
+         return ExecutionContextFactory.createExecutionContext(activity, enclosing);
+     } catch (DBSessionException e) {
+         throw new EngineRuntimeException("Server error", e);
+     }
+ }
+
+ /**
+  * Gets the first flow execution context encountered while going down the execution contexts
+  * containment hierarchy (depth first, children in list order). Recursive implementation.
+  *
+  * @param childExecutionContext root of the hierarchy to search, may be null
+  * @return the first flow execution context found, or null when the hierarchy has none
+  */
+ private FlowEC fetchFirstFlowEC(ExecutionContext childExecutionContext) {
+     if (childExecutionContext instanceof FlowEC) {
+         return (FlowEC) childExecutionContext;
+     }
+     if (childExecutionContext instanceof StructuredEC) {
+         List children = ((StructuredEC) childExecutionContext).getExecutionContexts();
+         for (int m = 0; m < children.size(); m++) {
+             FlowEC found = fetchFirstFlowEC((ExecutionContext) children.get(m));
+             // Keep looking in the following siblings when this branch holds no flow.
+             if (found != null) {
+                 return found;
+             }
+         }
+     }
+     return null;
+ }
+
+
+}
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MyMessageBroker.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MyMessageBroker.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MyMessageBroker.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/MyMessageBroker.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,567 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License")
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.ByteArrayOutputStream;
+import java.rmi.RemoteException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import javax.wsdl.Binding;
+import javax.wsdl.BindingOperation;
+import javax.wsdl.Definition;
+import javax.wsdl.Import;
+import javax.wsdl.Service;
+import javax.wsdl.WSDLException;
+import javax.wsdl.extensions.ExtensibilityElement;
+import javax.wsdl.extensions.soap.SOAPBody;
+import javax.wsdl.extensions.soap.SOAPOperation;
+import javax.wsdl.factory.WSDLFactory;
+import javax.wsdl.xml.WSDLReader;
+import javax.wsdl.xml.WSDLWriter;
+import javax.xml.namespace.QName;
+import javax.xml.rpc.ServiceException;
+
+import org.apache.axis.client.Call;
+import org.apache.axis.message.SOAPBodyElement;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentHelper;
+import org.dom4j.io.DOMReader;
+import org.dom4j.io.DOMWriter;
+
+import org.apache.log4j.Logger;
+
+import org.apache.agila.bpel.engine.common.persistence.FinderException;
+import org.apache.agila.bpel.engine.common.persistence.XMLDataAccess;
+import org.apache.agila.bpel.engine.common.persistence.XMLSessionException;
+import org.apache.agila.bpel.engine.priv.core.definition.Activity;
+import org.apache.agila.bpel.engine.priv.core.definition.AgilaProcess;
+import org.apache.agila.bpel.engine.priv.messaging.InvocationException;
+import org.apache.agila.bpel.engine.priv.messaging.MessageBroker;
+
+import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+
+import javax.xml.namespace.QName;
+/**
+ * Invokes a web service from a WSDL definition.
+ */
+public class MyMessageBroker extends MessageBroker {
+
+ /** Executor running the asynchronous sends. @TODO parameterize this (the 10 and 100 below are hard-coded; presumably buffer capacity and maximum pool size - confirm against util.concurrent) */
+ private PooledExecutor threadPool = new PooledExecutor(new BoundedBuffer(10), 100);
+
+ private static final String SOAP_NS = "http://schemas.xmlsoap.org/wsdl/soap/"; // WSDL SOAP binding namespace
+ private DOMReader domReader = null; // starts null; read through getDOMReader() - presumably created lazily there, confirm
+ private DOMWriter domWriter = null; // starts null; read through getDOMWriter() - presumably created lazily there, confirm
+
+ /**
+  * Sends a message without waiting for the answer: the synchronous send is handed to the
+  * thread pool and its result is discarded.
+  *
+  * @param sender    activity emitting the message
+  * @param partner   partner name
+  * @param portType  port type name
+  * @param operation operation name
+  * @param message   message to send
+  */
+ protected void asyncSend(Activity sender, String partner, String portType, String operation, Document message) {
+     // Final copies so the anonymous Runnable can capture them.
+     final Activity fsender = sender;
+     final String fpartner = partner;
+     final String fportType = portType;
+     final String foperation = operation;
+     final Document fmessage = message;
+     try {
+         threadPool.execute(new Runnable() {
+             public void run() {
+                 syncSend(fsender, fpartner, fportType, foperation, fmessage);
+             }
+         });
+     } catch (InterruptedException e) {
+         log.error("Could not send message to " + portType + " : " + message.asXML(), e);
+         // Restore the interrupt status so code further up the stack can still react to it.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Namespace-aware variant: sends a message without waiting for the answer. The synchronous
+  * send is handed to the thread pool and its result is discarded.
+  *
+  * @param sender    activity emitting the message
+  * @param partner   partner name
+  * @param namespace namespace of the port type
+  * @param portType  port type local name
+  * @param operation operation name
+  * @param message   message to send
+  */
+ protected void asyncSend(Activity sender, String partner, String namespace, String portType, String operation, Document message) {
+     // Final copies so the anonymous Runnable can capture them.
+     final Activity fsender = sender;
+     final String fpartner = partner;
+     final String fportType = portType;
+     final String fnamespace = namespace;
+     final String foperation = operation;
+     final Document fmessage = message;
+     try {
+         threadPool.execute(new Runnable() {
+             public void run() {
+                 syncSend(fsender, fpartner, fnamespace, fportType, foperation, fmessage);
+             }
+         });
+     } catch (InterruptedException e) {
+         log.error("Could not send message to " + portType + " : " + message.asXML(), e);
+         // Restore the interrupt status so code further up the stack can still react to it.
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Invokes a web service operation and waits for its answer. The WSDL stored for the
+  * sender's process provides the service, the SOAP action, the message namespace and the
+  * encoding. The children of the message root are moved into a new request element named
+  * after the operation. The root element of the answer is renamed to "message".
+  *
+  * @param sender    activity emitting the message, used to locate the process WSDL
+  * @param partner   partner name (not used by this implementation)
+  * @param portType  port type name, resolved in the WSDL target namespace
+  * @param operation operation name
+  * @param message   message to send; its root's children are detached from it
+  * @return the answer document, or null when the invocation failed (the failure is logged)
+  */
+ protected Document syncSend(Activity sender, String partner, String portType, String operation, Document message) {
+     AgilaProcess process = sender.fetchProcess();
+     org.w3c.dom.Document wsdlDoc = getWSDLDesc(process.getNamespace(), process.getName());
+     Definition def = readWSDL(wsdlDoc);
+
+     String wsdlStringDoc = getDOMReader().read(wsdlDoc).asXML();
+     String nameSpaceURI = def.getTargetNamespace();
+     QName serviceName = getService(def, portType);
+     String[] wsdlAdditionalInfo = getWSDLAdditionalInfo(def, portType, operation);
+     String soapAction = wsdlAdditionalInfo[0];
+     // The message namespace falls back to the WSDL target namespace when none is declared.
+     String msgNS = null;
+     if (wsdlAdditionalInfo[1] != null && wsdlAdditionalInfo[1].length() > 0) {
+         msgNS = wsdlAdditionalInfo[1];
+     } else {
+         msgNS = nameSpaceURI;
+     }
+     String encoding = wsdlAdditionalInfo[2];
+     Document preparedMsg = prepareMessage(message, operation, msgNS, encoding);
+
+     Document result = null;
+     try {
+         log.debug("Sending message : " + preparedMsg.asXML());
+         // asXML() is expected to declare UTF-8 by default, so the bytes use that charset
+         // rather than the platform default.
+         InputStream wsdlDesc = new ByteArrayInputStream(wsdlStringDoc.getBytes("UTF-8"));
+         org.w3c.dom.Document domDoc = getDOMWriter().write(preparedMsg);
+         result = getDOMReader().read(sendMessage(nameSpaceURI, serviceName,
+                 new QName(nameSpaceURI, portType), operation, soapAction, wsdlDesc, domDoc));
+         result.getRootElement().setName("message");
+     } catch (Exception e) {
+         // Every failure (document, service, remote or other) is handled the same way:
+         // logged, and null is returned to the caller.
+         log.error("Could not send message to " + portType + " : " + preparedMsg.asXML(), e);
+     }
+     // result is still null when the invocation failed.
+     if (result != null) {
+         log.debug("Received message : " + result.asXML());
+     }
+     return result;
+ }
+
+
+ /**
+  * Namespace-aware variant: invokes a web service operation and waits for its answer. The
+  * service is looked up by qualified port type, and the definition returned together with
+  * the service replaces the process one. That definition is serialized and provides the
+  * SOAP action, message namespace and encoding. The root element of the answer is renamed
+  * to "message".
+  *
+  * @param sender    activity emitting the message, used to locate the process WSDL
+  * @param partner   partner name (not used by this implementation)
+  * @param namespace namespace of the port type
+  * @param portType  port type local name
+  * @param operation operation name
+  * @param message   message to send; its root's children are detached from it
+  * @return the answer document, or null when the invocation failed (the failure is logged)
+  */
+ protected Document syncSend(Activity sender, String partner, String namespace, String portType, String operation, Document message) {
+     QName qnPortType = new QName(namespace, portType);
+     AgilaProcess process = sender.fetchProcess();
+     org.w3c.dom.Document wsdlDoc = getWSDLDesc(process.getNamespace(), process.getName());
+     Definition def = readWSDL(wsdlDoc);
+
+     // getService answers a pair: [0] the service name, [1] the definition declaring it.
+     Object[] tempObj = getService(def, qnPortType);
+     QName serviceName = (QName) tempObj[0];
+     def = (Definition) tempObj[1];
+     String nameSpaceURI = def.getTargetNamespace();
+     ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+     writeWSDL(def, byteOut);
+
+     String[] wsdlAdditionalInfo = getWSDLAdditionalInfo(def, qnPortType, operation);
+     String soapAction = wsdlAdditionalInfo[0];
+     // The message namespace falls back to the WSDL target namespace when none is declared.
+     String msgNS = null;
+     if (wsdlAdditionalInfo[1] != null && wsdlAdditionalInfo[1].length() > 0) {
+         msgNS = wsdlAdditionalInfo[1];
+     } else {
+         msgNS = nameSpaceURI;
+     }
+     String encoding = wsdlAdditionalInfo[2];
+     // The serialized bytes are passed on as they are: going through a String would decode
+     // and re-encode them with the platform charset.
+     InputStream wsdlDesc = new ByteArrayInputStream(byteOut.toByteArray());
+     Document preparedMsg = prepareMessage(message, operation, msgNS, encoding);
+
+     Document result = null;
+     try {
+         log.debug("Sending message : " + preparedMsg.asXML());
+         org.w3c.dom.Document domDoc = getDOMWriter().write(preparedMsg);
+         result = getDOMReader().read(sendMessage(nameSpaceURI, serviceName,
+                 qnPortType, operation, soapAction, wsdlDesc, domDoc));
+         result.getRootElement().setName("message");
+     } catch (Exception e) {
+         // Every failure (document, service, remote or other) is handled the same way:
+         // logged, and null is returned to the caller.
+         log.error("Could not send message to " + qnPortType + " : " + preparedMsg.asXML(), e);
+     }
+     // result is still null when the invocation failed.
+     if (result != null) {
+         log.debug("Received message : " + result.asXML());
+     }
+     return result;
+ }
+
+
+
+ /*
+  * Builds the request document: a root element named elmtName in the given namespace
+  * (no prefix for the "literal" encoding, prefix "ns1" otherwise) receiving all child
+  * elements of the root of doc. The children are detached from doc, which is therefore
+  * modified.
+  */
+ private static org.dom4j.Document prepareMessage(Document doc, String elmtName,
+         String namespace, String encoding) {
+     String prefix = "literal".equals(encoding) ? "" : "ns1";
+     org.dom4j.QName rootName = org.dom4j.QName.get(elmtName, prefix, namespace);
+
+     Document request = DocumentHelper.createDocument();
+     org.dom4j.Element requestRoot = DocumentHelper.createElement(rootName);
+     request.add(requestRoot);
+
+     List parts = doc.getRootElement().elements();
+     int index = 0;
+     while (index < parts.size()) {
+         org.dom4j.Element part = (org.dom4j.Element) parts.get(index);
+         requestRoot.add((org.dom4j.Element) part.detach());
+         index++;
+     }
+     return request;
+ }
+
+ /**
+  * Builds a WSDL definition from an already parsed DOM document.
+  * The reader is configured with the "javax.wsdl.verbose" and
+  * "javax.wsdl.importDocuments" features switched on.
+  *
+  * @param wsdlDesc DOM document containing the WSDL description
+  * @return the definition read from the document
+  * @throws InvocationException if no WSDL factory can be obtained or the
+  *         document cannot be read as WSDL
+  */
+ private static Definition readWSDL(org.w3c.dom.Document wsdlDesc) {
+     WSDLFactory wsdlFactory;
+     try {
+         wsdlFactory = WSDLFactory.newInstance();
+     } catch (WSDLException e) {
+         throw new InvocationException("Could not find WSDL factory.", e);
+     }
+
+     WSDLReader wsdlReader = wsdlFactory.newWSDLReader();
+     wsdlReader.setFeature("javax.wsdl.verbose", true);
+     wsdlReader.setFeature("javax.wsdl.importDocuments", true);
+
+     try {
+         // No document base URI is supplied (first argument is null).
+         return wsdlReader.readWSDL(null, wsdlDesc);
+     } catch (WSDLException e) {
+         throw new InvocationException("Could not read a WSDL document.", e);
+     }
+ }
+
+ /**
+  * Serializes a WSDL definition to an output stream.
+  *
+  * @param def the definition to serialize
+  * @param out the stream that receives the WSDL document
+  * @throws InvocationException if no WSDL factory can be obtained or the
+  *         definition cannot be written
+  */
+ private static void writeWSDL(Definition def, OutputStream out) {
+     WSDLFactory factory;
+     try {
+         factory = WSDLFactory.newInstance();
+     } catch (WSDLException e) {
+         throw new InvocationException("Could not find WSDL factory.", e);
+     }
+
+     try {
+         WSDLWriter writer = factory.newWSDLWriter();
+         writer.writeWSDL(def, out);
+     } catch (WSDLException e) {
+         // A failed write is reported as such instead of being blamed on the factory lookup.
+         throw new InvocationException("Could not write a WSDL document.", e);
+     }
+ }
+
+ /**
+  * Loads the stored DOM document registered for a process.
+  * The document is fetched from the "/process/def" collection under a key
+  * that is the hash code of the process namespace concatenated with the
+  * process name.
+  *
+  * @param processNS namespace of the process
+  * @param processName name of the process
+  * @return the DOM document returned by the XML data access layer
+  * @throws InvocationException wrapping any XMLSessionException or
+  *         FinderException raised by the lookup
+  */
+ private static org.w3c.dom.Document getWSDLDesc(String processNS, String processName) {
+     String definitionKey = String.valueOf((processNS + processName).hashCode());
+     try {
+         return XMLDataAccess.getDOMDocument("/process/def", definitionKey);
+     } catch (XMLSessionException e) {
+         throw new InvocationException(e);
+     } catch (FinderException e) {
+         throw new InvocationException(e);
+     }
+ }
+
+
+ /**
+  * Collects the SOAP binding details needed to invoke an operation of a
+  * port type. The binding is looked up with findPortBinding, which also
+  * searches the definitions imported by <code>def</code>; that is why the
+  * port type is given as a QName.
+  *
+  * @param def WSDL definition in which the search starts
+  * @param portTypeName qualified name of the port type
+  * @param operationName name of the operation
+  * @return an array of three entries: [0] the SOAPAction URI of the
+  *         operation, [1] the namespace URI and [2] the "use" value of the
+  *         SOAP body declared on the binding input; an entry is null when
+  *         the WSDL does not provide it
+  * @throws InvocationException if no binding exists for the port type
+  * @author milan chudik
+  */
+ private static String[] getWSDLAdditionalInfo(Definition def, QName portTypeName, String operationName) {
+     Binding portBinding = findPortBinding(def, portTypeName, operationName);
+     if (portBinding == null) {
+         throw new InvocationException("Could not find a binding for port " +
+                 portTypeName + " in service " + def.getTargetNamespace());
+     }
+
+     BindingOperation bindingOperation = portBinding.getBindingOperation(
+             operationName, null, null);
+
+     String soapAction = null;
+     List extensibilityElmts = bindingOperation.getExtensibilityElements();
+     for (int m = 0; m < extensibilityElmts.size(); m++) {
+         ExtensibilityElement extension = (ExtensibilityElement) extensibilityElmts.get(m);
+         // The namespace alone does not guarantee the concrete type, so check
+         // it before casting.
+         if (SOAP_NS.equals(extension.getElementType().getNamespaceURI())
+                 && extension instanceof SOAPOperation) {
+             soapAction = ((SOAPOperation) extension).getSoapActionURI();
+             break;
+         }
+     }
+
+     String msgUrl = null;
+     String encoding = null;
+     // An operation binding is not required to declare an input.
+     if (bindingOperation.getBindingInput() != null) {
+         List bindingInputExt = bindingOperation.getBindingInput()
+                 .getExtensibilityElements();
+         for (int m = 0; m < bindingInputExt.size(); m++) {
+             ExtensibilityElement extension = (ExtensibilityElement) bindingInputExt.get(m);
+             // The input may carry other SOAP extensions than the body
+             // (WSDL 1.1 allows soap:header there); only the body is relevant.
+             if (SOAP_NS.equals(extension.getElementType().getNamespaceURI())
+                     && extension instanceof SOAPBody) {
+                 SOAPBody soapBodyExt = (SOAPBody) extension;
+                 msgUrl = soapBodyExt.getNamespaceURI();
+                 encoding = soapBodyExt.getUse();
+             }
+         }
+     }
+
+     if (soapAction == null) {
+         // Logged through the class logger, like the String based overload does.
+         log.info("No SOAPAction could be found for service "
+                 + def.getTargetNamespace() + " no SOAPAction element "
+                 + "will be included in the message.");
+     }
+
+     return new String[] { soapAction, msgUrl, encoding };
+ }
+
+ /**
+  * Finds the binding of a port type, looking first at the bindings declared
+  * by <code>def</code> itself and then, recursively, at the definitions it
+  * imports whose target namespace equals the namespace of the port type.
+  * The search stops at the first binding found.
+  *
+  * @param def WSDL definition in which the search starts
+  * @param portTypeName qualified name of the port type
+  * @param operationName name of the operation the binding must declare
+  * @return the matching binding, or null if none is found
+  * @throws InvocationException if a binding matches the port type but does
+  *         not declare the operation
+  * @author milan chudik
+  */
+ private static Binding findPortBinding(Definition def, QName portTypeName, String operationName) {
+     Map bindings = def.getBindings();
+     if (bindings != null) {
+         for (Iterator bindingsIter = bindings.keySet().iterator(); bindingsIter.hasNext();) {
+             QName bindingName = (QName) bindingsIter.next();
+             if (!bindingName.getNamespaceURI().equals(portTypeName.getNamespaceURI())) {
+                 continue;
+             }
+             Binding binding = (Binding) bindings.get(bindingName);
+             if (!binding.getPortType().getQName().getLocalPart().equals(
+                     portTypeName.getLocalPart())) {
+                 continue;
+             }
+             if (binding.getBindingOperation(operationName, null, null) == null) {
+                 throw new InvocationException("Service " +
+                         def.getTargetNamespace() + " with port " +
+                         portTypeName + " doesn't have the operation " +
+                         operationName + " declared in the port binding.");
+             }
+             return binding;
+         }
+     }
+
+     // Nothing declared locally: look into the imported definitions.
+     Map mapImports = def.getImports();
+     if (mapImports != null) {
+         for (Iterator iter = mapImports.values().iterator(); iter.hasNext();) {
+             List importsForNamespace = (List) iter.next();
+             for (Iterator iterator = importsForNamespace.iterator(); iterator.hasNext();) {
+                 Definition imported = ((Import) iterator.next()).getDefinition();
+                 if (imported == null) {
+                     // The import carries no definition; there is nothing to search.
+                     continue;
+                 }
+                 String importedNamespace = imported.getTargetNamespace();
+                 if (importedNamespace != null
+                         && importedNamespace.equals(portTypeName.getNamespaceURI())) {
+                     Binding found = findPortBinding(imported, portTypeName, operationName);
+                     if (found != null) {
+                         // Return at once so that a later import cannot replace
+                         // the result with null.
+                         return found;
+                     }
+                 }
+             }
+         }
+     }
+     return null;
+ }
+
+
+
+
+
+
+ /**
+  * Collects the SOAP binding details needed to invoke an operation of a
+  * port type, searching only the bindings declared by <code>def</code>
+  * itself and matching the port type by local name.
+  *
+  * @param def WSDL definition holding the bindings
+  * @param portTypeName local name of the port type
+  * @param operationName name of the operation
+  * @return an array of three entries: [0] the SOAPAction URI of the
+  *         operation, [1] the namespace URI and [2] the "use" value of the
+  *         SOAP body declared on the binding input; an entry is null when
+  *         it was not found
+  * @throws InvocationException if no binding exists for the port type or
+  *         the binding does not declare the operation
+  */
+ private static String[] getWSDLAdditionalInfo(Definition def, String portTypeName, String operationName) {
+     Binding portBinding = null;
+     Map bindings = def.getBindings();
+     if (bindings != null) {
+         Iterator names = bindings.keySet().iterator();
+         while (names.hasNext()) {
+             QName bindingName = (QName) names.next();
+             Binding candidate = (Binding) bindings.get(bindingName);
+             if (!candidate.getPortType().getQName().getLocalPart().equals(portTypeName)) {
+                 continue;
+             }
+             if (candidate.getBindingOperation(operationName, null, null) == null) {
+                 throw new InvocationException("Service " + def.getTargetNamespace() + " with port " +
+                         portTypeName + " doesn't have the operation " + operationName +
+                         " declared in the port binding.");
+             }
+             portBinding = candidate;
+             break;
+         }
+     }
+     if (portBinding == null) {
+         throw new InvocationException("Could not find a binding for port " + portTypeName +
+                 " in service " + def.getTargetNamespace());
+     }
+
+     BindingOperation bindingOperation = portBinding.getBindingOperation(operationName, null, null);
+
+     // The first extension of the operation in the SOAP namespace supplies the SOAPAction.
+     String soapAction = null;
+     Iterator operationExts = bindingOperation.getExtensibilityElements().iterator();
+     while (operationExts.hasNext()) {
+         ExtensibilityElement extension = (ExtensibilityElement) operationExts.next();
+         if (SOAP_NS.equals(extension.getElementType().getNamespaceURI())) {
+             soapAction = ((SOAPOperation) extension).getSoapActionURI();
+             break;
+         }
+     }
+
+     // Every SOAP extension of the input is inspected; the last one wins.
+     String msgUrl = null;
+     String encoding = null;
+     Iterator inputExts = bindingOperation.getBindingInput().getExtensibilityElements().iterator();
+     while (inputExts.hasNext()) {
+         ExtensibilityElement extension = (ExtensibilityElement) inputExts.next();
+         if (SOAP_NS.equals(extension.getElementType().getNamespaceURI())) {
+             SOAPBody body = (SOAPBody) extension;
+             msgUrl = body.getNamespaceURI();
+             encoding = body.getUse();
+         }
+     }
+
+     if (soapAction == null) {
+         log.info("No SOAPAction could be found for service " + def.getTargetNamespace() + " no SOAPAction element " +
+                 "will be included in the message.");
+     }
+
+     return new String[] { soapAction, msgUrl, encoding };
+ }
+
+
+
+ /**
+  * Finds the service that declares a port, searching the definition itself
+  * and then the definitions it imports.
+  *
+  * @param def definition in which the search starts
+  * @param port port as QName; its namespace selects the candidate services
+  *        and imports, its local part is the port name
+  * @return array of two objects: index 0 is the QName of the service,
+  *         index 1 is the definition that declares the service
+  * @throws InvocationException if no service declaring the port is found
+  */
+ private static Object[] getService(Definition def, QName port) {
+     Object[] found = findServiceOrNull(def, port);
+     if (found == null) {
+         throw new InvocationException("No service have been defined for portType " + port);
+     }
+     return found;
+ }
+
+ /**
+  * Does the search for getService(Definition, QName) but reports a miss
+  * with null instead of an exception, so that a matching import without
+  * the service does not abort the search of the remaining imports.
+  *
+  * @param def definition to search
+  * @param port port as QName
+  * @return the { service QName, declaring definition } pair, or null
+  */
+ private static Object[] findServiceOrNull(Definition def, QName port) {
+     // First look for the service in the definition itself.
+     Map services = def.getServices();
+     if (services != null) {
+         for (Iterator serviceIter = services.keySet().iterator(); serviceIter.hasNext();) {
+             QName serviceQName = (QName) serviceIter.next();
+             if (!serviceQName.getNamespaceURI().equals(port.getNamespaceURI())) {
+                 continue;
+             }
+             Service service = (Service) services.get(serviceQName);
+             if (service.getPort(port.getLocalPart()) != null) {
+                 return new Object[] { service.getQName(), def };
+             }
+         }
+     }
+
+     // Then go through the imports declared for the namespace of the port.
+     Map mapImports = def.getImports();
+     if (mapImports != null) {
+         for (Iterator iter = mapImports.values().iterator(); iter.hasNext();) {
+             List importsForNamespace = (List) iter.next();
+             for (Iterator iterator = importsForNamespace.iterator(); iterator.hasNext();) {
+                 Definition imported = ((Import) iterator.next()).getDefinition();
+                 if (imported == null) {
+                     // The import carries no definition; there is nothing to search.
+                     continue;
+                 }
+                 String importedNamespace = imported.getTargetNamespace();
+                 if (importedNamespace != null
+                         && importedNamespace.equals(port.getNamespaceURI())) {
+                     Object[] found = findServiceOrNull(imported, port);
+                     if (found != null) {
+                         return found;
+                     }
+                 }
+             }
+         }
+     }
+     return null;
+ }
+
+
+
+ /**
+  * Finds, among the services declared by <code>def</code> itself, the first
+  * one that has a port with the given name.
+  *
+  * @param def definition whose services are searched
+  * @param port name of the port
+  * @return the QName of the first service declaring the port
+  * @throws InvocationException if no service declares the port
+  */
+ private static QName getService(Definition def, String port) {
+     Map services = def.getServices();
+     Iterator names = services.keySet().iterator();
+     while (names.hasNext()) {
+         QName serviceQName = (QName) names.next();
+         Service candidate = (Service) services.get(serviceQName);
+         if (candidate.getPort(port) == null) {
+             continue;
+         }
+         return candidate.getQName();
+     }
+     throw new InvocationException("No service have been defined for portType " + port);
+ }
+
+ /**
+  * Invokes an operation through an Axis call built from the WSDL
+  * description and returns the first element of the response.
+  *
+  * @param nameSpaceURI not used by this method
+  * @param serviceQName qualified name of the service to call
+  * @param portType qualified name passed to Axis as the port of the call
+  * @param operation name of the operation to invoke
+  * @param soapAction SOAPAction URI to set on the call; skipped when null
+  * @param wsdlDesc stream from which Axis reads the WSDL description
+  * @param sentDoc document whose root element is sent as the SOAP body
+  * @return the first response body element as a DOM document
+  * @throws ServiceException if the Axis service or call cannot be created,
+  *         or if the first response element cannot be turned into a document
+  * @throws RemoteException if the invocation itself fails
+  */
+ private static org.w3c.dom.Document sendMessage(String nameSpaceURI, QName serviceQName, QName portType,
+         String operation, String soapAction, InputStream wsdlDesc,
+         org.w3c.dom.Document sentDoc) throws ServiceException, RemoteException {
+     org.apache.axis.client.Service axisService =
+             new org.apache.axis.client.Service(wsdlDesc, serviceQName);
+     Call call = (Call) axisService.createCall(portType, operation);
+     call.setProperty(Call.SOAPACTION_USE_PROPERTY, Boolean.TRUE);
+     if (soapAction != null) {
+         call.setProperty(Call.SOAPACTION_URI_PROPERTY, soapAction);
+     }
+
+     SOAPBodyElement[] input = { new SOAPBodyElement(sentDoc.getDocumentElement()) };
+     Vector results = (Vector) call.invoke(input);
+
+     try {
+         SOAPBodyElement firstResult = (SOAPBodyElement) results.get(0);
+         return firstResult.getAsDocument();
+     } catch (Exception e) {
+         // Any failure while extracting the response is reported as a service error.
+         throw new ServiceException(e);
+     }
+ }
+
+ /**
+  * Returns the DOMReader held in <code>domReader</code>, creating it on
+  * the first call.
+  *
+  * @return the reader, never null
+  */
+ private DOMReader getDOMReader() {
+     if (domReader != null) {
+         return domReader;
+     }
+     domReader = new DOMReader();
+     return domReader;
+ }
+ /**
+  * Returns the DOMWriter held in <code>domWriter</code>, creating it on
+  * the first call.
+  *
+  * @return the writer, never null
+  */
+ private DOMWriter getDOMWriter() {
+     if (domWriter != null) {
+         return domWriter;
+     }
+     domWriter = new DOMWriter();
+     return domWriter;
+ }
+}
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickAsReceive.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickAsReceive.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickAsReceive.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickAsReceive.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License")
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.agila.bpel.engine.exception.EngineException;
+import org.apache.agila.bpel.engine.priv.core.definition.CorrelationRef;
+import org.apache.agila.bpel.engine.priv.core.definition.MessageEvent;
+import org.apache.agila.bpel.engine.priv.core.definition.Pick;
+import org.apache.agila.bpel.engine.priv.core.definition.Receive;
+import org.apache.agila.bpel.engine.priv.core.definition.StructuredActivity;
+import org.apache.agila.bpel.engine.priv.core.definition.AgilaProcess;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ExecutionContext;
+import org.apache.agila.bpel.engine.priv.core.dynamic.PickEC;
+
+/**
+ * Adapter that presents one MessageEvent of a Pick structured activity
+ * through the Receive interface. This lets the MessageController
+ * implementation deal with Receive activities only, without having to
+ * distinguish a plain Receive from a Pick MessageEvent.
+ * <p>
+ * Message related getters read from the wrapped event, naming and
+ * instantiation getters from the Pick itself. The adapter is read only:
+ * mutators and link or join condition accessors throw
+ * UnsupportedOperationException.
+ * @see org.apache.agila.bpel.engine.priv.messaging.MessageController
+ * @see org.apache.agila.bpel.engine.priv.core.definition.Receive
+ * @see org.apache.agila.bpel.engine.priv.core.definition.Pick
+ * @see org.apache.agila.bpel.engine.priv.core.definition.MessageEvent
+ */
+public class PickAsReceive implements Receive {
+
+    /** Message of every UnsupportedOperationException thrown by this class. */
+    private static final String READ_ONLY_MESSAGE =
+            "This method shouldn't be used, this class is read only";
+
+    private Pick pick;
+    private int eventPos;
+
+    /**
+     * @param pick the wrapped Pick activity
+     * @param eventPos index of the wrapped event in the Pick's message events
+     */
+    public PickAsReceive(Pick pick, int eventPos) {
+        this.pick = pick;
+        this.eventPos = eventPos;
+    }
+
+    /** Looks up the wrapped event in the Pick's message events. */
+    private MessageEvent event() {
+        return (MessageEvent) pick.getMessageEvents().get(eventPos);
+    }
+
+    // ---- adapter specific accessors ----
+
+    public Pick getPick() {
+        return pick;
+    }
+
+    public int getEventPos() {
+        return eventPos;
+    }
+
+    // ---- values read from the wrapped message event ----
+
+    /** Returns the partner link of the wrapped event. */
+    public String getPartner() {
+        return event().getPartnerLink();
+    }
+
+    public String getPortType() {
+        return event().getPortType();
+    }
+
+    public String getNamespace() {
+        return event().getNamespace();
+    }
+
+    public String getOperation() {
+        return event().getOperation();
+    }
+
+    public String getVariable() {
+        return event().getVariable();
+    }
+
+    public Collection getCorrelations() {
+        return event().getCorrelations();
+    }
+
+    // ---- values read from the Pick ----
+
+    public boolean isCreateInstance() {
+        return pick.isCreateInstance();
+    }
+
+    public String getName() {
+        return pick.getName();
+    }
+
+    /** The container of this pseudo Receive is the Pick itself. */
+    public StructuredActivity getContainer() {
+        return pick;
+    }
+
+    /** Always returns null; use fetchProcess() to reach the process. */
+    public AgilaProcess getProcess() {
+        return null;
+    }
+
+    public AgilaProcess fetchProcess() {
+        return pick.fetchProcess();
+    }
+
+    /**
+     * Executes the Pick and wraps the resulting PickEC so that it can be
+     * handled as a Receive execution context for the same event.
+     */
+    public ExecutionContext execute(String correlationSetName, Map correlation) throws EngineException {
+        PickEC executed = (PickEC) pick.execute(correlationSetName, correlation);
+        return new PickECAsReceiveEC(executed, eventPos);
+    }
+
+    // ---- reply flag: not tracked by this adapter ----
+
+    /** Does nothing. */
+    public void setReply(boolean reply) { }
+
+    /** Always false. */
+    public boolean isReply() {
+        return false;
+    }
+
+    // ---- operations that are not supported by this adapter ----
+
+    public void setPartner(String partner) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setPortType(String portType) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setNamespace(String namespace) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setOperation(String operation) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setVariable(String variable) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setCreateInstance(boolean createInstance) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void addCorrelation(CorrelationRef correlationRef) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setName(String name) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public String getJoinCondition() {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setJoinCondition(String expr) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public Set getSourceLinks() {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setSourceLinks(Set sources) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public Set getTargetLinks() {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+
+    public void setTargetLinks(Set targets) {
+        throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
+    }
+}
Added: incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickECAsReceiveEC.java
URL: http://svn.apache.org/viewcvs/incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickECAsReceiveEC.java?rev=165042&view=auto
==============================================================================
--- incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickECAsReceiveEC.java (added)
+++ incubator/agila/trunk/modules/bpel/engine/core/src/java/org/apache/agila/bpel/engine/priv/messaging/impl/PickECAsReceiveEC.java Wed Apr 27 13:27:43 2005
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License")
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.agila.bpel.engine.priv.messaging.impl;
+
+import java.util.Collection;
+
+import org.dom4j.Document;
+import org.apache.agila.bpel.engine.priv.core.definition.Activity;
+import org.apache.agila.bpel.engine.priv.core.dynamic.PickEC;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ProcessInstance;
+import org.apache.agila.bpel.engine.priv.core.dynamic.ReceiveEC;
+import org.apache.agila.bpel.engine.priv.core.dynamic.StructuredEC;
+import org.apache.agila.bpel.engine.common.monitoring.AuditEntry;
+
+/**
+ * Adapter that lets the execution context of a Pick be handled as the
+ * execution context of a Receive. Nearly every call is forwarded to the
+ * wrapped PickEC; an acknowledged message is routed to the activity found
+ * at the configured event position.
+ * @see PickAsReceive
+ * @see org.apache.agila.bpel.engine.priv.core.dynamic.PickEC
+ * @see org.apache.agila.bpel.engine.priv.core.dynamic.ReceiveEC
+ */
+public class PickECAsReceiveEC implements ReceiveEC {
+
+    /** Message of every UnsupportedOperationException thrown by this class. */
+    private static final String WRAPPER_ONLY_MESSAGE =
+            "This method shouldn't be used, this class is just a wrapper";
+
+    private PickEC pickEC;
+    private int eventPos;
+
+    /**
+     * @param pickEC the wrapped Pick execution context
+     * @param eventPos position used to select the activity that receives
+     *        an acknowledged message
+     */
+    public PickECAsReceiveEC(PickEC pickEC, int eventPos) {
+        this.pickEC = pickEC;
+        this.eventPos = eventPos;
+    }
+
+    // ---- adapter specific accessors ----
+
+    public int getEventPos() {
+        return eventPos;
+    }
+
+    public void setEventPos(int eventPos) {
+        this.eventPos = eventPos;
+    }
+
+    // ---- calls forwarded to the wrapped PickEC ----
+
+    public Long getId() {
+        return pickEC.getId();
+    }
+
+    /** The container of this pseudo Receive context is the PickEC itself. */
+    public StructuredEC getContainer() {
+        return pickEC;
+    }
+
+    public ProcessInstance getInstance() {
+        return pickEC.getInstance();
+    }
+
+    public ProcessInstance fetchInstance() {
+        return pickEC.fetchInstance();
+    }
+
+    public int getStatus() {
+        return pickEC.getStatus();
+    }
+
+    public void execute() {
+        pickEC.execute();
+    }
+
+    public AuditEntry audit() throws Exception {
+        return pickEC.audit();
+    }
+
+    /**
+     * Hands the message to the wrapped PickEC together with the activity
+     * located at the event position.
+     *
+     * @param message the received message
+     * @return always null
+     */
+    public Document acknowledgeMessage(Document message) {
+        Activity target = (Activity) pickEC.getActivity().getActivities().get(eventPos);
+        pickEC.acknowledgeMessage(target, message);
+        return null;
+    }
+
+    /** Always returns null. */
+    public Activity getInitialActivity() {
+        return null;
+    }
+
+    // ---- operations that are not supported by this wrapper ----
+
+    public void setStatus(int status) {
+        throw new UnsupportedOperationException(WRAPPER_ONLY_MESSAGE);
+    }
+
+    public Collection getEvents() {
+        throw new UnsupportedOperationException(WRAPPER_ONLY_MESSAGE);
+    }
+
+}