You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by an...@apache.org on 2007/07/20 17:54:23 UTC

svn commit: r558025 - in /incubator/tuscany/java/sca/modules: core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java

Author: antelder
Date: Fri Jul 20 08:54:21 2007
New Revision: 558025

URL: http://svn.apache.org/viewvc?view=rev&rev=558025
Log:
TUSCANY-1456, Apply Lou Amodeo's contribution of a ConversationalScopeContainer

Modified:
    incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
    incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java

Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java?view=diff&rev=558025&r1=558024&r2=558025
==============================================================================
--- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java (original)
+++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java Fri Jul 20 08:54:21 2007
@@ -16,167 +16,255 @@
  * specific language governing permissions and limitations
  * under the License.    
  */
+
+
 package org.apache.tuscany.sca.core.scope;
 
-import org.apache.tuscany.sca.core.invocation.ThreadMessageContext;
-import org.apache.tuscany.sca.event.Event;
-import org.apache.tuscany.sca.event.RuntimeEventListener;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.scope.InstanceWrapper;
 import org.apache.tuscany.sca.scope.PersistenceException;
 import org.apache.tuscany.sca.scope.Scope;
-import org.apache.tuscany.sca.scope.TargetDestructionException;
+import org.apache.tuscany.sca.scope.ScopedImplementationProvider;
 import org.apache.tuscany.sca.scope.TargetResolutionException;
 import org.apache.tuscany.sca.store.Store;
-import org.apache.tuscany.sca.store.StoreExpirationEvent;
-import org.apache.tuscany.sca.store.StoreReadException;
-import org.apache.tuscany.sca.store.StoreWriteException;
+import org.osoa.sca.ConversationEndedException;
 
 /**
- * A scope context which manages atomic component instances keyed on a
- * conversation session
- * 
- * @version $Rev: 452655 $ $Date: 2006-10-03 18:09:02 -0400 (Tue, 03 Oct 2006) $
+ * A scope context which manages atomic component instances keyed on ConversationID
+ *  
  */
-public class ConversationalScopeContainer<KEY> extends AbstractScopeContainer<KEY> {
-    private final Store nonDurableStore;
-
-    public ConversationalScopeContainer(Store store, RuntimeComponent component) {
-        super(Scope.CONVERSATION, component);
-        this.nonDurableStore = store;
-        if (store != null) {
-            store.addListener(new ExpirationListener());
-        }
-    }
+public class ConversationalScopeContainer extends AbstractScopeContainer<Object> {      
+        
+    private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection = new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();
+    
+    //TODO: This needs to observe the value set by ConversationalAttributes for now we will hard code it. 
+    private long max_age = 60 * 60 *  1000;  // 1 hour;
+    private long max_idle_time = 60 * 60 *  1000;  // 1 hour;
+    private long reaper_interval = 60;  // every minute; 
+    private ScheduledExecutorService scheduler;
+    
+    public ConversationalScopeContainer(Store aStore, RuntimeComponent component) {
+        super(Scope.CONVERSATION, component); 
+        
+        // Note: aStore is here to preserve the original factory interface. It is not currently used in this 
+        // implemenation since we do not support instance persistence.
+        
+        // Check System properties to see if timeout values have been specified. All timeout values 
+        // will be specifed in seconds.
+        //
+        String aProperty; 
+        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime");
+        if (aProperty != null)
+          try 
+           {
+                max_idle_time = (new Long(aProperty) * 1000);   
+           }
+           catch (NumberFormatException nfe) {};
+           
+        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge");
+        if (aProperty != null)
+            try 
+             {
+                  max_age = (new Long(aProperty) * 1000);   
+             }
+             catch (NumberFormatException nfe) {};
+             
+        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.ReaperInterval");
+        if (aProperty != null)
+            try 
+             {
+                  reaper_interval = new Long(aProperty);   
+             }
+             catch (NumberFormatException nfe) {};
+           
+             
+        // Check to see if the maxAge and/or maxIdleTime have been specified using @ConversationAttributes.  
+        // Implementation annoated attributes are honored first.
+        if (this.getComponent().getImplementation() instanceof ScopedImplementationProvider) 
+         {
+            ScopedImplementationProvider aScopedImpl = (ScopedImplementationProvider) this.getComponent().getImplementation();
+            
+            long maxAge = aScopedImpl.getMaxAge();
+            if (maxAge > 0) {
+                max_age = maxAge;
+            }
+            long maxIdleTime = aScopedImpl.getMaxIdleTime();
+            if (maxIdleTime > 0 ) {
+                max_idle_time = maxIdleTime;
+            }
+         }                      
 
-    public void onEvent(Event event) {
-        checkInit();
-    }
+    }    
+    
 
     public synchronized void start() {
         if (lifecycleState != UNINITIALIZED && lifecycleState != STOPPED) {
             throw new IllegalStateException("Scope must be in UNINITIALIZED or STOPPED state [" + lifecycleState + "]");
         }
-        lifecycleState = RUNNING;
-    }
+
+        // Get a scheduler and scheduled a task to be run in the future indefinitely until its explicitly shutdown. 
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection), 30, reaper_interval, TimeUnit.SECONDS);
+        
+        lifecycleState = RUNNING;        
+       }
 
     public synchronized void stop() {
+        
+        // Prevent the scheduler from submitting any additional reapers, initiate an orderly shutdown if a reaper task is in progress. 
+        if (this.scheduler != null) 
+                this.scheduler.shutdown(); 
+        
         lifecycleState = STOPPED;
     }
 
