You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by bt...@apache.org on 2009/11/08 21:49:16 UTC
svn commit: r833922 -
/incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
Author: btmurphy
Date: Sun Nov 8 20:49:16 2009
New Revision: 833922
URL: http://svn.apache.org/viewvc?rev=833922&view=rev
Log:
fix for river-324: Under certain circumstances, the ServiceDiscoveryManager internal LookupCache implementation can incorrectly process attribute change events before the lookup snapshot is processed.
Modified:
incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
Modified: incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=833922&r1=833921&r2=833922&view=diff
==============================================================================
--- incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ incubator/river/jtsk/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Nov 8 20:49:16 2009
@@ -721,11 +721,17 @@
public long seqNo;
/* The Event notification lease */
public Lease lease;
+ /* The pending events */
+ public ArrayList pending;
+ /* The number of pending LookupTasks */
+ public int lookupsPending;
public EventReg(Object source, long eventID, long seqNo, Lease lease) {
this.source = source;
this.eventID = eventID;
this.seqNo = seqNo;
this.lease = lease;
+ this.pending = new ArrayList();
+ this.lookupsPending = 0;
}
}//end class ServiceDiscoveryManager.EventReg
@@ -889,16 +895,18 @@
tmpl,
lookupListenerProxy,
duration);
+ eventReg.lookupsPending++;
synchronized(serviceIdMap) {
/* Cancel the lease if the cache has been terminated */
if(bCacheTerminated) {
cancelLease(eventReg.lease);
+ return;
} else {
eventRegMap.put(reg, eventReg);
}//endif
}//end sync(serviceIdMap)
/* Execute the LookupTask only if there were no problems */
- (new LookupTask(reg, this.getSeqN())).run();
+ (new LookupTask(reg, this.getSeqN(), eventReg)).run();
} catch (Exception e) {
boolean cacheTerminated;
synchronized(serviceIdMap) {
@@ -917,8 +925,10 @@
/** This class requests a "snapshot" of the given registrar's state.*/
private final class LookupTask extends CacheTask {
- public LookupTask(ProxyReg reg, long seqN) {
+ private EventReg eReg;
+ public LookupTask(ProxyReg reg, long seqN, EventReg eReg) {
super(reg, seqN);
+ this.eReg = eReg;
}
public void run() {
logger.finest("ServiceDiscoveryManager - LookupTask started");
@@ -930,6 +940,7 @@
} catch (Exception e) {
boolean cacheTerminated;
synchronized(serviceIdMap) {
+ eReg.lookupsPending--;
cacheTerminated = bCacheTerminated;
}//end sync
ServiceDiscoveryManager.this.fail
@@ -971,6 +982,10 @@
taskSeqN++);
cacheTaskMgr.add(t);
}//end loop
+ /* 3. Handle events that came in prior to lookup */
+ eReg.lookupsPending--;
+ cacheTaskMgr.addAll(eReg.pending);
+ eReg.pending.clear();
}//end sync(serviceIdMap)
logger.finest("ServiceDiscoveryManager - LookupTask "
+"completed");
@@ -1863,8 +1878,16 @@
if(seqNo == (prevSeqNo+1)) {//no gap, handle current event
t = new NotifyEventTask
(reg, sid, item, transition, taskSeqN++);
+ if(eReg.lookupsPending > 0) {
+ eReg.pending.add(t);
+ return;
+ }//endif
+ } else if(eReg.lookupsPending > 1) {
+ //gap in event sequence, but snapshot already pending
+ return;
} else {//gap in event sequence, request snapshot
- t = new LookupTask(reg, taskSeqN++);
+ eReg.lookupsPending++;
+ t = new LookupTask(reg, taskSeqN++, eReg);
if( logger.isLoggable(Levels.HANDLED) ) {
String msg ="notifyServiceMap - GAP in event sequence "
+"[serviceRegistrar={0}], "