You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2010/09/12 10:35:28 UTC
svn commit: r996263 -
/incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java
Author: peter_firmstone
Date: Sun Sep 12 08:35:27 2010
New Revision: 996263
URL: http://svn.apache.org/viewvc?rev=996263&view=rev
Log:
River-345 patch from Bob Scheifler
Modified:
incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java
Modified: incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=996263&r1=996262&r2=996263&view=diff
==============================================================================
--- incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ incubator/river/jtsk/skunk/pepe/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Sep 12 08:35:27 2010
@@ -741,8 +741,10 @@ public class ServiceDiscoveryManager {
* track where the ServiceItem comes from.
*/
private final static class ServiceItemReg {
- /* Stores ServiceRegistrars that has the ServiceItem registered. */
- private final ArrayList proxys = new ArrayList(1);
+ /* Maps ServiceRegistrars to their latest registered item */
+ private final Map items = new HashMap();
+ /* The ServiceRegistrar currently being used to track changes */
+ private ServiceRegistrar proxy;
/* Flag that indicates that the ServiceItem has been discarded. */
private boolean bDiscarded = false;
/* The discovered service, prior to filtering. */
@@ -753,28 +755,42 @@ public class ServiceDiscoveryManager {
* lookup service proxy.
*/
public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) {
- addProxy(proxy);
+ this.proxy = proxy;
+ addProxy(proxy, item);
this.item = item;
}
- /* Adds the given proxy to the 'id-to-itemReg' map. This method is
+ /* Adds the given proxy to the 'proxy-to-item' map. This method is
* called from this class' constructor, LookupTask, NotifyEventTask,
- * and ProxyRegDropTask.
+ * and ProxyRegDropTask. Returns true if the proxy is being used
+ * to track changes, false otherwise.
*/
- public void addProxy(ServiceRegistrar proxy) {
- if(!proxys.contains(proxy)) proxys.add(proxy);
+ public boolean addProxy(ServiceRegistrar proxy, ServiceItem item) {
+ items.put(proxy, item);
+ return proxy.equals(this.proxy);
}
- /* Removes the given proxy from the 'id-to-itemReg' map. This method
- * is called from NotifyEventTask and ProxyRegDropTask.
- */
- public void removeProxy(ServiceRegistrar proxy) {
- int index = proxys.indexOf(proxy);
- if(index != -1) proxys.remove(index);
+ /* Removes the given proxy from the 'proxy-to-item' map. This method
+ * is called from NotifyEventTask and ProxyRegDropTask. If this proxy
+ * was being used to track changes, then pick a new one and return
+ * its current item, else return null.
+ */
+ public ServiceItem removeProxy(ServiceRegistrar proxy) {
+ items.remove(proxy);
+ if (proxy.equals(this.proxy)) {
+ if (items.isEmpty()) {
+ this.proxy = null;
+ } else {
+ Map.Entry ent = (Map.Entry) items.entrySet().iterator().next();
+ this.proxy = (ServiceRegistrar) ent.getKey();
+ return (ServiceItem) ent.getValue();
+ }//endif
+ }//endif
+ return null;
}
- /* Determines if the 'id-to-itemReg' map contains any mappings.
+ /* Determines if the 'proxy-to-item' map contains any mappings.
* This method is called from NotifyEventTask and ProxyRegDropTask.
*/
public boolean hasNoProxys() {
- return proxys.isEmpty();
+ return items.isEmpty();
}
/* Marks the ServiceItem as discarded. */
public void setDiscarded(boolean b) {
@@ -1076,33 +1092,18 @@ public class ServiceDiscoveryManager {
}//end run
}//end class LookupCacheImpl.ProxyRegDropTask
- /** Task class used to asynchronously discard the given service. */
+ /** Task class used to asynchronously notify service discard. */
private final class DiscardServiceTask extends CacheTask {
- private Object service;
- public DiscardServiceTask(Object service) {
+ private final ServiceItem item;
+ public DiscardServiceTask(ServiceItem item) {
super(null, 0);
- this.service = service;
+ this.item = item;
}
public void run() {
logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
+"started");
- Iterator iter = getServiceIdMapEntrySetIterator();
- while(iter.hasNext()) {
- Map.Entry e = (Map.Entry)iter.next();
- ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
- ServiceItem item;
- synchronized(itemReg) {
- item = itemReg.filteredItem;
- }//end sync(itemReg)
- if( (item.service).equals(service) ) {
- ServiceID sid = (ServiceID)e.getKey();
- removeServiceNotify(item);
- serviceDiscardTimerTaskMgr.add
- ( new ServiceDiscardTimerTask(sid) );
- return;
- }//endif
- }//end loop
+ removeServiceNotify(item);
logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
+"completed");
}//end run
@@ -1229,9 +1230,11 @@ public class ServiceDiscoveryManager {
*/
private final class ServiceDiscardTimerTask implements TaskManager.Task
{
- private ServiceID serviceID;
+ private final ServiceID serviceID;
+ private final long endTime;
public ServiceDiscardTimerTask(ServiceID serviceID) {
this.serviceID = serviceID;
+ this.endTime = discardWait+System.currentTimeMillis();
}//end constructor
public void run(){
logger.finest("ServiceDiscoveryManager - "
@@ -1247,9 +1250,7 @@ public class ServiceDiscoveryManager {
synchronized(serviceIdMap) {
if(!serviceIdMap.containsKey(serviceID)) return;
}//end sync(serviceIdMap)
- /* Default to twice the (common) 5 minute max lease duration */
- long curDur = discardWait;
- long endTime = curDur+System.currentTimeMillis();
+ long curDur = endTime-System.currentTimeMillis();
synchronized(serviceDiscardMutex) {
/* Wait until interrupted or time expires */
while(curDur > 0) {
@@ -1306,6 +1307,7 @@ public class ServiceDiscoveryManager {
ServiceItem item = null;
ServiceItem filteredItem = null;
synchronized(itemReg) {
+ if(!itemReg.isDiscarded()) return;
if(itemReg.filteredItem == null) {
item = new ServiceItem
( (itemReg.item).serviceID,
@@ -1412,15 +1414,11 @@ public class ServiceDiscoveryManager {
public void run() {
logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
+"started");
+ boolean changed = false;
ServiceItemReg itemReg;
synchronized(serviceIdMap) {
itemReg = (ServiceItemReg)serviceIdMap.get(thisTaskSid);
- }//end sync(serviceIdMap)
- if(itemReg != null) {//a. old, previously discovered item
- itemMatchMatchChange(reg.proxy, srvcItem,
- itemReg, matchMatchEvent);
- } else {//(itemReg == null) ==> b. newly discovered item
- synchronized(serviceIdMap) {
+ if (itemReg == null) {
if( !eventRegMap.containsKey(reg) ) {
/* reg must have been discarded, simply return */
logger.finest("ServiceDiscoveryManager - "
@@ -1429,7 +1427,14 @@ public class ServiceDiscoveryManager {
}//endif
itemReg = new ServiceItemReg( reg.proxy, srvcItem );
serviceIdMap.put( thisTaskSid, itemReg );
- }//end sync(serviceIdMap)
+ } else {
+ changed = true;
+ }
+ }//end sync(serviceIdMap)
+ if(changed) {//a. old, previously discovered item
+ itemMatchMatchChange(reg.proxy, srvcItem,
+ itemReg, matchMatchEvent);
+ } else {//b. newly discovered item
ServiceItem newFilteredItem =
filterMaybeDiscard(srvcItem,reg.proxy,false);
if(newFilteredItem != null) {
@@ -1486,14 +1491,21 @@ public class ServiceDiscoveryManager {
public void run() {
logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
+"started");
- ServiceItem item = null;
+ ServiceRegistrar proxy = null;
+ ServiceItem item;
synchronized(itemReg) {
- itemReg.removeProxy(reg.proxy);//disassociate the LUS
- if( itemReg.hasNoProxys() ) {//no more LUSs, remove map
+ item = itemReg.removeProxy(reg.proxy);//disassociate the LUS
+ if (item != null) {// new LUS chosen to track changes
+ proxy = itemReg.proxy;
+ } else if( itemReg.hasNoProxys() ) {//no more LUSs, remove from map
item = itemReg.filteredItem;
}//endif
}//end sync(itemReg)
- if(item != null) removeServiceIdMap(thisTaskSid,item);
+ if(proxy != null) {
+ itemMatchMatchChange(proxy, item, itemReg, false);
+ } else if(item != null) {
+ removeServiceIdMap(thisTaskSid,item);
+ }//endif
logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
+"completed");
}//end run
@@ -1645,20 +1657,24 @@ public class ServiceDiscoveryManager {
while(iter.hasNext()) {
Map.Entry e = (Map.Entry)iter.next();
ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
+ ServiceItem filteredItem;
synchronized(itemReg) {
- if((itemReg.filteredItem.service).equals(serviceReference))
+ filteredItem = itemReg.filteredItem;
+ if((filteredItem.service).equals(serviceReference))
{
if( itemReg.isDiscarded() ) return;//already discarded
itemReg.setDiscarded(true);
discardIt = true;
- break;
}//endif
}//end sync(itemReg)
+ if(discardIt) {
+ ServiceID sid = (ServiceID)e.getKey();
+ serviceDiscardTimerTaskMgr.add
+ ( new ServiceDiscardTimerTask(sid) );
+ cacheTaskMgr.add(new DiscardServiceTask(filteredItem));
+ return;
+ }//endif
}//end loop
- if(discardIt) {
- CacheTask t = new DiscardServiceTask(serviceReference);
- cacheTaskMgr.add(t);
- }//endif
}//end LookupCacheImpl.discard
/* Returns the iterator of entry set in the copy of the ServiceIdMap */
@@ -2063,15 +2079,20 @@ public class ServiceDiscoveryManager {
*/
ServiceItem oldItem;
ServiceItem oldFilteredItem;
+ boolean itemRegIsDiscarded;
synchronized(itemReg) {
+ itemRegIsDiscarded = itemReg.isDiscarded();
+ if(!itemReg.addProxy(proxy, newItem)) { // not tracking
+ if(matchMatchEvent || !itemRegIsDiscarded) return;
+ itemReg.proxy = proxy; // start tracking instead
+ }//endif
oldItem = itemReg.item;
oldFilteredItem = itemReg.filteredItem;
- if(itemReg.isDiscarded()) {
+ if(itemRegIsDiscarded) {
itemReg.item = newItem;//capture changes for discard
itemReg.filteredItem = null;//so filter will be retried
- return;
+ if(matchMatchEvent) return;
}//endif
- itemReg.addProxy(proxy);
}//end sync(itemReg)
/* For an explanation of the logic of the following if-else-block,
* refer to the method description above.
@@ -2079,6 +2100,7 @@ public class ServiceDiscoveryManager {
boolean attrsChanged = false;
boolean versionChanged = false;
boolean isSameVersion = sameVersion(newItem,oldItem);
+ if(itemRegIsDiscarded) return;
if( matchMatchEvent || isSameVersion ) {
/* Same version, determine if the attributes have changed.
* But first, replace the new service proxy with the old
@@ -2137,8 +2159,8 @@ public class ServiceDiscoveryManager {
logger.finer("itemMatchMatchChange - version changed");
}//endif
/* Now apply the filter, and send events if appropriate */
- ServiceItem newFilteredItem = filterMaybeDiscard
- (newItem, proxy, true);
+ ServiceItem newFilteredItem =
+ filterMaybeDiscard(newItem, proxy, !itemRegIsDiscarded);
if(newFilteredItem != null) {
/* Passed the filter, okay to send event(s). */
if(attrsChanged) {
@@ -2149,7 +2171,9 @@ public class ServiceDiscoveryManager {
if(versionChanged) {
logger.finer("itemMatchMatchChange - send serviceRemoved");
- removeServiceNotify(oldFilteredItem);
+ if (!itemRegIsDiscarded) {
+ removeServiceNotify(oldFilteredItem);
+ }//endif
logger.finer("itemMatchMatchChange - send serviceAdded");
addServiceNotify(newFilteredItem);
}//endif
@@ -2354,7 +2378,12 @@ public class ServiceDiscoveryManager {
}//end sync(itemReg)
removeServiceIdMap(srvcID, oldFilteredItem);
} else {
+ boolean itemRegIsDiscarded;
+ synchronized(itemReg) {
+ itemRegIsDiscarded = itemReg.isDiscarded();
+ }//end sync(itemReg)
removeServiceIdMapSendNoEvent(srvcID);
+ if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
}//endif
}//endif
return null;
@@ -2387,10 +2416,15 @@ public class ServiceDiscoveryManager {
itemReg = (ServiceItemReg)serviceIdMap.get(item.serviceID);
}//end sync(serviceIdMap)
if(itemReg == null) return;
+ boolean itemRegIsDiscarded;
synchronized(itemReg) {
itemReg.item = item;
itemReg.filteredItem = filteredItem;
+ if(itemRegIsDiscarded = itemReg.isDiscarded()) {
+ itemReg.setDiscarded(false);
+ }//endif
}//end sync(itemReg)
+ if(itemRegIsDiscarded) cancelDiscardTask(item.serviceID);
}//end LookupCacheImpl.addFilteredItemToMap
/** Convenience method called by <code>filterMaybeDiscard</code>
@@ -2425,9 +2459,9 @@ public class ServiceDiscoveryManager {
}//endif
itemReg.setDiscarded(true);
}//end sync(itemReg)
- if(sendEvent) removeServiceNotify(oldFilteredItem);
serviceDiscardTimerTaskMgr.add
( new ServiceDiscardTimerTask(item.serviceID) );
+ if(sendEvent) removeServiceNotify(oldFilteredItem);
}//end LookupCacheImpl.discardRetryLater
/** Convenience method called by <code>NotifyEventTask.run</code> (only
@@ -2445,25 +2479,27 @@ public class ServiceDiscoveryManager {
itemReg = (ServiceItemReg)serviceIdMap.get(srvcID);
}//end sync(serviceIdMap)
if(itemReg != null) {
+ ServiceItem newItem;
boolean itemRegHasNoProxys;
boolean itemRegIsDiscarded;
ServiceItem filteredItem;
synchronized(itemReg) {
- itemReg.removeProxy(proxy);
+ newItem = itemReg.removeProxy(proxy);
itemRegHasNoProxys = itemReg.hasNoProxys();
itemRegIsDiscarded = itemReg.isDiscarded();
filteredItem = itemReg.filteredItem;
}//end sync(itemReg)
- if(itemRegHasNoProxys) {
+ if(newItem != null) {
+ itemMatchMatchChange(itemReg.proxy, newItem, itemReg,
+ false);
+ } else if(itemRegHasNoProxys) {
if(itemRegIsDiscarded) {
/* Remove item from map and wake up the discard task */
logger.finer("handleMatchNoMatch - "
+"TRANSITION_MATCH_NOMATCH && service in "
+"discard queue - send NO event");
removeServiceIdMapSendNoEvent(srvcID);
- synchronized(serviceDiscardMutex) {
- serviceDiscardMutex.notifyAll();
- }//end sync
+ cancelDiscardTask(srvcID);
} else {//remove item from map and send removed event
logger.finer("handleMatchNoMatch - "
+"TRANSITION_MATCH_NOMATCH - send "
@@ -2474,6 +2510,22 @@ public class ServiceDiscoveryManager {
}//endif
}//end LookupCacheImpl.handleMatchNoMatch
+ /** Wake up service discard task if running, else remove from mgr. */
+ private void cancelDiscardTask(ServiceID sid) {
+ Iterator iter = serviceDiscardTimerTaskMgr.getPending().iterator();
+ while (iter.hasNext()) {
+ ServiceDiscardTimerTask t =
+ (ServiceDiscardTimerTask)iter.next();
+ if (sid.equals(t.serviceID)) {
+ if(serviceDiscardTimerTaskMgr.removeIfPending(t)) return;
+ break;
+ }//endif
+ }//end loop
+ synchronized(serviceDiscardMutex) {
+ serviceDiscardMutex.notifyAll();
+ }//end sync
+ }//end LookupCacheImpl.cancelDiscardTask
+
}//end class ServiceDiscoveryManager.LookupCacheImpl
/* Name of this component; used in config entry retrieval and the logger.*/