-    public void persistNew(RuntimeComponent component, String id, Object instance, long expiration)
-        throws PersistenceException {
-         try {
-            nonDurableStore.insertRecord(component, id, instance, expiration);
-        } catch (StoreWriteException e) {
-            throw new PersistenceException(e);
-        }
-    }
-
-    public void persist(RuntimeComponent component, String id, Object instance, long expiration)
-        throws PersistenceException {
-         try {
-            nonDurableStore.updateRecord(component, id, instance, expiration);
-        } catch (StoreWriteException e) {
-            throw new PersistenceException(e);
+    protected InstanceWrapper getInstanceWrapper(boolean create,Object contextId) throws TargetResolutionException {    
+        
+        InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);
+        if (anInstanceWrapper == null && !create)
+                return null;
+        
+        if (anInstanceWrapper == null) 
+        {
+         anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);  
+         this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
+        }
+        // If an existing intsance is found return it only if its not expired and update its 
+        // last referenced time. 
+        else
+        {
+          if (anInstanceWrapper.isExpired())
+             throw new ConversationEndedException();
+          anInstanceWrapper.updateLastReferencedTime();
         }
+        
+        return anInstanceWrapper.getInstanceWrapper();          
+                      
     }
-
+    
     @Override
+    public InstanceWrapper getWrapper(Object contextId) throws TargetResolutionException {
+        return getInstanceWrapper(true,contextId);
+    } 
+    
+    @Override
+    // Override the remove to make sure if this path ever gets enabled this 
+    // implementation gets updated.
+    //
     public void remove() throws PersistenceException {
-        remove(component);
     }
     
