You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by rf...@apache.org on 2007/09/13 00:09:21 UTC
svn commit: r575101 [2/2] - in /incubator/tuscany/java/sca:
itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/
modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/
modules/core/src/main/java/org/apache/tusca...
Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java?rev=575101&r1=575100&r2=575101&view=diff
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java (original)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java Wed Sep 12 15:09:19 2007
@@ -17,11 +17,9 @@
* under the License.
*/
-
package org.apache.tuscany.sca.core.scope;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,82 +29,85 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.tuscany.sca.core.context.ConversationImpl;
import org.apache.tuscany.sca.core.context.InstanceWrapper;
+import org.apache.tuscany.sca.core.conversation.ConversationListener;
+import org.apache.tuscany.sca.core.conversation.ConversationManager;
+import org.apache.tuscany.sca.core.conversation.ExtendedConversation;
import org.apache.tuscany.sca.core.invocation.ThreadMessageContext;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.store.Store;
-import org.osoa.sca.Conversation;
import org.osoa.sca.ConversationEndedException;
/**
* A scope context which manages atomic component instances keyed on ConversationID
*
*/
-public class ConversationalScopeContainer extends AbstractScopeContainer<Object> {
-
- private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection = new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();
-
+public class ConversationalScopeContainer extends AbstractScopeContainer<Object> implements ConversationListener {
+ private ConversationManager conversationManager;
+ private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection =
+ new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();
+
//TODO: This needs to observe the value set by ConversationalAttributes; for now we will hard code it.
- private long max_age = 60 * 60 * 1000; // 1 hour;
- private long max_idle_time = 60 * 60 * 1000; // 1 hour;
- private long reaper_interval = 60; // every minute;
+ private long max_age = 60 * 60 * 1000; // 1 hour;
+ private long max_idle_time = 60 * 60 * 1000; // 1 hour;
+ private long reaper_interval = 60; // every minute;
private ScheduledExecutorService scheduler;
-
+
public ConversationalScopeContainer(Store aStore, RuntimeComponent component) {
- super(Scope.CONVERSATION, component);
-
+ super(Scope.CONVERSATION, component);
+
// Note: aStore is here to preserve the original factory interface. It is not currently used in this
// implementation since we do not support instance persistence.
-
+
// Check System properties to see if timeout values have been specified. All timeout values
// will be specified in seconds.
//
- String aProperty;
+ String aProperty;
aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime");
- if (aProperty != null)
- try
- {
- max_idle_time = (new Long(aProperty) * 1000);
- }
- catch (NumberFormatException nfe) {};
-
+ if (aProperty != null) {
+ try {
+ max_idle_time = (new Long(aProperty) * 1000);
+ } catch (NumberFormatException nfe) {
+ // Ignore
+ }
+ }
+
aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge");
- if (aProperty != null)
- try
- {
- max_age = (new Long(aProperty) * 1000);
- }
- catch (NumberFormatException nfe) {};
-
+ if (aProperty != null) {
+ try {
+ max_age = (new Long(aProperty) * 1000);
+ } catch (NumberFormatException nfe) {
+ // Ignore
+ }
+ }
+
aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.ReaperInterval");
- if (aProperty != null)
- try
- {
- reaper_interval = new Long(aProperty);
- }
- catch (NumberFormatException nfe) {};
-
-
+ if (aProperty != null) {
+ try {
+ reaper_interval = new Long(aProperty);
+ } catch (NumberFormatException nfe) {
+ // Ignore
+ }
+ }
+
// Check to see if the maxAge and/or maxIdleTime have been specified using @ConversationAttributes.
// Implementation annotated attributes are honored first.
- if (this.getComponent().getImplementationProvider() instanceof ScopedImplementationProvider)
- {
- ScopedImplementationProvider aScopedImpl = (ScopedImplementationProvider) this.getComponent().getImplementationProvider();
-
+ if (this.getComponent().getImplementationProvider() instanceof ScopedImplementationProvider) {
+ ScopedImplementationProvider aScopedImpl =
+ (ScopedImplementationProvider)this.getComponent().getImplementationProvider();
+
long maxAge = aScopedImpl.getMaxAge();
if (maxAge > 0) {
max_age = maxAge;
}
long maxIdleTime = aScopedImpl.getMaxIdleTime();
- if (maxIdleTime > 0 ) {
+ if (maxIdleTime > 0) {
max_idle_time = maxIdleTime;
}
- }
+ }
- }
-
+ }
@Override
public synchronized void start() {
@@ -116,65 +117,66 @@
// Get a scheduler and schedule a task to be run in the future indefinitely until it is explicitly shut down.
this.scheduler = Executors.newSingleThreadScheduledExecutor();
- scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection), 3, reaper_interval, TimeUnit.SECONDS);
-
- lifecycleState = RUNNING;
- }
+ scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection),
+ 3,
+ reaper_interval,
+ TimeUnit.SECONDS);
+
+ lifecycleState = RUNNING;
+ }
@Override
public synchronized void stop() {
-
+
// Prevent the scheduler from submitting any additional reapers, initiate an orderly shutdown if a reaper task is in progress.
- if (this.scheduler != null)
- this.scheduler.shutdown();
-
+ if (this.scheduler != null)
+ this.scheduler.shutdown();
+
lifecycleState = STOPPED;
}
- protected InstanceWrapper getInstanceWrapper(boolean create,Object contextId) throws TargetResolutionException {
-
+ protected InstanceWrapper getInstanceWrapper(boolean create, Object contextId) throws TargetResolutionException {
+
// we might get a null context if the target service has
// conversational scope but only its callback interface
// is conversational. In this case we need to invent a
// conversation Id here to store the service against
// and populate the thread context
- if (contextId == null){
+ if (contextId == null) {
contextId = UUID.randomUUID().toString();
Message msgContext = ThreadMessageContext.getMessageContext();
-
- if (msgContext != null){
+
+ if (msgContext != null) {
msgContext.getTo().getReferenceParameters().setConversationID(contextId);
}
}
-
+
InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);
-
+
if (anInstanceWrapper == null && !create)
- return null;
-
- if (anInstanceWrapper == null)
- {
- anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);
+ return null;
+
+ if (anInstanceWrapper == null) {
+ anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);
this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
}
// If an existing instance is found, return it only if it is not expired and update its
// last referenced time.
- else
- {
- if (anInstanceWrapper.isExpired())
- throw new ConversationEndedException();
- anInstanceWrapper.updateLastReferencedTime();
- }
-
- return anInstanceWrapper.getInstanceWrapper(contextId);
-
+ else {
+ if (anInstanceWrapper.isExpired())
+ throw new ConversationEndedException();
+ anInstanceWrapper.updateLastReferencedTime();
+ }
+
+ return anInstanceWrapper.getInstanceWrapper(contextId);
+
}
-
+
@Override
public InstanceWrapper getWrapper(Object contextId) throws TargetResolutionException {
- return getInstanceWrapper(true,contextId);
- }
-
+ return getInstanceWrapper(true, contextId);
+ }
+
/**
* This method allows a new context id to be registered alongside an existing one. This happens in
* one case, when a conversation includes a stateful callback. The client component instance
@@ -186,20 +188,18 @@
* and reset when the component instance is removed
*/
public void addWrapperReference(Object existingContextId, Object contextId) throws TargetResolutionException {
- Conversation conversation = (Conversation)contextId;
-
// get the instance wrapper via the existing id
InstanceLifeCycleWrapper existingInstanceWrapper = this.instanceLifecycleCollection.get(existingContextId);
- InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(conversation.getConversationID());
-
+ InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(contextId);
+
// only add the extra reference once
if (newInstanceWrapper == null) {
// add the id to the list of ids that the wrapper holds. Used for reference
// counting and conversation resetting on destruction.
- existingInstanceWrapper.addCallbackConversation(conversation);
-
+ existingInstanceWrapper.addCallbackConversation(contextId);
+
// add the reference to the collection
- this.instanceLifecycleCollection.put(conversation.getConversationID(), existingInstanceWrapper);
+ this.instanceLifecycleCollection.put(contextId, existingInstanceWrapper);
}
}
@@ -214,187 +214,192 @@
}
}
- anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId);
+ anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId);
this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
}
-
-
+
// The remove is invoked when a conversation is explicitly ended. This can occur by using the @EndsConversation or API.
// In this case the instance is immediately removed. A new conversation will be started on the next operation
// associated with this conversationId's service reference.
//
@Override
public void remove(Object contextId) throws TargetDestructionException {
- if (contextId != null){
- if (this.instanceLifecycleCollection.containsKey(contextId))
- {
- InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId);
- this.instanceLifecycleCollection.remove(contextId);
- anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId);
- }
- }
- }
-
-
+ if (contextId != null) {
+ if (this.instanceLifecycleCollection.containsKey(contextId)) {
+ InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId);
+ this.instanceLifecycleCollection.remove(contextId);
+ anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId);
+ }
+ }
+ }
+
/*
* This is an inner class that keeps track of the lifecycle of a conversation scoped
* implementation instance.
*
*/
-
- private class InstanceLifeCycleWrapper
- {
+
+ private class InstanceLifeCycleWrapper {
private Object clientConversationId;
- private List<ConversationImpl> callbackConversations = new ArrayList<ConversationImpl>();
- private long creationTime;
- private long lastReferencedTime;
- private long expirationInterval;
- private long maxIdleTime;
- private Conversation conversation;
-
- private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException
- {
- this.clientConversationId = contextId;
- this.creationTime = System.currentTimeMillis();
- this.lastReferencedTime = this.creationTime;
- this.expirationInterval = max_age;
- this.maxIdleTime = max_idle_time;
- this.createInstance(contextId);
- }
-
- private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException
- {
- this.clientConversationId = contextId;
- this.creationTime = System.currentTimeMillis();
- this.lastReferencedTime = this.creationTime;
- this.expirationInterval = max_age;
- this.maxIdleTime = max_idle_time;
- wrappers.put(contextId, wrapper);
- }
-
- private boolean isExpired()
- {
- long currentTime = System.currentTimeMillis();
- if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) // max idle time exceeded
- return true;
- if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded
- return true;
-
- return false;
- }
-
- private void updateLastReferencedTime()
- {
- this.lastReferencedTime = System.currentTimeMillis();
+ private List<Object> callbackConversations = new ArrayList<Object>();
+ private long creationTime;
+ private long lastReferencedTime;
+ private long expirationInterval;
+ private long maxIdleTime;
+
+ private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException {
+ this.clientConversationId = contextId;
+ this.creationTime = System.currentTimeMillis();
+ this.lastReferencedTime = this.creationTime;
+ this.expirationInterval = max_age;
+ this.maxIdleTime = max_idle_time;
+ this.createInstance(contextId);
+ }
+
+ private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException {
+ this.clientConversationId = contextId;
+ this.creationTime = System.currentTimeMillis();
+ this.lastReferencedTime = this.creationTime;
+ this.expirationInterval = max_age;
+ this.maxIdleTime = max_idle_time;
+ wrappers.put(contextId, wrapper);
+ }
+
+ private boolean isExpired() {
+ long currentTime = System.currentTimeMillis();
+ if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) // max idle time exceeded
+ return true;
+ if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded
+ return true;
+
+ return false;
}
-
+
+ private void updateLastReferencedTime() {
+ this.lastReferencedTime = System.currentTimeMillis();
+ }
+
// Associates a callback conversation with this instance. Each time the scope container
// is asked to remove an object given a contextId an associated conversation object will
// have its conversationId reset to null. When the list of ids is empty the component instance
// will be removed from the scope container
- private void addCallbackConversation(Conversation conversation){
- InstanceWrapper ctx = getInstanceWrapper(clientConversationId);
- callbackConversations.add((ConversationImpl)conversation);
- wrappers.put(conversation.getConversationID(), ctx);
+ private void addCallbackConversation(Object conversationID) {
+ InstanceWrapper ctx = getInstanceWrapper(clientConversationId);
+ callbackConversations.add(conversationID);
+ wrappers.put(conversationID, ctx);
}
-
+
//
// Return the backing implementation instance
//
- private InstanceWrapper getInstanceWrapper(Object contextId)
- {
- InstanceWrapper ctx = wrappers.get(contextId);
- return ctx;
- }
-
- private void removeInstanceWrapper(Object contextId) throws TargetDestructionException
- {
- InstanceWrapper ctx = getInstanceWrapper(contextId);
- wrappers.remove(contextId);
-
- // find out if we are dealing with the original client conversation id
- // and reset accordingly
- if (clientConversationId.equals(contextId)){
- clientConversationId = null;
- } else {
- // reset the conversationId in the conversation object if present
- // so that and ending callback causes the conversation in the originating
- // service reference in the client to be reset
- ConversationImpl conversation = null;
-
- for (ConversationImpl loopConversation : callbackConversations) {
- if (loopConversation.getConversationID().equals(contextId)) {
- conversation = loopConversation;
- }
- }
- if(conversation != null){
- conversation.setConversationID(null);
- callbackConversations.remove(conversation);
- }
- }
-
-
- // stop the component if this removes the last reference
- if (clientConversationId == null &&
- callbackConversations.isEmpty()) {
- ctx.stop();
- if (conversation != null) {
- ((ConversationImpl)conversation).setConversationID(null);
- }
- }
- }
-
- private void createInstance(Object contextId) throws TargetResolutionException
- {
+ private InstanceWrapper getInstanceWrapper(Object contextId) {
+ InstanceWrapper ctx = wrappers.get(contextId);
+ return ctx;
+ }
+
+ private void removeInstanceWrapper(Object contextId) throws TargetDestructionException {
+ InstanceWrapper ctx = getInstanceWrapper(contextId);
+ wrappers.remove(contextId);
+
+ // find out if we are dealing with the original client conversation id
+ // and reset accordingly
+ if (clientConversationId.equals(contextId)) {
+ clientConversationId = null;
+ } else {
+ // reset the conversationId in the conversation object if present
+ // so that and ending callback causes the conversation in the originating
+ // service reference in the client to be reset
+ callbackConversations.remove(contextId);
+ }
+
+ // stop the component if this removes the last reference
+ if (clientConversationId == null && callbackConversations.isEmpty()) {
+ ctx.stop();
+ }
+ }
+
+ private void createInstance(Object contextId) throws TargetResolutionException {
InstanceWrapper instanceWrapper = createInstanceWrapper();
instanceWrapper.start();
- wrappers.put(contextId, instanceWrapper);
+ wrappers.put(contextId, instanceWrapper);
}
-
+
}
-
+
//
// This inner class is an instance reaper. It periodically iterates over the InstanceLifeCycleCollection
// and for any instances that have expired removes the backing instance and the entry in the InstanceLifeCycle
// Collection.
//
- class ConversationalInstanceReaper implements Runnable
- {
+ class ConversationalInstanceReaper implements Runnable {
private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection;
-
- public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap)
- {
+
+ public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap) {
this.instanceLifecycleCollection = aMap;
}
-
- public void run()
- {
- Iterator<Map.Entry<Object,InstanceLifeCycleWrapper>> anIterator = this.instanceLifecycleCollection.entrySet().iterator();
-
- while (anIterator.hasNext())
- {
- Map.Entry<Object,InstanceLifeCycleWrapper> anEntry = anIterator.next();
+
+ public void run() {
+ Iterator<Map.Entry<Object, InstanceLifeCycleWrapper>> anIterator =
+ this.instanceLifecycleCollection.entrySet().iterator();
+
+ while (anIterator.hasNext()) {
+ Map.Entry<Object, InstanceLifeCycleWrapper> anEntry = anIterator.next();
InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = anEntry.getValue();
- if (anInstanceLifeCycleWrapper.isExpired())
- {
+ if (anInstanceLifeCycleWrapper.isExpired()) {
try {
// cycle through all the references to this instance and
// remove them from the underlying wrappers collection and
// from the lifecycle wrappers collection
- for (ConversationImpl conversation : anInstanceLifeCycleWrapper.callbackConversations) {
- anInstanceLifeCycleWrapper.removeInstanceWrapper(conversation.getConversationID());
- this.instanceLifecycleCollection.remove(conversation.getConversationID());
+ for (Object conversationID : anInstanceLifeCycleWrapper.callbackConversations) {
+ anInstanceLifeCycleWrapper.removeInstanceWrapper(conversationID);
+ this.instanceLifecycleCollection.remove(conversationID);
}
-
- if (anInstanceLifeCycleWrapper.clientConversationId != null){
- anInstanceLifeCycleWrapper.removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId);
+
+ if (anInstanceLifeCycleWrapper.clientConversationId != null) {
+ anInstanceLifeCycleWrapper
+ .removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId);
this.instanceLifecycleCollection.remove(anInstanceLifeCycleWrapper.clientConversationId);
}
} catch (Exception ex) {
- // TODO - what to do with any asynchronous exceptions?
+ // TODO - what to do with any asynchronous exceptions?
}
}
- }
+ }
}
}
+
+ /**
+ * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
+ public void conversationEnded(ExtendedConversation conversation) {
+ stopContext(conversation.getConversationID());
+ }
+
+ /**
+ * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
+ public void conversationExpired(ExtendedConversation conversation) {
+ }
+
+ /**
+ * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
+ public void conversationStarted(ExtendedConversation conversation) {
+ startContext(conversation.getConversationID());
+ }
+
+ /**
+ * @return the conversationManager
+ */
+ public ConversationManager getConversationManager() {
+ return conversationManager;
+ }
+
+ /**
+ * @param conversationManager the conversationManager to set
+ */
+ public void setConversationManager(ConversationManager conversationManager) {
+ this.conversationManager = conversationManager;
+ }
+
}
Modified: incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java?rev=575101&r1=575100&r2=575101&view=diff
==============================================================================
--- incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java (original)
+++ incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java Wed Sep 12 15:09:19 2007
@@ -61,6 +61,8 @@
import org.apache.tuscany.sca.core.assembly.ActivationException;
import org.apache.tuscany.sca.core.assembly.CompositeActivator;
import org.apache.tuscany.sca.core.assembly.CompositeActivatorImpl;
+import org.apache.tuscany.sca.core.conversation.ConversationManager;
+import org.apache.tuscany.sca.core.conversation.ConversationManagerImpl;
import org.apache.tuscany.sca.core.invocation.DefaultProxyFactoryExtensionPoint;
import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
import org.apache.tuscany.sca.core.invocation.ProxyFactory;
@@ -127,11 +129,14 @@
RequestContextFactory requestContextFactory =
registry.getExtensionPoint(ContextFactoryExtensionPoint.class).getFactory(RequestContextFactory.class);
+ ConversationManager conversationManager = new ConversationManagerImpl();
+ registry.addExtensionPoint(conversationManager);
+
// Create the composite activator
CompositeActivator compositeActivator =
new CompositeActivatorImpl(assemblyFactory, messageFactory, javaInterfaceFactory, scaBindingFactory,
mapper, scopeRegistry, workScheduler, wireProcessor, requestContextFactory,
- proxyFactory, providerFactories, processors);
+ proxyFactory, providerFactories, processors, conversationManager);
return compositeActivator;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org