You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/02/02 13:31:10 UTC

svn commit: r1563596 [2/3] - in /river/jtsk/skunk/qa_refactor/trunk: qa/src/com/sun/jini/test/impl/outrigger/matching/ qa/src/com/sun/jini/test/impl/servicediscovery/event/ qa/src/com/sun/jini/test/spec/lookupservice/test_set00/ qa/src/com/sun/jini/tes...

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Feb  2 12:31:09 2014
@@ -17,6 +17,7 @@
  */
 package net.jini.lookup;
 
+import org.apache.river.impl.thread.DependencyLinker;
 import au.net.zeus.collection.RC;
 import au.net.zeus.collection.Ref;
 import au.net.zeus.collection.Referrer;
@@ -28,7 +29,6 @@ import java.rmi.RemoteException;
 import java.rmi.server.ExportException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -39,12 +39,12 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -82,9 +82,14 @@ import net.jini.security.BasicProxyPrepa
 import net.jini.security.ProxyPreparer;
 import net.jini.security.TrustVerifier;
 import net.jini.security.proxytrust.ServerProxyTrust;
+import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.ObservableFuture;
+import org.apache.river.api.util.FutureObserver.Subscribeable;
+import org.apache.river.api.util.FutureObserver.Subscriber;
 import org.apache.river.impl.thread.ExtensibleExecutorService;
 import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
 import org.apache.river.impl.thread.NamedThreadFactory;
+import org.apache.river.impl.thread.ObservableFutureTask;
 
 /**
  * The <code>ServiceDiscoveryManager</code> class is a helper utility class
@@ -642,6 +647,12 @@ public class ServiceDiscoveryManager {
         }//end getSeqN
         
 	public abstract void run();
+        
+        public abstract boolean hasDeps();
+        
+        public boolean dependsOn(CacheTask task){
+            return false;
+        }
     }//end class ServiceDiscoveryManager.CacheTask
 
     /** Abstract base class for controlling the order-of-execution of tasks
@@ -666,23 +677,24 @@ public class ServiceDiscoveryManager {
          *    then run this task *after* the task in the list (return true);
          *    otherwise run this task now (return false).
          *
-         *  @param tasks the tasks to consider.
-         *  @param size elements with index less than size are considered.
          */