-    public void remove(RuntimeComponent component) throws PersistenceException {
-         String conversationId = getConversationId();
-        try {
-//            workContext.setCurrentAtomicComponent(component);
-            // FIXME this should be an InstanceWrapper and shouldn't we stop it?
-            Object instance = nonDurableStore.readRecord(component, conversationId);
-            if (instance != null) {
-                nonDurableStore.removeRecord(component, conversationId);
-            }
-        } catch (StoreReadException e) {
-            throw new PersistenceException(e);
-        } catch (StoreWriteException e) {
-            throw new PersistenceException(e);
+    // The remove is invoked when a conversation is explcitly ended.  This can occur by using the @EndsConversation or API.  
+    // In this case the instance is immediately removed.  A new conversation will be started on the next operation
+    // associated with this conversationId's service reference. 
+    //
+    public void remove(Object contextId) {
+        if (this.instanceLifecycleCollection.containsKey(contextId)) 
+        {
+         InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId);
+         this.instanceLifecycleCollection.remove(contextId);
+         anInstanceLifeCycleWrapper.removeInstanceWrapper();
+        } 
+    }  
+    
+    /*
+     *  This ia an inner class that keeps track of the lifecycle of a conversation scoped
+     *  implementation instance.   
+     * 
+     */
+    
+    private class InstanceLifeCycleWrapper 
+    {
+        private Object instanceId;
+        private long   creationTime;
+        private long   lastReferencedTime;
+        private long   expirationInterval;
+        private long   maxIdleTime;
+        
+        private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException
+        {
+         this.instanceId = contextId;
+         this.creationTime = System.currentTimeMillis();
+         this.lastReferencedTime = this.creationTime;
+         this.expirationInterval = max_age;
+         this.maxIdleTime = max_idle_time;
+         this.createInstance();
         }
-    }
-
-    public InstanceWrapper getWrapper(KEY contextId) throws TargetResolutionException {
         
-        boolean create = true; // FIXME
+        private boolean isExpired() 
+        {               
+         long currentTime = System.currentTimeMillis(); 
+         if ((this.lastReferencedTime +  this.maxIdleTime) < currentTime) // max idle time exceeded
+             return true;
+         if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded
+             return true;
+             
+         return false;
+        }
         
-        String conversationId = getConversationId();
-        try {
-//            workContext.setCurrentAtomicComponent(component);
-            InstanceWrapper wrapper = (InstanceWrapper)nonDurableStore.readRecord(component, conversationId);
-            if (wrapper != null) {
-//                if (component.getMaxIdleTime() > 0) {
-//                    // update expiration
-//                    long expire = System.currentTimeMillis() + component.getMaxIdleTime();
-//                    nonDurableStore.updateRecord(component, conversationId, wrapper, expire);
-//                }
-            } else if (create) {
-                // FIXME should the store really be persisting the wrappers
-                wrapper = createInstanceWrapper();
-                wrapper.start();
-                long expire = calculateExpiration(component);
-                nonDurableStore.insertRecord(component, conversationId, wrapper, expire);
-            }
-            return wrapper;
-        } catch (StoreReadException e) {
-            throw new TargetResolutionException("Error retrieving target instance", e);
-        } catch (StoreWriteException e) {
-            throw new TargetResolutionException("Error persisting target instance", e);
-        } finally {
-//            workContext.setCurrentAtomicComponent(null);
+        private void updateLastReferencedTime() 
+        {
+         this.lastReferencedTime = System.currentTimeMillis();
         }
+        
+        //
+        // Return the backing implementation instance  
+        //
+        private InstanceWrapper getInstanceWrapper()
+        {
+          InstanceWrapper ctx = wrappers.get(this.instanceId);
+          return ctx;
+        }
+        
+        private void removeInstanceWrapper()
+        {
+          wrappers.remove(this.instanceId);       
+        }
+        
+        private void createInstance()throws TargetResolutionException 
+        {
+            InstanceWrapper instanceWrapper = createInstanceWrapper();
+            instanceWrapper.start();
+            wrappers.put(this.instanceId, instanceWrapper);       
+        }
+        
     }
-
-    /**
-     * Returns the conversation id associated with the current invocation
-     * context
-     * 
-     * @return the conversation id
-     */
-    private String getConversationId() {
-        String conversationId = ThreadMessageContext.getMessageContext().getConversationID();
-        assert conversationId != null;
-        return conversationId;
-    }
-
-     private long calculateExpiration(RuntimeComponent component) {
-//        if (component.getMaxAge() > 0) {
-//            long now = System.currentTimeMillis();
-//            return now + component.getMaxAge();
-//        } else if (component.getMaxIdleTime() > 0) {
-//            long now = System.currentTimeMillis();
-//            return now + component.getMaxIdleTime();
-//        } else {
-            return Store.DEFAULT_EXPIRATION_OFFSET;
-//        }
-    }
-
-    /**
-     * Receives expiration events from the store and notifies the corresponding
-     * atomic component
-     */
-    private static class ExpirationListener implements RuntimeEventListener {
-
-        public ExpirationListener() {
+    
+    //
+    // This inner class is an instance reaper.  It periodically iterates over the InstanceLifeCycleCollection
+    // and for any instances that have expired removes the backing instance and the entry in the InstanceLifeCycle 
+    // Collection.
+    //
+    class ConversationalInstanceReaper implements Runnable 
+    {
+        private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection;
+        
+        public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap)
+        {
+         this.instanceLifecycleCollection = aMap;
         }
-
-        public void onEvent(Event event) {
-            if (event instanceof StoreExpirationEvent) {
-                StoreExpirationEvent expiration = (StoreExpirationEvent)event;
-                InstanceWrapper wrapper = (InstanceWrapper)expiration.getInstance();
-                try {
-                    wrapper.stop();
-                } catch (TargetDestructionException e) {
-                    // monitor.destructionError(e);
-                }
+        
+        public void run()
+        {
+          Iterator<Map.Entry<Object,InstanceLifeCycleWrapper>> anIterator = this.instanceLifecycleCollection.entrySet().iterator();             
+        
+          while (anIterator.hasNext())
+          {
+                Map.Entry<Object,InstanceLifeCycleWrapper> anEntry = anIterator.next();   
+            InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = anEntry.getValue();
+            if (anInstanceLifeCycleWrapper.isExpired())
+            {
+              anInstanceLifeCycleWrapper.removeInstanceWrapper();
+              this.instanceLifecycleCollection.remove(anInstanceLifeCycleWrapper.instanceId);
             }
+          }             
         }
     }
 }

Modified: incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java?view=diff&rev=558025&r1=558024&r2=558025
==============================================================================
--- incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java (original)
+++ incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java Fri Jul 20 08:54:21 2007
@@ -231,7 +231,7 @@
         ScopeContainerFactory[] factories = new ScopeContainerFactory[] {new CompositeScopeContainerFactory(),
                                                                          new StatelessScopeContainerFactory(),
                                                                          new RequestScopeContainerFactory(),
-         new ConversationalScopeContainerFactory(new MemoryStore(null)),
+         new ConversationalScopeContainerFactory(null),
         // new HttpSessionScopeContainer(monitor)
         };
         for (ScopeContainerFactory f : factories) {



---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org