You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2010/09/12 10:35:28 UTC

svn commit: r996263 - /incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java

Author: peter_firmstone
Date: Sun Sep 12 08:35:27 2010
New Revision: 996263

URL: http://svn.apache.org/viewvc?rev=996263&view=rev
Log:
River-345 patch from Bob Scheifler

Modified:
    incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java

Modified: incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=996263&r1=996262&r2=996263&view=diff
==============================================================================
--- incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Sep 12 08:35:27 2010
@@ -741,8 +741,10 @@ public class ServiceDiscoveryManager {
      * track where the ServiceItem comes from.
      */
     private final static class ServiceItemReg  {
-	/* Stores ServiceRegistrars that has the ServiceItem registered. */
-	private final ArrayList proxys = new ArrayList(1);
+	/* Maps ServiceRegistrars to their latest registered item */
+	private final Map items = new HashMap();
+	/* The ServiceRegistrar currently being used to track changes */
+	private ServiceRegistrar proxy;
 	/* Flag that indicates that the ServiceItem has been discarded. */
 	private boolean bDiscarded = false;
         /* The discovered service, prior to filtering. */
@@ -753,28 +755,42 @@ public class ServiceDiscoveryManager {
          * lookup service proxy.
          */
 	public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) {
-	    addProxy(proxy);
+	    this.proxy = proxy;
+	    addProxy(proxy, item);
 	    this.item = item;
 	}
-	/* Adds the given proxy to the 'id-to-itemReg' map. This method is
+	/* Adds the given proxy to the 'proxy-to-item' map. This method is
          * called from this class' constructor, LookupTask, NotifyEventTask,
-         * and ProxyRegDropTask.
+         * and ProxyRegDropTask.  Returns true if the proxy is being used
+	 * to track changes, false otherwise.
          */
-	public void addProxy(ServiceRegistrar proxy) {
-	    if(!proxys.contains(proxy))  proxys.add(proxy);
+	public boolean addProxy(ServiceRegistrar proxy, ServiceItem item) {
+	    items.put(proxy, item);
+	    return proxy.equals(this.proxy);
 	}
-	/* Removes the given proxy from the 'id-to-itemReg' map. This method
-         * is called from NotifyEventTask and ProxyRegDropTask.
-         */
-	public void removeProxy(ServiceRegistrar proxy) {
-	    int index = proxys.indexOf(proxy);
-	    if(index != -1)  proxys.remove(index);
+	/* Removes the given proxy from the 'proxy-to-item' map. This method
+         * is called from NotifyEventTask and ProxyRegDropTask.  If this proxy
+	 * was being used to track changes, then pick a new one and return
+	 * its current item, else return null.
+         */
+	public ServiceItem removeProxy(ServiceRegistrar proxy) {
+	    items.remove(proxy);
+	    if (proxy.equals(this.proxy)) {
+		if (items.isEmpty()) {
+		    this.proxy = null;
+		} else {
+		    Map.Entry ent = (Map.Entry) items.entrySet().iterator().next();
+		    this.proxy = (ServiceRegistrar) ent.getKey();
+		    return (ServiceItem) ent.getValue();
+		}//endif
+	    }//endif
+	    return null;
 	}
-	/* Determines if the 'id-to-itemReg' map contains any mappings.
+	/* Determines if the 'proxy-to-item' map contains any mappings.
          * This method is called from NotifyEventTask and ProxyRegDropTask.
          */
 	public boolean hasNoProxys() {
-	    return proxys.isEmpty();
+	    return items.isEmpty();
 	}
 	/* Marks the ServiceItem as discarded. */ 
 	public void setDiscarded(boolean b) {
@@ -1076,33 +1092,18 @@ public class ServiceDiscoveryManager {
             }//end run
 	}//end class LookupCacheImpl.ProxyRegDropTask
 
-	/** Task class used to asynchronously discard the given service. */
+	/** Task class used to asynchronously notify service discard. */
         private final class DiscardServiceTask extends CacheTask {
-	    private Object service;
-                public DiscardServiceTask(Object service) {
+	    private final ServiceItem item;
+                public DiscardServiceTask(ServiceItem item) {
                 super(null, 0);
-		this.service = service;
+		this.item = item;
 	    }
 
             public void run() {
                 logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
                               +"started");
-		Iterator iter = getServiceIdMapEntrySetIterator();
-		while(iter.hasNext()) {
-		    Map.Entry e = (Map.Entry)iter.next();
-		    ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
-                    ServiceItem item;
-                    synchronized(itemReg) {
-                        item = itemReg.filteredItem;
-                    }//end sync(itemReg)
-		    if( (item.service).equals(service) ) {
-			ServiceID sid = (ServiceID)e.getKey();
-                        removeServiceNotify(item);
-                        serviceDiscardTimerTaskMgr.add
-                                         ( new ServiceDiscardTimerTask(sid) );
-			return;
-		    }//endif
-		}//end loop
+		removeServiceNotify(item);
                 logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
                               +"completed");
             }//end run
@@ -1229,9 +1230,11 @@ public class ServiceDiscoveryManager {
          */
         private final class ServiceDiscardTimerTask implements TaskManager.Task
         {
-            private ServiceID serviceID;
+            private final ServiceID serviceID;
+	    private final long endTime;
             public ServiceDiscardTimerTask(ServiceID serviceID) {
                 this.serviceID = serviceID;
+                this.endTime = discardWait+System.currentTimeMillis();
             }//end constructor
             public void run(){
                 logger.finest("ServiceDiscoveryManager - "
@@ -1247,9 +1250,7 @@ public class ServiceDiscoveryManager {
                 synchronized(serviceIdMap) {
                     if(!serviceIdMap.containsKey(serviceID))  return;
                 }//end sync(serviceIdMap)
-                /* Default to twice the (common) 5 minute max lease duration */
-                long curDur = discardWait;
-                long endTime = curDur+System.currentTimeMillis();
+                long curDur = endTime-System.currentTimeMillis();
                 synchronized(serviceDiscardMutex) {
                     /* Wait until interrupted or time expires */
                     while(curDur > 0) {
@@ -1306,6 +1307,7 @@ public class ServiceDiscoveryManager {
                     ServiceItem item = null;
                     ServiceItem filteredItem = null;
                     synchronized(itemReg) {
+			if(!itemReg.isDiscarded()) return;
                         if(itemReg.filteredItem == null) {
                             item = new ServiceItem
                                               ( (itemReg.item).serviceID,
@@ -1412,15 +1414,11 @@ public class ServiceDiscoveryManager {
             public void run() {
                 logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
                               +"started");
+		boolean changed = false;
                 ServiceItemReg itemReg;
                 synchronized(serviceIdMap) {
                     itemReg = (ServiceItemReg)serviceIdMap.get(thisTaskSid);
-                }//end sync(serviceIdMap)
-                if(itemReg != null) {//a. old, previously discovered item
-                    itemMatchMatchChange(reg.proxy, srvcItem,
-                                         itemReg, matchMatchEvent);
-                } else {//(itemReg == null) ==> b. newly discovered item
-                    synchronized(serviceIdMap) {
+		    if (itemReg == null) {
                         if( !eventRegMap.containsKey(reg) ) {
                             /* reg must have been discarded, simply return */
                             logger.finest("ServiceDiscoveryManager - "
@@ -1429,7 +1427,14 @@ public class ServiceDiscoveryManager {
                         }//endif
                         itemReg = new ServiceItemReg( reg.proxy, srvcItem );
                         serviceIdMap.put( thisTaskSid, itemReg );
-                    }//end sync(serviceIdMap)
+		    } else {
+			changed = true;
+		    }
+                }//end sync(serviceIdMap)
+                if(changed) {//a. old, previously discovered item
+                    itemMatchMatchChange(reg.proxy, srvcItem,
+                                         itemReg, matchMatchEvent);
+                } else {//b. newly discovered item
                     ServiceItem newFilteredItem =
                                   filterMaybeDiscard(srvcItem,reg.proxy,false);
                     if(newFilteredItem != null) {
@@ -1486,14 +1491,21 @@ public class ServiceDiscoveryManager {
             public void run() {
                 logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
                               +"started");
-                ServiceItem item = null;
+		ServiceRegistrar proxy = null;
+                ServiceItem item;
                 synchronized(itemReg) {
-                    itemReg.removeProxy(reg.proxy);//disassociate the LUS
-                    if( itemReg.hasNoProxys() ) {//no more LUSs, remove map
+                    item = itemReg.removeProxy(reg.proxy);//disassociate the LUS
+		    if (item != null) {// new LUS chosen to track changes
+			proxy = itemReg.proxy;
+		    } else if( itemReg.hasNoProxys() ) {//no more LUSs, remove from map
                         item = itemReg.filteredItem;
                     }//endif
                 }//end sync(itemReg)
-                if(item != null) removeServiceIdMap(thisTaskSid,item);
+		if(proxy != null) {
+		    itemMatchMatchChange(proxy, item, itemReg, false);
+		} else if(item != null) {
+		    removeServiceIdMap(thisTaskSid,item);
+		}//endif
                 logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
                               +"completed");
             }//end run
@@ -1645,20 +1657,24 @@ public class ServiceDiscoveryManager {
 	    while(iter.hasNext()) {
 		Map.Entry e = (Map.Entry)iter.next();
 		ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
+		ServiceItem filteredItem;
 		synchronized(itemReg) {
-                    if((itemReg.filteredItem.service).equals(serviceReference))
+		    filteredItem = itemReg.filteredItem;
+                    if((filteredItem.service).equals(serviceReference))
                     {
                         if( itemReg.isDiscarded() ) return;//already discarded
                         itemReg.setDiscarded(true);
                         discardIt = true;
-                        break;
                     }//endif
 		}//end sync(itemReg)
+		if(discardIt) {
+		    ServiceID sid = (ServiceID)e.getKey();
+		    serviceDiscardTimerTaskMgr.add
+                                     ( new ServiceDiscardTimerTask(sid) );
+		    cacheTaskMgr.add(new DiscardServiceTask(filteredItem));
+		    return;
+		}//endif
 	    }//end loop
-            if(discardIt) {
-                CacheTask t = new DiscardServiceTask(serviceReference);
-                cacheTaskMgr.add(t);
-            }//endif
 	}//end LookupCacheImpl.discard
 
 	/* Returns the iterator of entry set in the copy of the ServiceIdMap */
@@ -2063,15 +2079,20 @@ public class ServiceDiscoveryManager {
              */
 	    ServiceItem oldItem;
 	    ServiceItem oldFilteredItem;
+	    boolean itemRegIsDiscarded;
 	    synchronized(itemReg) {
+		itemRegIsDiscarded = itemReg.isDiscarded();
+                if(!itemReg.addProxy(proxy, newItem)) { // not tracking
+		    if(matchMatchEvent || !itemRegIsDiscarded) return;
+		    itemReg.proxy = proxy; // start tracking instead
+		}//endif
                 oldItem = itemReg.item;
                 oldFilteredItem = itemReg.filteredItem;
-		if(itemReg.isDiscarded()) {
+		if(itemRegIsDiscarded) {
                     itemReg.item = newItem;//capture changes for discard
                     itemReg.filteredItem = null;//so filter will be retried
-                    return;
+                    if(matchMatchEvent) return;
                 }//endif
-                itemReg.addProxy(proxy);
 	    }//end sync(itemReg)
             /* For an explanation of the logic of the following if-else-block,
              * refer to the method description above.
@@ -2079,6 +2100,7 @@ public class ServiceDiscoveryManager {
             boolean attrsChanged = false;
             boolean versionChanged = false;
             boolean isSameVersion = sameVersion(newItem,oldItem);
+		if(itemRegIsDiscarded) return;
             if( matchMatchEvent || isSameVersion ) {
                 /* Same version, determine if the attributes have changed.
                  * But first, replace the new service proxy with the old
@@ -2137,8 +2159,8 @@ public class ServiceDiscoveryManager {
                 logger.finer("itemMatchMatchChange - version changed");
             }//endif
             /* Now apply the filter, and send events if appropriate */
-            ServiceItem newFilteredItem = filterMaybeDiscard
-                                                       (newItem, proxy, true);
+            ServiceItem newFilteredItem =
+		filterMaybeDiscard(newItem, proxy, !itemRegIsDiscarded);
             if(newFilteredItem != null) {
                 /* Passed the filter, okay to send event(s). */
                 if(attrsChanged) {
@@ -2149,7 +2171,9 @@ public class ServiceDiscoveryManager {
 
                 if(versionChanged) {
                     logger.finer("itemMatchMatchChange - send serviceRemoved");
-                    removeServiceNotify(oldFilteredItem);
+                    if (!itemRegIsDiscarded) {
+			removeServiceNotify(oldFilteredItem);
+		    }//endif
                     logger.finer("itemMatchMatchChange - send serviceAdded");
                     addServiceNotify(newFilteredItem);
                 }//endif
@@ -2354,7 +2378,12 @@ public class ServiceDiscoveryManager {
                         }//end sync(itemReg)
                         removeServiceIdMap(srvcID, oldFilteredItem);
                     } else {
+			boolean itemRegIsDiscarded;
+                        synchronized(itemReg) {
+			    itemRegIsDiscarded = itemReg.isDiscarded();
+                        }//end sync(itemReg)
                         removeServiceIdMapSendNoEvent(srvcID);
+			if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
                     }//endif
                 }//endif
                 return null;
@@ -2387,10 +2416,15 @@ public class ServiceDiscoveryManager {
                 itemReg = (ServiceItemReg)serviceIdMap.get(item.serviceID);
             }//end sync(serviceIdMap)
             if(itemReg == null)  return;
+	    boolean itemRegIsDiscarded;
             synchronized(itemReg) {
                 itemReg.item = item;
                 itemReg.filteredItem = filteredItem;
+		if(itemRegIsDiscarded = itemReg.isDiscarded()) {
+		    itemReg.setDiscarded(false);
+		}//endif
             }//end sync(itemReg)
+	    if(itemRegIsDiscarded) cancelDiscardTask(item.serviceID);
         }//end LookupCacheImpl.addFilteredItemToMap
 
 	/** Convenience method called by <code>filterMaybeDiscard</code>
@@ -2425,9 +2459,9 @@ public class ServiceDiscoveryManager {
                 }//endif
                 itemReg.setDiscarded(true);
             }//end sync(itemReg)
-            if(sendEvent)  removeServiceNotify(oldFilteredItem);
             serviceDiscardTimerTaskMgr.add
                               ( new ServiceDiscardTimerTask(item.serviceID) );
+            if(sendEvent)  removeServiceNotify(oldFilteredItem);
         }//end LookupCacheImpl.discardRetryLater
 
 	/** Convenience method called by <code>NotifyEventTask.run</code> (only
@@ -2445,25 +2479,27 @@ public class ServiceDiscoveryManager {
                 itemReg = (ServiceItemReg)serviceIdMap.get(srvcID);
             }//end sync(serviceIdMap)
             if(itemReg != null) {
+		ServiceItem newItem;
                 boolean itemRegHasNoProxys;
                 boolean itemRegIsDiscarded;
                 ServiceItem filteredItem;
                 synchronized(itemReg) {
-                    itemReg.removeProxy(proxy);
+                    newItem = itemReg.removeProxy(proxy);
                     itemRegHasNoProxys = itemReg.hasNoProxys();
                     itemRegIsDiscarded = itemReg.isDiscarded();
                     filteredItem = itemReg.filteredItem;
                 }//end sync(itemReg)
-                if(itemRegHasNoProxys) {
+                if(newItem != null) {
+ 		    itemMatchMatchChange(itemReg.proxy, newItem, itemReg,
+ 					 false);
+                 } else if(itemRegHasNoProxys) {
                     if(itemRegIsDiscarded) {
                         /* Remove item from map and wake up the discard task */
                         logger.finer("handleMatchNoMatch - "
                                      +"TRANSITION_MATCH_NOMATCH && service in "
                                      +"discard queue - send NO event");
                         removeServiceIdMapSendNoEvent(srvcID);
-                        synchronized(serviceDiscardMutex) {
-                            serviceDiscardMutex.notifyAll();
-                        }//end sync
+                        cancelDiscardTask(srvcID);
                     } else {//remove item from map and send removed event
                         logger.finer("handleMatchNoMatch - "
                                      +"TRANSITION_MATCH_NOMATCH - send "
@@ -2474,6 +2510,22 @@ public class ServiceDiscoveryManager {
             }//endif
         }//end LookupCacheImpl.handleMatchNoMatch
 
+	/** Wake up service discard task if running, else remove from mgr. */
+	private void cancelDiscardTask(ServiceID sid) {
+	    Iterator iter = serviceDiscardTimerTaskMgr.getPending().iterator();
+	    while (iter.hasNext()) {
+		ServiceDiscardTimerTask t =
+		    (ServiceDiscardTimerTask)iter.next();
+		if (sid.equals(t.serviceID)) {
+		    if(serviceDiscardTimerTaskMgr.removeIfPending(t)) return;
+		    break;
+		}//endif
+	    }//end loop
+	    synchronized(serviceDiscardMutex) {
+		serviceDiscardMutex.notifyAll();
+	    }//end sync
+	}//end LookupCacheImpl.cancelDiscardTask
+
     }//end class ServiceDiscoveryManager.LookupCacheImpl
     
     /* Name of this component; used in config entry retrieval and the logger.*/