-//        public boolean runAfter(List tasks, int size) {
-//            for(int i=0; i<size; i++) {
-//                Runnable t = (Runnable) tasks.get(i);
-//                //Compare only instances of this task class
-//                if( !(t instanceof ServiceIdTask) )  continue;
-//                ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
-//                if( thisTaskSid.equals(otherTaskSid) ) {
-//                    if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
-//                        return true;//run this task after the other task
-//                    }//endif
-//                }//endif
-//            }//end loop
-//            return false;//run this task now
-//        }//end runAfter
+        @Override
+        public boolean dependsOn(CacheTask t) {
+            //Compare only instances of this task class
+            if( !(t instanceof ServiceIdTask) )  return false;
+            ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
+            if( thisTaskSid.equals(otherTaskSid) ) {
+                if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
+                    return true;//run this task after the other task
+                }//endif
+            }//endif
+            return false;//run this task now
+        }//end dependsOn
+
+        @Override
+        public boolean hasDeps(){
+            return true;
+        }
 
         /** Returns the ServiceID associated with this task. */
         public ServiceID getServiceID() {
@@ -699,11 +711,14 @@ public class ServiceDiscoveryManager {
     {
 	private final List items = new LinkedList();
         
+        @Override
 	public synchronized void serviceAdded(ServiceDiscoveryEvent event) {
             items.add(event.getPostEventServiceItem());
             this.notifyAll();
 	}
+        @Override
 	public void serviceRemoved(ServiceDiscoveryEvent event){ }
+        @Override
 	public void serviceChanged(ServiceDiscoveryEvent event){ }
 	public synchronized ServiceItem[] getServiceItem() {
 	    ServiceItem[] r = new ServiceItem[items.size()];
@@ -718,27 +733,71 @@ public class ServiceDiscoveryManager {
      * number. For each LookupCache, there is a HashMap that maps a ProxyReg
      * to an EventReg.
      */
-    private final static class EventReg  {
+    private final static class EventReg {
         /* The Event source from the event registration */
         final Object source;
         /* The Event ID */
         public final long eventID;
 	/* The current event sequence number for the Service template */
-	public long seqNo;
+	private long seqNo;
 	/* The Event notification lease */
 	public final Lease lease;
 	/* The pending events */
-	public final ArrayList pending;
+	private final List<LookupCacheImpl.NotifyEventTask> pending;
 	/* The number of pending LookupTasks */
-	public int lookupsPending;
+	private int lookupsPending;
+        
         public EventReg(Object source, long eventID, long seqNo, Lease lease) {
 	    this.source  = source;
             this.eventID = eventID;
 	    this.seqNo   = seqNo;
 	    this.lease   = lease;
-	    this.pending = new ArrayList();
+            pending = new ArrayList<LookupCacheImpl.NotifyEventTask>();
 	    this.lookupsPending = 0;
 	}
+
+        /**
+         * Advances the stored sequence number if the given one is newer.
+         * @param seqNo the candidate seqNo.
+         * @return seqNo minus the previous value: positive when the stored
+         * value was advanced, zero or negative when it was left unchanged.
+         */
+        public synchronized long updateSeqNo(long seqNo) {
+            long delta = seqNo - this.seqNo; // was reversed, so success looked negative
+            if (seqNo > this.seqNo){
+                this.seqNo = seqNo; // only ever move forward
+            }
+            return delta;
+        }
+        
+        public synchronized boolean addIfLookupsPending(CacheTask task){
+            // Only NotifyEventTasks are deferred, and only while a lookup is pending.
+            if (task instanceof LookupCacheImpl.NotifyEventTask && getLookupsPending() > 0){
+                pending.add((LookupCacheImpl.NotifyEventTask) task);
+                return true; // report that the task was queued (previously always false)
+            }
+            return false;
+        }
+//        
+//        public synchronized void addLookupPending(ObservableFutureTask task){
+//            task.addObserver(this);
+//            pendingLookups.add(task);
+//        }
+
+        /**
+         * @return the lookupsPending
+         */
+        public synchronized int getLookupsPending() {
+            return lookupsPending;
+        }
+        
+        public synchronized void decrementLookupsPending(){
+            lookupsPending--;
+        }
+
+        public synchronized void incrementLookupsPending(){
+            lookupsPending++;
+        }
     }//end class ServiceDiscoveryManager.EventReg
 
     /**
@@ -748,7 +807,7 @@ public class ServiceDiscoveryManager {
      */
     private final static class ServiceItemReg  {
 	/* Maps ServiceRegistrars to their latest registered item */
-	private final Map<ServiceRegistrar,ServiceItem> items = new HashMap<ServiceRegistrar,ServiceItem>();
+	private final Map<ServiceRegistrar,ServiceItem> items;
 	/* The ServiceRegistrar currently being used to track changes */
 	private ServiceRegistrar proxy;
 	/* Flag that indicates that the ServiceItem has been discarded. */
@@ -761,6 +820,7 @@ public class ServiceDiscoveryManager {
          * lookup service proxy.
          */
 	public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) {
+            items = new HashMap<ServiceRegistrar,ServiceItem>();
 	    this.proxy = proxy;
 	    items.put(proxy, item);
 	    this.item = item;
@@ -854,22 +914,12 @@ public class ServiceDiscoveryManager {
     /** Allows termination of LookupCacheImpl so blocking lookup can return
      * quickly
      */
-    private final class LookupCacheTerminator extends Thread {
+    private static final class LookupCacheTerminator implements Runnable {
         private final BlockingQueue<LookupCacheImpl> cacheList = new LinkedBlockingQueue<LookupCacheImpl>(20);
-        LookupCacheTerminator(){
-            super("SDM lookup cache terminator");
-            setDaemon(true);
-        }
         
-        public void start(){
-            synchronized (this){
-                started = true;
-            }
-            super.start();
-        }
         
         public void run(){
-            while (!isInterrupted()) {
+            while (!Thread.currentThread().isInterrupted()) {
                 try {
                     LookupCacheImpl cache = cacheList.take();
                     synchronized (cache){
@@ -894,69 +944,94 @@ public class ServiceDiscoveryManager {
         
     }
     
-    private static class DependencyLinker {
-
-        private final ExecutorService executor;
-
-        DependencyLinker(ExecutorService ex){
-            executor = new ExtensibleExecutorService
-            (
-                ex, 
-                new RunnableFutureFactory(){
-
-                    @Override
-                    public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
-                        return new FutureTaskSeqNo<T>(r, value);
-                    }
-
-                    @Override
-                    public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
-                        return new FutureTaskSeqNo<T>(c);
-                    }
-                }
-            );
-        }
-
-        void execute(CacheTask task)
-        {
-            executor.submit(task);
-        }
-    }
-    
     /**
      * 
      * @param <T> 
      */
-    public static final class FutureTaskSeqNo<T> extends FutureTask<T> implements Comparable<FutureTaskSeqNo>
+    static final class CacheTaskWrapper<T> extends ObservableFutureTask<T> 
     {
-        private final long seqNo;
+        private final CacheTask task;
 
-        private FutureTaskSeqNo(Runnable r, T result) {
+        private CacheTaskWrapper(Runnable r, T result) {
             super(r, result);
-            if (r instanceof CacheTask) seqNo = ((CacheTask)r).getSeqN();
-            else seqNo = -1;
+            if (r instanceof CacheTask) task = (CacheTask)r;
+            else task = null;
         }
 
-        private FutureTaskSeqNo(Callable<T> c)
+        private CacheTaskWrapper(Callable<T> c)
         {
             super(c);
-            if (c instanceof CacheTask) seqNo = ((CacheTask)c).getSeqN();
-            else seqNo = -1;
+            if (c instanceof CacheTask) task = (CacheTask)c;
+            else task = null;
+        }
+
+        private CacheTask getTask(){
+            return task;
+        }
+    }
+    
+    private static final class CacheTaskQueue implements FutureObserver {
+        // CacheTasks pending completion.
+        private final ConcurrentLinkedQueue<CacheTaskWrapper> pending;
+        private final ExecutorService executor;
+        
+        private CacheTaskQueue(ExecutorService e){
+            this.pending = new ConcurrentLinkedQueue<CacheTaskWrapper>();
+            executor = e;
+        }
+        
+        private CacheTaskWrapper submit(CacheTask t){
+            CacheTaskWrapper future = new CacheTaskWrapper(t, null);
+            pending.offer(future);
+            future.addObserver(this);
+            if (t.hasDeps()) {
+                List<ObservableFuture> deps = new ArrayList<ObservableFuture>();
+                Iterator<CacheTaskWrapper> it = pending.iterator();
+                while (it.hasNext()){
+                    CacheTaskWrapper w = it.next();
+                    CacheTask c = w.getTask();
+                    if (t.dependsOn(c)) {
+                        deps.add(w);
+                    }
+                }
+                if (deps.isEmpty()){
+                    executor.submit(future);
+                } else {
+                    DependencyLinker linker = new DependencyLinker(executor, deps, future);
+                    linker.register();
+                }
+            } else {
+                executor.submit(future);
+            }
+            return future;
         }
 
         @Override
-        public int compareTo(FutureTaskSeqNo o) {
-            if (seqNo < o.seqNo) return -1;
-            if (seqNo > o.seqNo) return 1;
-            return 0;
+        public void futureCompleted(Future e) {
+            pending.remove(e);
         }
+        
+        /** Cancels all tasks in this queue's pending collection that are
+         *  associated with the given ProxyReg. This method is called
+         *  when the given ProxyReg has been discarded.
+	 */
+	private void removeUselessTask(ProxyReg reg) {
+            Iterator<CacheTaskWrapper> it = pending.iterator();
+            while( it.hasNext()) {
+                CacheTaskWrapper w = it.next();
+                CacheTask t = w.getTask();
+                if(t.isFromProxy(reg)) {
+                    w.cancel(true); // presumably removed from pending via futureCompleted - confirm
     }
+            }//end loop
+	}//end CacheTaskQueue.removeUselessTask
     }
     
     /** Internal implementation of the LookupCache interface. Instances of
      *  this class are used in the blocking versions of lookup() and are
      *  returned by createLookupCache.
      */
-    private final class LookupCacheImpl implements LookupCache {
+    private static final class LookupCacheImpl implements LookupCache {
         
 	/* RemoteEventListener class that is registered with the proxy to
          * receive notifications from lookup services when any ServiceItem
@@ -1016,59 +1091,95 @@ public class ServiceDiscoveryManager {
          *  process has completed, resulting in an incorrect view of the
          *  current state of the ServiceRegistrar.
          */
-        private final class RegisterListenerTask extends CacheTask {
-            public RegisterListenerTask(ProxyReg reg, long seqN) {
+        private static final class RegisterListenerTask extends CacheTask 
+        implements Subscribeable {
+            private final LookupCacheImpl cache;
+            private Subscriber subscriber;
+            public RegisterListenerTask(
+                    ProxyReg reg, 
+                    long seqN, 
+                    LookupCacheImpl cache) 
+            {
                 super(reg, seqN);
+                this.cache = cache;
 	    }
 
+            public boolean hasDeps(){
+                return true;
+            }
+            
+            public boolean dependsOn(CacheTask t){
+                if (t instanceof ProxyRegDropTask ){
+                    ProxyReg r = getProxyReg();
+                    if (r != null && r.equals(t.getProxyReg())){
+                        if (t.getSeqN() < getSeqN()) return true;
+                    }
+                }
+                return false;
+            }
+
             public void run() {
                 logger.finest("ServiceDiscoveryManager - RegisterListenerTask "
                               +"started");
-		long duration = getLeaseDuration();
+		long duration = cache.getLeaseDuration();
 		if(duration < 0)  return;
                 try {
-                    EventReg eventReg = registerListener(reg.getProxy(),
-                                                         tmpl,
-                                                         lookupListenerProxy,
+                    EventReg eventReg = cache.sdm.registerListener(reg.getProxy(),
+                                                         cache.tmpl,
+                                                         cache.lookupListenerProxy,
                                                          duration);
                     // eventReg is a new object not visible to other threads yet.
-                    eventReg.lookupsPending++;
+                    eventReg.incrementLookupsPending();
                     /* Cancel the lease if the cache has been terminated */
-                    if(bCacheTerminated) {
+                    if(cache.bCacheTerminated || Thread.currentThread().isInterrupted()) {
                         // eventReg.lease is final and is already safely published
-                        cancelLease(eventReg.lease);
+                        cache.sdm.cancelLease(eventReg.lease);
                         return;
                     } else {
                         // eventReg will be published safely to other threads 
                         // via eventRegMap and LookupTask
-                        EventReg existed = eventRegMap.putIfAbsent(reg, eventReg);
+                        EventReg existed = cache.eventRegMap.putIfAbsent(reg, eventReg);
                         if (existed == null ) {
-                            (new LookupTask(reg, this.getSeqN(), eventReg)).run();
+                            ObservableFuture task = 
+                                cache.taskQueue.submit( 
+                                    new LookupTask(reg, this.getSeqN(), eventReg, cache)
+                                );
+                             synchronized (this){
+                                if (subscriber != null) subscriber.reccommendedViewing(task);
+                            }
                         } else {
                             // Another eventReg.lease exists, cancel new lease.
-                            cancelLease(eventReg.lease);
+                            cache.sdm.cancelLease(eventReg.lease);
                         }
                     }//endif
                     /* Execute the LookupTask only if there were no problems */
 		   
                 } catch (Exception e) {
-                    ServiceDiscoveryManager.this.fail
+                    cache.sdm.fail
                          (e, reg.getProxy(),this.getClass().getName(),"run",
                           "Exception occurred while attempting to register "
                           +"with the lookup service event mechanism",
-                          bCacheTerminated);
+                          cache.bCacheTerminated);
                 }
                 logger.finest("ServiceDiscoveryManager - RegisterListenerTask "
                               +"completed");
             }//end run
+
+            @Override
+            public synchronized void subscribe(Subscriber subscriber) {
+                this.subscriber = subscriber;
+            }
 	}//end class LookupCacheImpl.RegisterListenerTask
 
 	/** This class requests a "snapshot" of the given registrar's state.*/
-        private final class LookupTask extends CacheTask {
+        private static final class LookupTask extends CacheTask implements Subscribeable {
 	    private final EventReg eReg;
-            public LookupTask(ProxyReg reg, long seqN, EventReg eReg) {
+            private final LookupCacheImpl cache;
+            private Subscriber subscriber;
+            public LookupTask(ProxyReg reg, long seqN, EventReg eReg, LookupCacheImpl cache) {
                 super(reg, seqN);
 		this.eReg = eReg;
+                this.cache = cache;
 	    }
             public void run() {
                 logger.finest("ServiceDiscoveryManager - LookupTask started");
@@ -1076,17 +1187,13 @@ public class ServiceDiscoveryManager {
 		ServiceMatches matches;
                 /* For the given lookup, get all services matching the tmpl */
 		try {
-		    matches = proxy.lookup(tmpl, Integer.MAX_VALUE);
+		    matches = proxy.lookup(cache.tmpl, Integer.MAX_VALUE);
 		} catch (Exception e) {
-                    boolean cacheTerminated;
-                    synchronized(eReg) {
-			eReg.lookupsPending--;
-                        cacheTerminated = bCacheTerminated;
-                    }//end sync
-                    ServiceDiscoveryManager.this.fail
+                    eReg.decrementLookupsPending();
+                    cache.sdm.fail
                            (e,proxy,this.getClass().getName(),"run",
                             "Exception occurred during call to lookup",
-                            cacheTerminated);
+                            cache.bCacheTerminated);
 		    return;
 		}
                 if(matches.items == null) {
@@ -1096,41 +1203,61 @@ public class ServiceDiscoveryManager {
                               +"null 'items' field");
                 }
                 /* 1. Cleanup "orphaned" itemReg's. */
-                Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter = (serviceIdMap.entrySet()).iterator();
+                Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter 
+                        = cache.serviceIdMap.entrySet().iterator();
                 while(iter.hasNext()) {
                     Map.Entry<ServiceID,ServiceItemReg> e = iter.next();
                     ServiceID srvcID = e.getKey();
-                    ServiceItem itemInSnapshot = findItem(srvcID,
+                    ServiceItem itemInSnapshot = cache.findItem(srvcID,
                                                           matches.items);
                     if(itemInSnapshot != null) continue;//not an orphan
+                    if (Thread.currentThread().isInterrupted()) continue; // skip
                     ServiceItemReg itemReg = e.getValue();
-                    UnmapProxyTask t = new UnmapProxyTask(reg,
+                    CacheTask t = 
+                        new UnmapProxyTask(
+                                reg,
                                                           itemReg,
                                                           srvcID,
-                                                          taskSeqN.getAndIncrement());
-                    cacheDependencyLinker.execute(t);
+                                getSeqN(),
+                                cache
+                        );
+                    ObservableFuture task = cache.taskQueue.submit(t);
+                    synchronized (this){
+                        if (subscriber != null) subscriber.reccommendedViewing(task);
+                    }
                 }//end loop
                 /* 2. Handle "new" and "old" items from the given lookup */
                 for(int i=0; i<(matches.items).length; i++) {
                     /* Skip items with null service field (Bug 4378751) */
                     if( (matches.items[i]).service == null )  continue;
-                    NewOldServiceTask t =
-                                    new NewOldServiceTask(reg,
+                    if (Thread.currentThread().isInterrupted()) continue; // skip
+                    CacheTask t = 
+                                new NewOldServiceTask(
+                                        reg,
                                                           matches.items[i],
                                                           false,
-                                                          taskSeqN.getAndIncrement());
-                    cacheDependencyLinker.execute(t);
+                                        getSeqN(),
+                                        cache
+                                );
+                    
+                    ObservableFuture task = cache.taskQueue.submit(t);
+                    synchronized (this){
+                        if (subscriber != null) subscriber.reccommendedViewing(task);
+                    }
                 }//end loop
                 /* 3. Handle events that came in prior to lookup */
                 synchronized (eReg){
-                    eReg.lookupsPending--;
+                    eReg.decrementLookupsPending();
+                    if (eReg.getLookupsPending() == 0){
                     Iterator it = eReg.pending.iterator() ;
                     while (it.hasNext()) {
+                            if (Thread.currentThread().isInterrupted()) break; // 'continue' never advanced it: endless loop
                         NotifyEventTask t = (NotifyEventTask) it.next();
-                        t.thisTaskSeqN = taskSeqN.getAndIncrement(); // assign new seqN
-                        cacheDependencyLinker.execute(t);
+                            t.thisTaskSeqN = cache.taskSeqN.getAndIncrement(); // assign new seqN
+                            cache.taskQueue.submit(t);
                     }
                     eReg.pending.clear();
+                    }
                 }//end sync(eReg)
                 logger.finest("ServiceDiscoveryManager - LookupTask "
                               +"completed");
@@ -1154,21 +1281,27 @@ public class ServiceDiscoveryManager {
              *  @param tasks the tasks to consider.
              *  @param size elements with index less than size are considered.
              */
-//            public boolean runAfter(List tasks, int size) {
-//                for(int i=0; i<size; i++) {
-//                    CacheTask t = (CacheTask)tasks.get(i);
-//                    if(   t instanceof RegisterListenerTask
-//                       || t instanceof LookupTask
-//                       || t instanceof NotifyEventTask )
-//                    {
-//                        ProxyReg otherReg = t.getProxyReg();
-//                        if( reg.equals(otherReg) ) {
-//                            if(thisTaskSeqN > t.getSeqN()) return true;
-//                        }//endif
-//                    }//endif
-//                }//end loop
-//                return false;
-//            }//end runAfter
+            public boolean dependsOn(CacheTask t) {
+                if(   t instanceof RegisterListenerTask
+                   || t instanceof LookupTask
+                   || t instanceof NotifyEventTask )
+                {
+                    ProxyReg otherReg = t.getProxyReg();
+                    if( reg.equals(otherReg) ) {
+                        if(thisTaskSeqN > t.getSeqN()) return true;
+                    }//endif
+                }//endif
+                return false;
+            }//end dependsOn
+
+            public boolean hasDeps(){
+                return true;
+            }
+
+            @Override
+            public synchronized void subscribe(Subscriber subscriber) {
+                this.subscriber = subscriber;
+            }
 
 	}//end class LookupCacheImpl.LookupTask
 
@@ -1176,35 +1309,72 @@ public class ServiceDiscoveryManager {
          *  remove the registrar from the various maps maintained by this
          *  cache.
          */
-        private final class ProxyRegDropTask extends CacheTask {
-            public ProxyRegDropTask(ProxyReg reg, long seqN) {
+        private static final class ProxyRegDropTask extends CacheTask
+        implements Subscribeable {
+            
+            private final LookupCacheImpl cache;
+            private final EventReg eReg;
+            private Subscriber subscriber;
+            
+            public ProxyRegDropTask(
+                    ProxyReg reg, EventReg eReg, long seqN, LookupCacheImpl cache)
+            {
             super(reg, seqN);
+            this.cache = cache;
+            this.eReg = eReg;
 	    }
             public void run() {
                 logger.finest("ServiceDiscoveryManager - ProxyRegDropTask "
                               +"started");
                 //lease has already been cancelled by removeProxyReg
-                eventRegMap.remove(reg);
+                cache.eventRegMap.remove(reg, eReg);
                 /* For each itemReg in the serviceIdMap, disassociate the
                  * lookup service referenced here from the itemReg; and
                  * if the itemReg then has no more lookup services associated
                  * with it, remove the itemReg from the map and send a
                  * service removed event.
                  */
-                Iterator iter = (serviceIdMap.entrySet()).iterator();
+                Iterator iter = (cache.serviceIdMap.entrySet()).iterator();
                 while(iter.hasNext()) {
                     Map.Entry e = (Map.Entry)iter.next();
                     ServiceID srvcID = (ServiceID)e.getKey();
                     ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
-                    UnmapProxyTask t = new UnmapProxyTask(reg,
+                    UnmapProxyTask t = 
+                            new UnmapProxyTask(
+                                    reg,
                                                           itemReg,
                                                           srvcID,
-                                                          taskSeqN.getAndIncrement());
-                    cacheDependencyLinker.execute(t);
+                                    getSeqN(),
+                                    cache);
+                    ObservableFuture task = cache.taskQueue.submit(t);
+                    synchronized (this){
+                        if (subscriber != null) subscriber.reccommendedViewing(task);
+                    }
                 }//end loop
                 logger.finest("ServiceDiscoveryManager - ProxyRegDropTask "
                               +"completed");
             }//end run
+            
+            @Override
+            public boolean hasDeps(){
+                return true;
+            }
+            
+            public boolean dependsOn(CacheTask t){
+                if (t instanceof RegisterListenerTask ){
+                    ProxyReg r = getProxyReg();
+                    if (r != null && r.equals(t.getProxyReg())){
+                        if (t.getSeqN() < getSeqN()) return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public synchronized void subscribe(Subscriber subscriber) {
+                this.subscriber = subscriber;
+            }
+            
 	}//end class LookupCacheImpl.ProxyRegDropTask
 
 	/** Task class used to asynchronously notify service discard. */
@@ -1214,6 +1384,10 @@ public class ServiceDiscoveryManager {
                 super(null, 0);
 		this.item = item;
 	    }
+                
+            public boolean hasDeps(){
+                return false;
+            }
 
             public void run() {
                 logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
@@ -1228,11 +1402,15 @@ public class ServiceDiscoveryManager {
          *  discovery listeners of serviceAdded/serviceRemoved/serviceChanged
          *  events.
          */
-        private final class NotifyEventTask extends ServiceIdTask {
+        private static class NotifyEventTask extends ServiceIdTask 
+        implements Subscribeable {
 	    private final ServiceID sid;
 	    private final ServiceItem item;
 	    private final int transition;
-	    public NotifyEventTask(ProxyReg reg,
+            private final LookupCacheImpl cache;
+            private Subscriber subscriber;
+	    public NotifyEventTask(LookupCacheImpl cache,
+                                   ProxyReg reg,
 				   ServiceID sid,
 				   ServiceItem item,
 				   int transition,
@@ -1242,6 +1420,7 @@ public class ServiceDiscoveryManager {
 		this.sid = sid;
 		this.item = item;
 		this.transition = transition;
+                this.cache = cache;
 	    }//end constructor
 
             public void run() {
@@ -1258,7 +1437,8 @@ public class ServiceDiscoveryManager {
                  * because the primary if-block will be unintentionally
                  * entered due to the null service field in the ServiceItem.
                  */
-                if( (item != null) && (item.service == null) ) {
+                if( ((item != null) && (item.service == null))
+                        || Thread.currentThread().isInterrupted()) {
                     return;
                 }//endif
                 /* Handle the event by the transition type, and by whether
@@ -1266,11 +1446,14 @@ public class ServiceDiscoveryManager {
                  * item, or a newly discovered item.
                  */
 		if(transition == ServiceRegistrar.TRANSITION_MATCH_NOMATCH) {
-                    handleMatchNoMatch(reg.getProxy(), sid, item);
+                    cache.handleMatchNoMatch(reg.getProxy(), sid);
                 } else {//(transition == NOMATCH_MATCH or MATCH_MATCH)
-                    (new NewOldServiceTask(reg, item,
+                    ObservableFuture task = cache.taskQueue.submit(new NewOldServiceTask(reg, item,
                        (transition == ServiceRegistrar.TRANSITION_MATCH_MATCH),
-                                               thisTaskSeqN)).run();
+                                               thisTaskSeqN, cache));
+                    synchronized (this){
+                        if (subscriber != null) subscriber.reccommendedViewing(task);
+                    }
                 }//endif(transition)
                 logger.finest("ServiceDiscoveryManager - NotifyEventTask "
                               +"completed");
@@ -1300,20 +1483,26 @@ public class ServiceDiscoveryManager {
              *  @param tasks the tasks to consider.
              *  @param size elements with index less than size are considered.
              */
-//            public boolean runAfter(List tasks, int size) {
-//                for(int i=0; i<size; i++) {
-//                    Runnable t = (Runnable)tasks.get(i);
-//                    if(   t instanceof RegisterListenerTask
-//                       || t instanceof LookupTask )
-//                    {
-//                        ProxyReg otherReg = ((CacheTask)t).getProxyReg();
-//                        if( reg.equals(otherReg) ) {
-//                            if(thisTaskSeqN > ((CacheTask)t).getSeqN()) return true;
-//                        }//endif
-//                    }//endif
-//                }//end loop
-//                return super.runAfter(tasks, size);
-//            }//end runAfter
+            public boolean dependsOn(CacheTask t) {
+                if(   t instanceof RegisterListenerTask
+                   || t instanceof LookupTask )
+                {
+                    ProxyReg otherReg = t.getProxyReg();
+                    if( reg.equals(otherReg) ) {
+                        if(thisTaskSeqN > t.getSeqN()) return true;
+                    }//endif
+                }//endif
+                return super.dependsOn(t);
+            }//end dependsOn
+
+            public boolean hasDeps(){
+                return true;
+            }
+
+            @Override
+            public synchronized void subscribe(Subscriber subscriber) {
+                this.subscriber = subscriber;
+            }
 
 	}//end class LookupCacheImpl.NotifyEventTask
 
@@ -1323,33 +1512,35 @@ public class ServiceDiscoveryManager {
          *  a filter retry on an item in which the cache's filter initially
          *  returned indefinite.
          */
-        private final class ServiceDiscardTimerTask implements Runnable
+        private static final class ServiceDiscardTimerTask implements Runnable
         {
             private final ServiceID serviceID;
 	    private final long endTime;
-            public ServiceDiscardTimerTask(ServiceID serviceID) {
+            private final LookupCacheImpl cache;
+            public ServiceDiscardTimerTask( LookupCacheImpl cache, ServiceID serviceID) {
                 this.serviceID = serviceID;
-                this.endTime = discardWait+System.currentTimeMillis();
+                this.cache = cache;
+                this.endTime = cache.sdm.discardWait + System.currentTimeMillis();
             }//end constructor
             public void run(){
                 logger.finest("ServiceDiscoveryManager - "
                               +"ServiceDiscardTimerTask started");
                 /* Exit if this cache has already been terminated. */
-                if(bCacheTerminated)  return;
+                if(cache.bCacheTerminated)  return;
                 /* Simply return if a MATCH_NOMATCH event arrived for this
                  * item prior to this task running and as a result, the item
                  * was removed from the map.
                  */
-                if(!serviceIdMap.containsKey(serviceID))  return;
+                if(!cache.serviceIdMap.containsKey(serviceID))  return;
                 long curDur = endTime-System.currentTimeMillis();
-                synchronized(serviceDiscardMutex) {
+                synchronized(cache.serviceDiscardMutex) {
                     /* Wait until interrupted or time expires */
                     while(curDur > 0) {
                         try {
-                            serviceDiscardMutex.wait(curDur);
+                            cache.serviceDiscardMutex.wait(curDur);
                         } catch(InterruptedException e){ }
                         /* Exit if this cache was terminated while waiting. */
-                        if(bCacheTerminated) return;
+                        if(cache.bCacheTerminated) return;
                         /* Either the wait period has completed or has been
                          * interrupted. If the service ID is no longer in
                          * in the serviceIdMap, then it's assumed that a
@@ -1360,7 +1551,7 @@ public class ServiceDiscoveryManager {
                          * re-discovered when it comes back on line. In that
                          * case, exit the thread.
                          */
-                        if(!serviceIdMap.containsKey(serviceID))  return;
+                        if(!cache.serviceIdMap.containsKey(serviceID))  return;
                         curDur = endTime-System.currentTimeMillis();
                     }//end loop
                 }//end sync
@@ -1386,10 +1577,11 @@ public class ServiceDiscoveryManager {
                  * event was already sent when the service was originally
                  * discarded.
                  */
-                ServiceItemReg itemReg = serviceIdMap.get(serviceID);
+                ServiceItemReg itemReg = cache.serviceIdMap.get(serviceID);
                 if(itemReg != null) {
                     ServiceItem item = null;
                     ServiceItem filteredItem = null;
+                    ServiceItem itemToSend;
                     synchronized(itemReg) {
 			if(!itemReg.isDiscarded()) return;
                         if(itemReg.filteredItem == null) {
@@ -1402,29 +1594,26 @@ public class ServiceDiscoveryManager {
                                                 (itemReg.item).service,
                                                 (itemReg.item).attributeSets);
                         }//endif
-                    }//end sync(itemReg)
-                    if(filteredItem != null) {//retry the filter
-                        if( filterPassFail(filteredItem,filter) ) {
-                            addFilteredItemToMap(item,filteredItem);
-                        } else {//'quietly' remove the item
-                            removeServiceIdMapSendNoEvent(serviceID);
-                            return;
+                        if(filteredItem != null) {//retry the filter
+                            if( cache.sdm.filterPassFail(filteredItem,cache.filter) ) {
+                                cache.addFilteredItemToMap(item,filteredItem);
+                            } else {//'quietly' remove the item
+                                cache.removeServiceIdMapSendNoEvent(serviceID, itemReg);
+                                return;
+                            }//endif
                         }//endif
-                    }//endif
-                    /* Either the filter was retried and passed, in which case,
-                     * the filtered itemCopy was placed in the map; or the
-                     * filter wasn't applied above (a non-null filteredItem
-                     * field in the itemReg in the map means that the filter
-                     * was applied at some previous time). In either case, the
-                     * service can now be "un-discarded", and a notification
-                     * that the service is now available can be sent.
-                     */
-                    ServiceItem itemToSend;
-                    synchronized(itemReg) {
+                        /* Either the filter was retried and passed, in which case,
+                         * the filtered itemCopy was placed in the map; or the
+                         * filter wasn't applied above (a non-null filteredItem
+                         * field in the itemReg in the map means that the filter
+                         * was applied at some previous time). In either case, the
+                         * service can now be "un-discarded", and a notification
+                         * that the service is now available can be sent.
+                         */
                         itemReg.setDiscarded(false);
                         itemToSend = itemReg.filteredItem;
                     }//end sync(itemReg)
-                    addServiceNotify(itemToSend);
+                    cache.addServiceNotify(itemToSend);
          
                 }//endif
                 logger.finest("ServiceDiscoveryManager - "
@@ -1475,26 +1664,32 @@ public class ServiceDiscoveryManager {
          *           queue another filter attempt for later but
          *           send NO removed event
          */
-        private final class NewOldServiceTask extends ServiceIdTask {
+        private static final class NewOldServiceTask extends ServiceIdTask {
             private final ServiceItem srvcItem;
             private final boolean matchMatchEvent;
-            public NewOldServiceTask(ProxyReg reg,
+            private final LookupCacheImpl cache;
+            public NewOldServiceTask(
+                    ProxyReg reg, 
                                      ServiceItem item,
                                      boolean matchMatchEvent,
-                                     long seqN)
+                    long seqN, 
+                    LookupCacheImpl cache)
             {
                 super(item.serviceID, reg, seqN);
                 this.srvcItem = item;
                 this.matchMatchEvent = matchMatchEvent;
+                this.cache = cache;
             }//end constructor
 
             public void run() {
                 logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
                               +"started");
 		boolean previouslyDiscovered = false;
-                ServiceItemReg itemReg = serviceIdMap.get(thisTaskSid);
+                
+                ServiceItemReg itemReg = null;
+                itemReg = cache.serviceIdMap.get(thisTaskSid);
                 if (itemReg == null) {
-                    if( !eventRegMap.containsKey(reg) ) {
+                    if( !cache.eventRegMap.containsKey(reg) ) {
                         /* reg must have been discarded, simply return */
                         logger.finest("ServiceDiscoveryManager - "
                                       +"NewOldServiceTask completed");
@@ -1502,23 +1697,34 @@ public class ServiceDiscoveryManager {
                     }//endif
                     // else
                     itemReg = new ServiceItemReg( reg.getProxy(), srvcItem );
-                    ServiceItemReg existed = serviceIdMap.putIfAbsent( thisTaskSid, itemReg );
-                    if (existed != null) {
+                    ServiceItemReg existed = cache.serviceIdMap.putIfAbsent(thisTaskSid, itemReg);
+                    if (existed != null){
                         itemReg = existed;
-                        // Probably changed while we were stuffing around.
                         previouslyDiscovered = true;
                     }
                 } else {
                     previouslyDiscovered = true;
                 }
+                
                 if(previouslyDiscovered) {//a. old, previously discovered item
-                    itemMatchMatchChange(reg.getProxy(), srvcItem,
+//                        if (itemReg.hasNoProxys()) { // In danger of removal
+//                            newItemReg = (ServiceItemReg) itemReg.clone();
+//                            if ( serviceIdMap.replace(thisTaskSid, itemReg, newItemReg)){
+//                                itemReg = newItemReg;
+//                            }
+//                        }
+                    // If itemReg wasn't replaced, the sync added here is reentrant;
+                    // otherwise we sync again in case itemReg did get replaced.
+                        synchronized (itemReg){
+                            cache.itemMatchMatchChange(reg.getProxy(), srvcItem,
                                          itemReg, matchMatchEvent);
+                        }
+                    
                 } else {//b. newly discovered item
                     ServiceItem newFilteredItem =
-                                  filterMaybeDiscard(srvcItem, reg.getProxy(),false);
+                                  cache.filterMaybeDiscard(srvcItem, false);
                     if(newFilteredItem != null) {
-                        addServiceNotify(newFilteredItem);
+                        cache.addServiceNotify(newFilteredItem);
                     }//endif
                 }//endif
                 logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
@@ -1557,15 +1763,14 @@ public class ServiceDiscoveryManager {
          *  service will not concurrently modify any state related to that
          *  service.
          */
-        private final class UnmapProxyTask extends ServiceIdTask {
+        private static final class UnmapProxyTask extends ServiceIdTask {
             private final ServiceItemReg itemReg;
-            public UnmapProxyTask(ProxyReg       reg,
-                                  ServiceItemReg itemReg,
-                                  ServiceID      srvcId,
-                                  long           seqN)
+            private final LookupCacheImpl cache;
+            public UnmapProxyTask(ProxyReg reg, ServiceItemReg itemReg, ServiceID srvcId, long seqN, LookupCacheImpl cache)
             {
                 super(srvcId, reg, seqN);
                 this.itemReg = itemReg;
+                this.cache = cache;
             }//end constructor
 
             public void run() {
@@ -1573,19 +1778,25 @@ public class ServiceDiscoveryManager {
                               +"started");
 		ServiceRegistrar proxy = null;
                 ServiceItem item;
+                boolean notify = false;
                 synchronized(itemReg) {
                     item = itemReg.removeProxy(reg.getProxy());//disassociate the LUS
 		    if (item != null) {// new LUS chosen to track changes
 			proxy = itemReg.proxy;
-		    } else if( itemReg.hasNoProxys() ) {//no more LUSs, remove from map
+		    } else if( itemReg.hasNoProxys()) {//no more LUSs, remove from map
                         item = itemReg.filteredItem;
+                        boolean removed = false;
+                        removed = (cache.removeServiceIdMapSendNoEvent(thisTaskSid, itemReg));
+                        if (removed && !itemReg.isDiscarded()){
+                            itemReg.setDiscarded(true);
+                            notify = true;
+                        }
                     }//endif
                 }//end sync(itemReg)
 		if(proxy != null) {
-		    itemMatchMatchChange(proxy, item, itemReg, false);
-		} else if(item != null) {
-		    removeServiceIdMap(thisTaskSid,item);
+                        cache.itemMatchMatchChange(proxy, item, itemReg, false);
 		}//endif
+                if (notify) cache.removeServiceNotify(item);
                 logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
                               +"completed");
             }//end run
@@ -1603,11 +1814,11 @@ public class ServiceDiscoveryManager {
 	private volatile RemoteEventListener lookupListenerProxy;
         /** Task manager for the various tasks executed by this LookupCache */
         private volatile ExecutorService cacheTaskMgr;
-        private volatile DependencyLinker cacheDependencyLinker;
+        private volatile CacheTaskQueue taskQueue;
 	/* Flag that indicates if the LookupCache has been terminated. */
 	private volatile boolean bCacheTerminated = false;
 	/* Contains the ServiceDiscoveryListener's that receive local events */
-	private final ArrayList<ServiceDiscoveryListener> sItemListeners = new ArrayList<ServiceDiscoveryListener>(1);
+	private final List<ServiceDiscoveryListener> sItemListeners = new ArrayList<ServiceDiscoveryListener>(1);
 	/* Map from ServiceID to ServiceItemReg */
 	private final ConcurrentMap<ServiceID,ServiceItemReg> serviceIdMap = new ConcurrentHashMap<ServiceID,ServiceItemReg>();
 	/* Map from ProxyReg to EventReg: (proxyReg, {source,id,seqNo,lease})*/
@@ -1636,10 +1847,14 @@ public class ServiceDiscoveryManager {
          */
         private final AtomicLong taskSeqN = new AtomicLong();
 
-	public LookupCacheImpl(ServiceTemplate tmpl,
+        private final ServiceDiscoveryManager sdm;
+
+	public LookupCacheImpl(
+                ServiceTemplate tmpl, 
 			       ServiceItemFilter filter,
 			       ServiceDiscoveryListener sListener,
-			       long leaseDuration)     throws RemoteException
+                long leaseDuration, 
+                ServiceDiscoveryManager sdm) throws RemoteException
         {
             this.serviceDiscardFutures = RC.concurrentMap(new ConcurrentHashMap<Referrer<ServiceID>,Referrer<Future>>(), Ref.WEAK_IDENTITY, Ref.STRONG, 60000, 60000);
 	    this.tmpl = copyServiceTemplate(tmpl);
@@ -1647,6 +1862,7 @@ public class ServiceDiscoveryManager {
 	    this.filter = filter;
             lookupListener = new LookupListener();
             if(sListener != null ) sItemListeners.add(sListener);
+            this.sdm = sdm;
 	}//end constructor
 
 	// This method's javadoc is inherited from an interface of this class
@@ -1655,12 +1871,11 @@ public class ServiceDiscoveryManager {
                 if(bCacheTerminated) return;//allow for multiple terminations
                 bCacheTerminated = true;
             }//end sync
-            synchronized (caches){
-                caches.remove(this);
+            synchronized (sdm.caches){
+                sdm.caches.remove(this);
             }
             /* Terminate all tasks: first, terminate this cache's TaskManager*/
             cacheTaskMgr.shutdownNow();
-            cacheTaskMgr = null;
             /* Terminate ServiceDiscardTimerTasks running for this cache */
             synchronized(serviceDiscardMutex) {
                 serviceDiscardTimerTaskMgr.shutdownNow();
@@ -1671,7 +1886,7 @@ public class ServiceDiscoveryManager {
             while(iter.hasNext()) {
                 Map.Entry e = (Map.Entry)iter.next();
                 EventReg eReg = (EventReg)e.getValue();
-                cancelLease(eReg.lease);
+                sdm.cancelLease(eReg.lease);
             }//end loop
             /* Un-export the remote listener for events from lookups. */
 	    try {
@@ -1692,7 +1907,7 @@ public class ServiceDiscoveryManager {
 	    if (ret.length == 0 )  return null;
             // Maths.abs(Integer.MIN_VALUE) = -ve, so to avoid random 
             // hard to debug bugs, this has been changed.
-	    int rand = random.nextInt(ret.length);
+	    int rand = sdm.random.nextInt(ret.length);
 	    return ret[rand];
 	}//end LookupCacheImpl.lookup
 
@@ -1705,7 +1920,7 @@ public class ServiceDiscoveryManager {
 	    ServiceItem[] sa = getServiceItems(myFilter);
 	    int len = sa.length;
 	    if (len == 0 )  return new ServiceItem[0];
-	    int rand = random.nextInt(Integer.MAX_VALUE) % len;
+	    int rand = sdm.random.nextInt(Integer.MAX_VALUE) % len;
 	    for(int i=0; i<len; i++) {
 		items.add(sa[(i+rand) % len ]);
 		if(items.size() == maxMatches)
@@ -1724,11 +1939,10 @@ public class ServiceDiscoveryManager {
              * exists, and it's not already discarded, then queue a task
              * to discard the given serviceReference.
              */
-            boolean discardIt = false;
-	    Iterator iter = getServiceIdMapEntrySetIterator();
+	    Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter = getServiceIdMapEntrySetIterator();
 	    while(iter.hasNext()) {
-		Map.Entry e = (Map.Entry)iter.next();
-		ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
+		Map.Entry<ServiceID,ServiceItemReg> e = iter.next();
+		ServiceItemReg itemReg = e.getValue();
 		ServiceItem filteredItem;
 		synchronized(itemReg) {
 		    filteredItem = itemReg.filteredItem;
@@ -1736,22 +1950,19 @@ public class ServiceDiscoveryManager {
                     {
                         if( itemReg.isDiscarded() ) return;//already discarded
                         itemReg.setDiscarded(true);
-                        discardIt = true;
+                        ServiceID sid = e.getKey();
+                        Future f = serviceDiscardTimerTaskMgr.submit
+                                         ( new ServiceDiscardTimerTask(this, sid) );
+                        serviceDiscardFutures.put(sid, f);
+                        taskQueue.submit(new DiscardServiceTask(filteredItem));
+                        return;
                     }//endif
 		}//end sync(itemReg)
-		if(discardIt) {
-		    ServiceID sid = (ServiceID)e.getKey();
-		    Future f = serviceDiscardTimerTaskMgr.submit
-                                     ( new ServiceDiscardTimerTask(sid) );
-                    serviceDiscardFutures.put(sid, f);
-                    cacheDependencyLinker.execute(new DiscardServiceTask(filteredItem));
-		    return;
-		}//endif
 	    }//end loop
 	}//end LookupCacheImpl.discard
 
 	/* Returns the iterator of entry set in the copy of the ServiceIdMap */
-	private Iterator getServiceIdMapEntrySetIterator() {
+	private Iterator<Map.Entry<ServiceID,ServiceItemReg>> getServiceIdMapEntrySetIterator() {
             return serviceIdMap.entrySet().iterator();
 	}//end LookupCacheImpl.getServiceIdMapEntrySetIterator
 
@@ -1826,7 +2037,8 @@ public class ServiceDiscoveryManager {
 		sItemListeners.add(listener);
 	    }
 	    ServiceItem[] items = getServiceItems(null);
-	    for(int i=0; i<items.length; i++) {
+            int l = items.length;
+	    for(int i=0; i<l; i++) {
                 addServiceNotify(items[i],listener);
             }//end loop
 	}//end LookupCacheImpl.addListener
@@ -1847,8 +2059,11 @@ public class ServiceDiscoveryManager {
 	 */
 	public void addProxyReg(ProxyReg reg) {
             RegisterListenerTask treg = 
-                    new RegisterListenerTask(reg, taskSeqN.getAndIncrement());
-            cacheDependencyLinker.execute(treg);
+                    new RegisterListenerTask(
+                            reg, 
+                            taskSeqN.getAndIncrement(), 
+                            this);
+            taskQueue.submit(treg);
 	}//end LookupCacheImpl.addProxyReg
 
 	/** Remove a ProxyReg from the lookupCache. Called by DiscMgrListener's
@@ -1861,22 +2076,23 @@ public class ServiceDiscoveryManager {
             EventReg eReg = eventRegMap.get(reg);
             if(eReg != null) {
                 try {
-                    leaseRenewalMgr.remove(eReg.lease);
+                    sdm.leaseRenewalMgr.remove(eReg.lease);
                 } catch(Exception e) {
                     logger.log(Level.FINER,
                                "exception occurred while removing an "
                                +"event registration lease", e);
                 }
             }//endif
-            t = new ProxyRegDropTask(reg, taskSeqN.getAndIncrement());
-            cacheDependencyLinker.execute(t);
+            t = new ProxyRegDropTask(reg, eReg, taskSeqN.getAndIncrement(), this);
+            taskQueue.removeUselessTask(reg);
+            taskQueue.submit(t);
 	}//end LookupCacheImpl.removeProxyReg
 
 	/* Throws IllegalStateException if this lookup cache has been
          * terminated
          */
 	private void checkCacheTerminated() {
-	    checkTerminated();
+	    sdm.checkTerminated();
             if(bCacheTerminated) {
                 throw new IllegalStateException
                                     ("this lookup cache was terminated");
@@ -1943,26 +2159,28 @@ public class ServiceDiscoveryManager {
                     break;
                 }//endif
             }//end loop
-            if(reg == null) return;//event arrived before eventReg in map
+            if(reg == null || eReg == null) return;//event arrived before eventReg in map
 
-            /* Next, look for gaps in the event sequence. */
+            /* Next, look for any gaps in the event sequence. */
             synchronized (eReg){
-                long prevSeqNo = eReg.seqNo;
-                eReg.seqNo = seqNo;
-                CacheTask t;
-                if(seqNo == (prevSeqNo+1)) {//no gap, handle current event
-                    t = new NotifyEventTask
-                                    (reg, sid, item, transition, taskSeqN.getAndIncrement());
-                    if (eReg.lookupsPending > 0) {
-                        eReg.pending.add(t);
-                        return;
-                    }
-                } else if (eReg.lookupsPending > 1) {
+                long delta = eReg.updateSeqNo(seqNo);
+                CacheTask t = null;
+                if(delta == 1) {//no gap, handle current event
+                    t = new LookupCacheImpl.NotifyEventTask(
+                                        this,
+                                        reg,
+                                        sid, 
+                                        item, 
+                                        transition,
+                                        taskSeqN.getAndIncrement()
+                                );
+                    if (eReg.addIfLookupsPending(t)) return;
+                } else if (eReg.getLookupsPending() > 1) {
                     // gap in event sequence, but snapshot already pending
                     return;
                 } else {//gap in event sequence, request snapshot
-                    eReg.lookupsPending++;
-                    t = new LookupTask(reg, taskSeqN.getAndIncrement(), eReg);
+                        eReg.incrementLookupsPending();
+                        t = new LookupTask(reg, taskSeqN.getAndIncrement(), eReg, this);
                     if( logger.isLoggable(Levels.HANDLED) ) {
                         StringBuilder sb = new StringBuilder(300);
                                     sb.append("notifyServiceMap - GAP in event sequence ")
@@ -1971,7 +2189,7 @@ public class ServiceDiscoveryManager {
                                      .append("serviceID={2}], ")
                                      .append("[eventSource={3}, ")
                                      .append("eventID={4,number,#}, ")
-                                     .append("oldSeqN={5,number,#}, ")
+                                     .append("delta={5,number,#}, ")
                                      .append("newSeqN={6,number,#}]");
                         String msg = sb.toString();
                         Object[] params = new Object[] { reg !=null ? reg.getProxy() : "",
@@ -1979,24 +2197,18 @@ public class ServiceDiscoveryManager {
                                                          sid,
                                                          eventSource,
                                                          Long.valueOf(eventID),
-                                                         Long.valueOf(prevSeqNo),
+                                                         Long.valueOf(delta),
                                                          Long.valueOf(seqNo) };
                         logger.log(Levels.HANDLED, msg, params);
                     }//endif
                 }//endif
-                cacheDependencyLinker.execute(t);
+                if (t != null) taskQueue.submit(t);
             } //end sync(eReg)
 	}//end LookupCacheImpl.notifyServiceMap
 
-	/** Removes an entry from the serviceIdMap, and sends a notification.*/
-	private void removeServiceIdMap(ServiceID sid, ServiceItem item) {
-            removeServiceIdMapSendNoEvent(sid);
-	    removeServiceNotify(item);
-	}//end LookupCacheImpl.removeServiceIdMap
-
 	/** Removes an entry in the serviceIdMap, but sends no notification. */
-	private void removeServiceIdMapSendNoEvent(ServiceID sid) {
-            serviceIdMap.remove(sid);
+	private boolean removeServiceIdMapSendNoEvent(ServiceID sid, ServiceItemReg itemReg) {
+                return serviceIdMap.remove(sid, itemReg);
 	}//end LookupCacheImpl.removeServiceIdMapSendNoEvent
 
 	/** Returns the element in the given items array having the given
@@ -2103,6 +2315,8 @@ public class ServiceDiscoveryManager {
 	    ServiceItem oldItem;
 	    ServiceItem oldFilteredItem;
 	    boolean itemRegIsDiscarded;
+            boolean attrsChanged = false;
+            boolean versionChanged = false;
 	    synchronized(itemReg) {
 		itemRegIsDiscarded = itemReg.isDiscarded();
                 if(!itemReg.addProxy(proxy, newItem)) { // not tracking
@@ -2116,34 +2330,31 @@ public class ServiceDiscoveryManager {
                     itemReg.filteredItem = null;//so filter will be retried
                     if(matchMatchEvent) return;
                 }//endif
-	    }//end sync(itemReg)
-            /* For an explanation of the logic of the following if-else-block,
-             * refer to the method description above.
-             */
-            boolean attrsChanged = false;
-            boolean versionChanged = false;
-	    if( matchMatchEvent || sameVersion(newItem,oldItem) ) {
-		if(itemRegIsDiscarded) return;
-                /* Same version, determine if the attributes have changed.
-                 * But first, replace the new service proxy with the old
-                 * service proxy so the client always uses the old proxy
-                 * (at least, until the version is changed).
+	    
+                /* For an explanation of the logic of the following if-else-block,
+                 * refer to the method description above.
                  */
-                synchronized(itemReg){
+
+                if( matchMatchEvent || sameVersion(newItem,oldItem) ) {
+                    if(itemRegIsDiscarded) return;
+                    /* Same version, determine if the attributes have changed.
+                     * But first, replace the new service proxy with the old
+                     * service proxy so the client always uses the old proxy
+                     * (at least, until the version is changed).
+                     */
                     newItem.service = oldItem.service;
                     /* Now compare attributes */
-                    attrsChanged = 
-                            !LookupAttributes.equal(newItem.attributeSets,
-                                                    oldItem.attributeSets);
-                }
-                
-                if(!attrsChanged) return;//no change, no need to filter
-            } else {//(!matchMatchEvent && !same version) ==> re-registration
-                versionChanged = true;
-            }//endif
+                    attrsChanged = !LookupAttributes.equal(newItem.attributeSets,
+                                                           oldItem.attributeSets);
+
+                    if(!attrsChanged) return;//no change, no need to filter
+                } else {//(!matchMatchEvent && !same version) ==> re-registration
+                    versionChanged = true;
+                }//endif
+            }//end sync(itemReg)
             /* Now apply the filter, and send events if appropriate */
             ServiceItem newFilteredItem =
-		filterMaybeDiscard(newItem, proxy, !itemRegIsDiscarded);
+		filterMaybeDiscard(newItem, !itemRegIsDiscarded);
             if(newFilteredItem != null) {
                 /* Passed the filter, okay to send event(s). */
                 if(attrsChanged) changeServiceNotify(newFilteredItem,
@@ -2217,15 +2428,14 @@ public class ServiceDiscoveryManager {
 				     ServiceItem item,
 				     int action)
 	{
-	    ArrayList notifies;
+	    List<ServiceDiscoveryListener> notifies;
 	    synchronized(sItemListeners) {
 		if(sItemListeners.isEmpty()) return;
-	        notifies = (ArrayList)sItemListeners.clone();
+	        notifies =  new ArrayList<ServiceDiscoveryListener>(sItemListeners);
 	    }
-	    Iterator iter = notifies.iterator();
+	    Iterator<ServiceDiscoveryListener> iter = notifies.iterator();
 	    while (iter.hasNext()) {
-		ServiceDiscoveryListener sl
-                                      = (ServiceDiscoveryListener)iter.next();
+		ServiceDiscoveryListener sl = iter.next();
                 serviceNotifyDo(oldItem,item,action,sl);
 	    }//end loop
 	}//end LookupCacheImpl.serviceNotifyDo
@@ -2255,7 +2465,7 @@ public class ServiceDiscoveryManager {
                                             new BasicILFactory(),
                                             false, false);
                 lookupListenerExporter =
-                  (Exporter)thisConfig.getEntry(COMPONENT_NAME,
+                      sdm.thisConfig.getEntry(COMPONENT_NAME,
                                                 "eventListenerExporter",
                                                 Exporter.class,
                                                 defaultExporter );
@@ -2272,7 +2482,7 @@ public class ServiceDiscoveryManager {
              * various tasks executed by this instance of the lookup cache.
              */
             try {
-                cacheTaskMgr = thisConfig.getEntry(COMPONENT_NAME,
+                cacheTaskMgr = sdm.thisConfig.getEntry(COMPONENT_NAME,
                                                         "cacheExecutorService",
                                                         ExecutorService.class);
             } catch(ConfigurationException e) { /* use default */
@@ -2282,7 +2492,7 @@ public class ServiceDiscoveryManager {
                                 10, /* Ignored */
                                 15, 
                                 TimeUnit.SECONDS,
-                                new PriorityBlockingQueue(100), /* Unbounded */
+                                new LinkedBlockingQueue(), /* Unbounded */
                                 new NamedThreadFactory(
                                         "SDM lookup cache",
                                         false
@@ -2290,7 +2500,26 @@ public class ServiceDiscoveryManager {
                         );
                 
             }
-            cacheDependencyLinker = new DependencyLinker(cacheTaskMgr);
+            cacheTaskMgr = new ExtensibleExecutorService
+            (
+                cacheTaskMgr, 
+                new RunnableFutureFactory(){
+
+                    @Override
+                    public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
+                        if (r instanceof ObservableFutureTask) return (RunnableFuture<T>) r;
+                        return new CacheTaskWrapper<T>(r, value);
+                    }
+
+                    @Override
+                    public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
+                        if (c instanceof ObservableFutureTask) return (RunnableFuture<T>) c;
+                        return new CacheTaskWrapper<T>(c);
+                    }
+                }
+            );
+            
+            taskQueue = new CacheTaskQueue(cacheTaskMgr);
             /* Get a special-purpose task manager for this cache from the
              * configuration. That task manager will be used to manage the
              * various instances of the special-purpose task, executed by
@@ -2299,7 +2528,7 @@ public class ServiceDiscoveryManager {
              */
             try {
                 serviceDiscardTimerTaskMgr
-                    = thisConfig.getEntry(COMPONENT_NAME,
+                    = sdm.thisConfig.getEntry(COMPONENT_NAME,
                                            "discardExecutorService",
                                            ExecutorService.class);
             } catch(ConfigurationException e) { /* use default */
@@ -2319,7 +2548,7 @@ public class ServiceDiscoveryManager {
             }
             // Moved here from constructor to avoid publishing this reference
             lookupListenerProxy = lookupListener.export();
-            Iterator<ProxyReg> it = proxyRegSet.iterator();
+            Iterator<ProxyReg> it = sdm.proxyRegSet.iterator();
             while (it.hasNext()){
                 addProxyReg(it.next());
             }
@@ -2351,9 +2580,7 @@ public class ServiceDiscoveryManager {
          *  <code>ServiceItemReg</code> element of the
          *  <code>serviceIdMap</code> is left unchanged.
 	 */
-  	private ServiceItem filterMaybeDiscard(ServiceItem item,
-                                               ServiceRegistrar proxy,
-				               boolean sendEvent)
+  	private ServiceItem filterMaybeDiscard(ServiceItem item, boolean sendEvent)
         {
             if( (item == null) || (item.service == null) ) return null;
             if(filter == null) {
@@ -2369,21 +2596,22 @@ public class ServiceDiscoveryManager {
             if(!pass) {
                 ServiceID srvcID = item.serviceID;
                 ServiceItemReg itemReg = serviceIdMap.get(srvcID);
+                boolean notify = false;
+                boolean itemRegIsDiscarded = false;
+                ServiceItem oldFilteredItem = null;
                 if(itemReg != null) {
-                    if(sendEvent) {
-                        ServiceItem oldFilteredItem;
-                        synchronized(itemReg) {
+                    synchronized (itemReg){
+                        if(sendEvent) {
                             oldFilteredItem = itemReg.filteredItem;
-                        }//end sync(itemReg)
-                        removeServiceIdMap(srvcID, oldFilteredItem);
-                    } else {
-			boolean itemRegIsDiscarded;
-                        synchronized(itemReg) {
-			    itemRegIsDiscarded = itemReg.isDiscarded();
-                        }//end sync(itemReg)
-                        removeServiceIdMapSendNoEvent(srvcID);
-			if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
-                    }//endif
+                            notify = removeServiceIdMapSendNoEvent(srvcID, itemReg);
+                            itemReg.setDiscarded(notify);
+                        } else {
+                            itemRegIsDiscarded = itemReg.isDiscarded();
+                            removeServiceIdMapSendNoEvent(srvcID, itemReg);
+                        }//endif
+                    }// end sync(itemReg)
+                    if (notify) removeServiceNotify(oldFilteredItem);
+                    if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
                 }//endif
                 return null;
             }//endif(fail)
@@ -2393,7 +2621,7 @@ public class ServiceDiscoveryManager {
                 return filteredItem;
             }//endif(pass)
             /* Handle filter indefinite */
-            discardRetryLater(item, proxy, sendEvent);
+            discardRetryLater(item, sendEvent);
             return null;
         }//end LookupCacheImpl.filterMaybeDiscard
 
@@ -2432,9 +2660,7 @@ public class ServiceDiscoveryManager {
          *  contain a <code>ServiceItemReg</code> corresponding to the
          *  given <code>ServiceItem</code>, then this method simply returns.
 	 */
-  	private void discardRetryLater(ServiceItem item,
-                                       ServiceRegistrar proxy,
-                                       boolean sendEvent) {
+  	private void discardRetryLater(ServiceItem item, boolean sendEvent) {
             ServiceItemReg itemReg = serviceIdMap.get(item.serviceID);
             if(itemReg == null) return;
             ServiceItem oldFilteredItem;
@@ -2453,7 +2679,7 @@ public class ServiceDiscoveryManager {
                 itemReg.setDiscarded(true);
             }//end sync(itemReg)
             Future f = serviceDiscardTimerTaskMgr.submit
-                              ( new ServiceDiscardTimerTask(item.serviceID) );
+                              ( new ServiceDiscardTimerTask(this, item.serviceID) );
             serviceDiscardFutures.put(item.serviceID, f);
             if(sendEvent)  removeServiceNotify(oldFilteredItem);
         }//end LookupCacheImpl.discardRetryLater
@@ -2464,34 +2690,35 @@ public class ServiceDiscoveryManager {
          *  and wakes up the <code>ServiceDiscardTimerTask</code> if the given
          *  <code>item</code> is discarded; otherwise, sends a removed event.
 	 */
-  	private void handleMatchNoMatch(ServiceRegistrar proxy,
-                                        ServiceID srvcID,
-                                        ServiceItem item)
+  	private void handleMatchNoMatch(ServiceRegistrar proxy, ServiceID srvcID)
         {
             ServiceItemReg itemReg = serviceIdMap.get(srvcID);
             if(itemReg != null) {
 		ServiceItem newItem;
-                boolean itemRegHasNoProxys;
-                boolean itemRegIsDiscarded;
                 ServiceItem filteredItem;
+                boolean notify = false;
+                ServiceRegistrar itemRegProxy = null;
                 synchronized(itemReg) {
                     newItem = itemReg.removeProxy(proxy);
-                    itemRegHasNoProxys = itemReg.hasNoProxys();
-                    itemRegIsDiscarded = itemReg.isDiscarded();
                     filteredItem = itemReg.filteredItem;
-                }//end sync(itemReg)
-		if(newItem != null) {
-		    itemMatchMatchChange(itemReg.proxy, newItem, itemReg,
-					 false);
-                } else if(itemRegHasNoProxys) {
-                    if(itemRegIsDiscarded) {
-                        /* Remove item from map and wake up the discard task */
-                        removeServiceIdMapSendNoEvent(srvcID);
-			cancelDiscardTask(srvcID);
-                    } else {//remove item from map and send removed event
-                        removeServiceIdMap(srvcID, filteredItem);
+                    if(newItem != null) {
+                        itemRegProxy = itemReg.proxy;
+                    } else if(itemReg.hasNoProxys()) {
+                        if(itemReg.isDiscarded()) {
+                            /* Remove item from map and wake up the discard task */
+                            removeServiceIdMapSendNoEvent(srvcID, itemReg);
+                            cancelDiscardTask(srvcID);
+                        } else {//remove item from map and send removed event
+                            notify = removeServiceIdMapSendNoEvent(srvcID, itemReg);
+                            itemReg.setDiscarded(notify);
+                        }//endif
                     }//endif
-                }//endif
+                }//end sync(itemReg)
+                if (itemRegProxy != null) {
+                    itemMatchMatchChange(itemRegProxy, newItem, itemReg, false);
+                } else if (notify){
+                    removeServiceNotify(filteredItem);
+                }
             }//endif
         }//end LookupCacheImpl.handleMatchNoMatch
 
@@ -2530,11 +2757,12 @@ public class ServiceDiscoveryManager {
     private final List<LookupCache> caches;
 
     /* Flag to indicate if the ServiceDiscoveryManager has been terminated. */
-    private volatile boolean bTerminated = false;
+    private boolean bTerminated = false; //sync on this
     
+    private final Thread terminatorThread;
     private final LookupCacheTerminator terminator;
-    
-    private volatile boolean started = false;
+    /* Flag to indicate LookupCacheTerminator has been started */
+    private boolean started = false; // sync on terminatorThread
     /* Object used to obtain the configuration items for this utility. */
     private final Configuration thisConfig;
     /* Preparer for the proxies to the lookup services that are discovered
@@ -2899,6 +3127,8 @@ public class ServiceDiscoveryManager {
         discMgrListener = new DiscMgrListener();
         discMgr.addDiscoveryListener(discMgrListener);
         terminator = new LookupCacheTerminator();
+        terminatorThread = new Thread(terminator, "SDM lookup cache terminator");
+        terminatorThread.setDaemon(false);
     }
 
     /** Sends discarded event to each listener waiting for discarded lookups.*/
@@ -3307,7 +3537,7 @@ public class ServiceDiscoveryManager {
             discMgr.removeDiscoveryListener(discMgrListener);
             if(discMgrInternal) discMgr.terminate();
 	}//end sync
-        terminator.interrupt();
+        terminatorThread.interrupt();
         /* Terminate all caches: cancel event leases, un-export listeners */
         List<LookupCache> terminate;
         synchronized (caches){
@@ -3695,9 +3925,13 @@ public class ServiceDiscoveryManager {
 					 long leaseDuration)
                                                        throws RemoteException
     {
-        if (!started) terminator.start();
+        /* Atomic start of terminator */
+        synchronized (terminatorThread){
+            if (!started) terminatorThread.start();
+            started = true;
+        }
 	if(tmpl == null) tmpl = new ServiceTemplate(null, null, null);
-	LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration);
+	LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration, this);
         cache.initCache();
         synchronized (caches){
             caches.add(cache);
@@ -3841,7 +4075,7 @@ public class ServiceDiscoveryManager {
     /** Throws an IllegalStateException if the current instance of the
      *  ServiceDiscoveryManager has been terminated.
      */
-    private void checkTerminated() {
+    private synchronized void checkTerminated() {
         if(bTerminated) {
             throw new IllegalStateException
                              ("service discovery manager was terminated");

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java Sun Feb  2 12:31:09 2014
@@ -39,4 +39,21 @@ public interface FutureObserver<T> exten
         public boolean addObserver(FutureObserver<T> observer);
     }
     
+    public interface Subscriber<T> {
+    
+        /**
+         * Prior to completion, if a Future sets off additional
+         * ObservableFuture tasks, these may be recommended for 
+         * Subscribers to Observe.
+         * 
+         * @param e the additional ObservableFuture recommended for observation
+         */
+        public void reccommendedViewing(ObservableFuture<T> e);
+    }
+    
+    public interface Subscribeable<T>{ // implemented by tasks that accept a Subscriber
+        
+        public void subscribe(Subscriber<T> subscriber); // e.g. invoked by ObservableFutureTask's constructors on a Subscribeable task
+    }
+    
 }

Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1563596&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Sun Feb  2 12:31:09 2014
@@ -0,0 +1,47 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.river.api.util.FutureObserver;
+
+/**
+ * Observer that submits a dependant task once its list of outstanding tasks is empty.
+ * @author peter
+ */
+public class DependencyLinker implements FutureObserver {
+    private final ExecutorService executor; // receives the dependant
+    private final List<ObservableFuture> tasks; // outstanding tasks; only touched in synchronized methods after construction
+    private final FutureTask dependant; // submitted when tasks becomes empty
+
+    public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, FutureTask dep) {
+        executor = ex;
+        this.tasks = new ArrayList<ObservableFuture>(tasks); // private copy; entries are removed on completion
+        dependant = dep;
+    }
+
+    public synchronized void register() { // NOTE(review): with no tasks, nothing here submits the dependant
+        Iterator<ObservableFuture> it = tasks.iterator();
+        while (it.hasNext()) {
+            it.next().addObserver(this); // this linker observes every task in the copy
+        }
+    }
+
+    @Override
+    public synchronized void futureCompleted(Future e) {
+        tasks.remove(e); // e is no longer outstanding
+        if (tasks.isEmpty()) { // nothing left to wait for
+            executor.submit(dependant);
+        }
+    }
+    
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java Sun Feb  2 12:31:09 2014
@@ -24,7 +24,9 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.Subscribeable;
 import org.apache.river.api.util.FutureObserver.ObservableFuture;
+import org.apache.river.api.util.FutureObserver.Subscriber;
 
 /**
  *
@@ -37,13 +39,20 @@ public class ObservableFutureTask<T> ext
     public ObservableFutureTask(Callable<T> callable) {
         super(callable);
         listeners = new LinkedList<FutureObserver<T>>();
+        if (callable instanceof Subscribeable){
+            ((Subscribeable<T>) callable).subscribe(new FutureSubscriber(listeners));
+        }
     }
     
     public ObservableFutureTask(Runnable r, T result){
         super(r,result);
         listeners = new LinkedList<FutureObserver<T>>();
+        if (r instanceof Subscribeable){
+            ((Subscribeable<T>) r).subscribe(new FutureSubscriber(listeners));
+        }
     }
     
+    @Override
     protected void done() {
         done = true;
         synchronized (listeners){
@@ -65,4 +74,26 @@ public class ObservableFutureTask<T> ext
             return listeners.add(l);
         }
     }
+    
+    private static final class FutureSubscriber<T> implements Subscriber<T> { // relays recommendations to the listeners
+        /* Shared list of observers; also the lock used in reccommendedViewing */
+        public final List<FutureObserver<T>> listeners;
+        
+        FutureSubscriber(List<FutureObserver<T>> listeners){
+            this.listeners = listeners; // keeps the caller's list instance, no copy is made
+        }
+        @Override
+        public void reccommendedViewing(ObservableFuture<T> e) {
+            synchronized (listeners){ // iteration happens while holding the list's monitor
+                Iterator<FutureObserver<T>> it = listeners.iterator();
+                while (it.hasNext()){
+                    FutureObserver<T> future = it.next();
+                    if (future instanceof Subscriber){ // observers that are not Subscribers are skipped
+                        Subscriber<T> subscriber = (Subscriber<T>) future;
+                        subscriber.reccommendedViewing(e); // forward the recommendation unchanged
+                    }
+                }
+            }
+        }
+    }
 }