You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/02/02 13:31:10 UTC
svn commit: r1563596 [2/3] - in /river/jtsk/skunk/qa_refactor/trunk:
qa/src/com/sun/jini/test/impl/outrigger/matching/
qa/src/com/sun/jini/test/impl/servicediscovery/event/
qa/src/com/sun/jini/test/spec/lookupservice/test_set00/
qa/src/com/sun/jini/tes...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Sun Feb 2 12:31:09 2014
@@ -17,6 +17,7 @@
*/
package net.jini.lookup;
+import org.apache.river.impl.thread.DependencyLinker;
import au.net.zeus.collection.RC;
import au.net.zeus.collection.Ref;
import au.net.zeus.collection.Referrer;
@@ -28,7 +29,6 @@ import java.rmi.RemoteException;
import java.rmi.server.ExportException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -39,12 +39,12 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -82,9 +82,14 @@ import net.jini.security.BasicProxyPrepa
import net.jini.security.ProxyPreparer;
import net.jini.security.TrustVerifier;
import net.jini.security.proxytrust.ServerProxyTrust;
+import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.ObservableFuture;
+import org.apache.river.api.util.FutureObserver.Subscribeable;
+import org.apache.river.api.util.FutureObserver.Subscriber;
import org.apache.river.impl.thread.ExtensibleExecutorService;
import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
import org.apache.river.impl.thread.NamedThreadFactory;
+import org.apache.river.impl.thread.ObservableFutureTask;
/**
* The <code>ServiceDiscoveryManager</code> class is a helper utility class
@@ -642,6 +647,12 @@ public class ServiceDiscoveryManager {
}//end getSeqN
public abstract void run();
+
+ public abstract boolean hasDeps();
+
+ public boolean dependsOn(CacheTask task){
+ return false;
+ }
}//end class ServiceDiscoveryManager.CacheTask
/** Abstract base class for controlling the order-of-execution of tasks
@@ -666,23 +677,24 @@ public class ServiceDiscoveryManager {
* then run this task *after* the task in the list (return true);
* otherwise run this task now (return false).
*
- * @param tasks the tasks to consider.
- * @param size elements with index less than size are considered.
*/
-// public boolean runAfter(List tasks, int size) {
-// for(int i=0; i<size; i++) {
-// Runnable t = (Runnable) tasks.get(i);
-// //Compare only instances of this task class
-// if( !(t instanceof ServiceIdTask) ) continue;
-// ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
-// if( thisTaskSid.equals(otherTaskSid) ) {
-// if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
-// return true;//run this task after the other task
-// }//endif
-// }//endif
-// }//end loop
-// return false;//run this task now
-// }//end runAfter
+ @Override
+ public boolean dependsOn(CacheTask t) {
+ //Compare only instances of this task class
+ if( !(t instanceof ServiceIdTask) ) return false;
+ ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
+ if( thisTaskSid.equals(otherTaskSid) ) {
+ if(thisTaskSeqN > ((ServiceIdTask)t).getSeqN()) {
+ return true;//run this task after the other task
+ }//endif
+ }//endif
+ return false;//run this task now
+ }//end dependsOn
+
+ @Override
+ public boolean hasDeps(){
+ return true;
+ }
/** Returns the ServiceID associated with this task. */
public ServiceID getServiceID() {
@@ -699,11 +711,14 @@ public class ServiceDiscoveryManager {
{
private final List items = new LinkedList();
+ @Override
public synchronized void serviceAdded(ServiceDiscoveryEvent event) {
items.add(event.getPostEventServiceItem());
this.notifyAll();
}
+ @Override
public void serviceRemoved(ServiceDiscoveryEvent event){ }
+ @Override
public void serviceChanged(ServiceDiscoveryEvent event){ }
public synchronized ServiceItem[] getServiceItem() {
ServiceItem[] r = new ServiceItem[items.size()];
@@ -718,27 +733,71 @@ public class ServiceDiscoveryManager {
* number. For each LookupCache, there is a HashMap that maps a ProxyReg
* to an EventReg.
*/
- private final static class EventReg {
+ private final static class EventReg {
/* The Event source from the event registration */
final Object source;
/* The Event ID */
public final long eventID;
/* The current event sequence number for the Service template */
- public long seqNo;
+ private long seqNo;
/* The Event notification lease */
public final Lease lease;
/* The pending events */
- public final ArrayList pending;
+ private final List<LookupCacheImpl.NotifyEventTask> pending;
/* The number of pending LookupTasks */
- public int lookupsPending;
+ private int lookupsPending;
+
public EventReg(Object source, long eventID, long seqNo, Lease lease) {
this.source = source;
this.eventID = eventID;
this.seqNo = seqNo;
this.lease = lease;
- this.pending = new ArrayList();
+ pending = new ArrayList<LookupCacheImpl.NotifyEventTask>();
this.lookupsPending = 0;
}
+
+ /**
+ * @param seqNo the seqNo to set, return a positive delta if
+ * successful, otherwise a negative value indicates failure.
+ */
+ public synchronized long updateSeqNo(long seqNo) {
+ long thisSeqNo = this.seqNo;
+ if (seqNo > this.seqNo){
+ this.seqNo = seqNo;
+ return thisSeqNo - seqNo;
+ } else {
+ return thisSeqNo - seqNo; // Return a negative value
+ }
+ }
+
+ public synchronized boolean addIfLookupsPending(CacheTask task){
+ if (task instanceof LookupCacheImpl.NotifyEventTask){
+ if (getLookupsPending() > 0){
+ pending.add((LookupCacheImpl.NotifyEventTask) task);
+ }
+ }
+ return false;
+ }
+//
+// public synchronized void addLookupPending(ObservableFutureTask task){
+// task.addObserver(this);
+// pendingLookups.add(task);
+// }
+
+ /**
+ * @return the lookupsPending
+ */
+ public synchronized int getLookupsPending() {
+ return lookupsPending;
+ }
+
+ public synchronized void decrementLookupsPending(){
+ lookupsPending--;
+ }
+
+ public synchronized void incrementLookupsPending(){
+ lookupsPending++;
+ }
}//end class ServiceDiscoveryManager.EventReg
/**
@@ -748,7 +807,7 @@ public class ServiceDiscoveryManager {
*/
private final static class ServiceItemReg {
/* Maps ServiceRegistrars to their latest registered item */
- private final Map<ServiceRegistrar,ServiceItem> items = new HashMap<ServiceRegistrar,ServiceItem>();
+ private final Map<ServiceRegistrar,ServiceItem> items;
/* The ServiceRegistrar currently being used to track changes */
private ServiceRegistrar proxy;
/* Flag that indicates that the ServiceItem has been discarded. */
@@ -761,6 +820,7 @@ public class ServiceDiscoveryManager {
* lookup service proxy.
*/
public ServiceItemReg(ServiceRegistrar proxy, ServiceItem item) {
+ items = new HashMap<ServiceRegistrar,ServiceItem>();
this.proxy = proxy;
items.put(proxy, item);
this.item = item;
@@ -854,22 +914,12 @@ public class ServiceDiscoveryManager {
/** Allows termination of LookupCacheImpl so blocking lookup can return
* quickly
*/
- private final class LookupCacheTerminator extends Thread {
+ private static final class LookupCacheTerminator implements Runnable {
private final BlockingQueue<LookupCacheImpl> cacheList = new LinkedBlockingQueue<LookupCacheImpl>(20);
- LookupCacheTerminator(){
- super("SDM lookup cache terminator");
- setDaemon(true);
- }
- public void start(){
- synchronized (this){
- started = true;
- }
- super.start();
- }
public void run(){
- while (!isInterrupted()) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
LookupCacheImpl cache = cacheList.take();
synchronized (cache){
@@ -894,69 +944,94 @@ public class ServiceDiscoveryManager {
}
- private static class DependencyLinker {
-
- private final ExecutorService executor;
-
- DependencyLinker(ExecutorService ex){
- executor = new ExtensibleExecutorService
- (
- ex,
- new RunnableFutureFactory(){
-
- @Override
- public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
- return new FutureTaskSeqNo<T>(r, value);
- }
-
- @Override
- public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
- return new FutureTaskSeqNo<T>(c);
- }
- }
- );
- }
-
- void execute(CacheTask task)
- {
- executor.submit(task);
- }
- }
-
/**
*
* @param <T>
*/
- public static final class FutureTaskSeqNo<T> extends FutureTask<T> implements Comparable<FutureTaskSeqNo>
+ static final class CacheTaskWrapper<T> extends ObservableFutureTask<T>
{
- private final long seqNo;
+ private final CacheTask task;
- private FutureTaskSeqNo(Runnable r, T result) {
+ private CacheTaskWrapper(Runnable r, T result) {
super(r, result);
- if (r instanceof CacheTask) seqNo = ((CacheTask)r).getSeqN();
- else seqNo = -1;
+ if (r instanceof CacheTask) task = (CacheTask)r;
+ else task = null;
}
- private FutureTaskSeqNo(Callable<T> c)
+ private CacheTaskWrapper(Callable<T> c)
{
super(c);
- if (c instanceof CacheTask) seqNo = ((CacheTask)c).getSeqN();
- else seqNo = -1;
+ if (c instanceof CacheTask) task = (CacheTask)c;
+ else task = null;
+ }
+
+ private CacheTask getTask(){
+ return task;
+ }
+ }
+
+ private static final class CacheTaskQueue implements FutureObserver {
+ // CacheTasks pending completion.
+ private final ConcurrentLinkedQueue<CacheTaskWrapper> pending;
+ private final ExecutorService executor;
+
+ private CacheTaskQueue(ExecutorService e){
+ this.pending = new ConcurrentLinkedQueue<CacheTaskWrapper>();
+ executor = e;
+ }
+
+ private CacheTaskWrapper submit(CacheTask t){
+ CacheTaskWrapper future = new CacheTaskWrapper(t, null);
+ pending.offer(future);
+ future.addObserver(this);
+ if (t.hasDeps()) {
+ List<ObservableFuture> deps = new ArrayList<ObservableFuture>();
+ Iterator<CacheTaskWrapper> it = pending.iterator();
+ while (it.hasNext()){
+ CacheTaskWrapper w = it.next();
+ CacheTask c = w.getTask();
+ if (t.dependsOn(c)) {
+ deps.add(w);
+ }
+ }
+ if (deps.isEmpty()){
+ executor.submit(future);
+ } else {
+ DependencyLinker linker = new DependencyLinker(executor, deps, future);
+ linker.register();
+ }
+ } else {
+ executor.submit(future);
+ }
+ return future;
}
@Override
- public int compareTo(FutureTaskSeqNo o) {
- if (seqNo < o.seqNo) return -1;
- if (seqNo > o.seqNo) return 1;
- return 0;
+ public void futureCompleted(Future e) {
+ pending.remove(e);
}
+
+ /** Removes from the cache's task queue, all pending tasks
+ * associated with the given ProxyReg. This method is called
+ * when the given ProxyReg has been discarded.
+ */
+ private void removeUselessTask(ProxyReg reg) {
+ Iterator<CacheTaskWrapper> it = pending.iterator();
+ while( it.hasNext()) {
+ CacheTaskWrapper w = it.next();
+ CacheTask t = w.getTask();
+ if(t.isFromProxy(reg)) {
+ w.cancel(true); // Also causes task to be removed
+ }
+ }//end loop
+ }//end LookupCacheImpl.removeUselessTask
}
/** Internal implementation of the LookupCache interface. Instances of
* this class are used in the blocking versions of lookup() and are
* returned by createLookupCache.
*/
- private final class LookupCacheImpl implements LookupCache {
+ private static final class LookupCacheImpl implements LookupCache {
/* RemoteEventListener class that is registered with the proxy to
* receive notifications from lookup services when any ServiceItem
@@ -1016,59 +1091,95 @@ public class ServiceDiscoveryManager {
* process has completed, resulting in an incorrect view of the
* current state of the ServiceRegistrar.
*/
- private final class RegisterListenerTask extends CacheTask {
- public RegisterListenerTask(ProxyReg reg, long seqN) {
+ private static final class RegisterListenerTask extends CacheTask
+ implements Subscribeable {
+ private final LookupCacheImpl cache;
+ private Subscriber subscriber;
+ public RegisterListenerTask(
+ ProxyReg reg,
+ long seqN,
+ LookupCacheImpl cache)
+ {
super(reg, seqN);
+ this.cache = cache;
}
+ public boolean hasDeps(){
+ return true;
+ }
+
+ public boolean dependsOn(CacheTask t){
+ if (t instanceof ProxyRegDropTask ){
+ ProxyReg r = getProxyReg();
+ if (r != null && r.equals(t.getProxyReg())){
+ if (t.getSeqN() < getSeqN()) return true;
+ }
+ }
+ return false;
+ }
+
public void run() {
logger.finest("ServiceDiscoveryManager - RegisterListenerTask "
+"started");
- long duration = getLeaseDuration();
+ long duration = cache.getLeaseDuration();
if(duration < 0) return;
try {
- EventReg eventReg = registerListener(reg.getProxy(),
- tmpl,
- lookupListenerProxy,
+ EventReg eventReg = cache.sdm.registerListener(reg.getProxy(),
+ cache.tmpl,
+ cache.lookupListenerProxy,
duration);
// eventReg is a new object not visible to other threads yet.
- eventReg.lookupsPending++;
+ eventReg.incrementLookupsPending();
/* Cancel the lease if the cache has been terminated */
- if(bCacheTerminated) {
+ if(cache.bCacheTerminated || Thread.currentThread().isInterrupted()) {
// eventReg.lease is final and is already safely published
- cancelLease(eventReg.lease);
+ cache.sdm.cancelLease(eventReg.lease);
return;
} else {
// eventReg will be published safely to other threads
// via eventRegMap and LookupTask
- EventReg existed = eventRegMap.putIfAbsent(reg, eventReg);
+ EventReg existed = cache.eventRegMap.putIfAbsent(reg, eventReg);
if (existed == null ) {
- (new LookupTask(reg, this.getSeqN(), eventReg)).run();
+ ObservableFuture task =
+ cache.taskQueue.submit(
+ new LookupTask(reg, this.getSeqN(), eventReg, cache)
+ );
+ synchronized (this){
+ if (subscriber != null) subscriber.reccommendedViewing(task);
+ }
} else {
// Another eventReg.lease exists, cancel new lease.
- cancelLease(eventReg.lease);
+ cache.sdm.cancelLease(eventReg.lease);
}
}//endif
/* Execute the LookupTask only if there were no problems */
} catch (Exception e) {
- ServiceDiscoveryManager.this.fail
+ cache.sdm.fail
(e, reg.getProxy(),this.getClass().getName(),"run",
"Exception occurred while attempting to register "
+"with the lookup service event mechanism",
- bCacheTerminated);
+ cache.bCacheTerminated);
}
logger.finest("ServiceDiscoveryManager - RegisterListenerTask "
+"completed");
}//end run
+
+ @Override
+ public synchronized void subscribe(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
}//end class LookupCacheImpl.RegisterListenerTask
/** This class requests a "snapshot" of the given registrar's state.*/
- private final class LookupTask extends CacheTask {
+ private static final class LookupTask extends CacheTask implements Subscribeable {
private final EventReg eReg;
- public LookupTask(ProxyReg reg, long seqN, EventReg eReg) {
+ private final LookupCacheImpl cache;
+ private Subscriber subscriber;
+ public LookupTask(ProxyReg reg, long seqN, EventReg eReg, LookupCacheImpl cache) {
super(reg, seqN);
this.eReg = eReg;
+ this.cache = cache;
}
public void run() {
logger.finest("ServiceDiscoveryManager - LookupTask started");
@@ -1076,17 +1187,13 @@ public class ServiceDiscoveryManager {
ServiceMatches matches;
/* For the given lookup, get all services matching the tmpl */
try {
- matches = proxy.lookup(tmpl, Integer.MAX_VALUE);
+ matches = proxy.lookup(cache.tmpl, Integer.MAX_VALUE);
} catch (Exception e) {
- boolean cacheTerminated;
- synchronized(eReg) {
- eReg.lookupsPending--;
- cacheTerminated = bCacheTerminated;
- }//end sync
- ServiceDiscoveryManager.this.fail
+ eReg.decrementLookupsPending();
+ cache.sdm.fail
(e,proxy,this.getClass().getName(),"run",
"Exception occurred during call to lookup",
- cacheTerminated);
+ cache.bCacheTerminated);
return;
}
if(matches.items == null) {
@@ -1096,41 +1203,61 @@ public class ServiceDiscoveryManager {
+"null 'items' field");
}
/* 1. Cleanup "orphaned" itemReg's. */
- Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter = (serviceIdMap.entrySet()).iterator();
+ Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter
+ = cache.serviceIdMap.entrySet().iterator();
while(iter.hasNext()) {
Map.Entry<ServiceID,ServiceItemReg> e = iter.next();
ServiceID srvcID = e.getKey();
- ServiceItem itemInSnapshot = findItem(srvcID,
+ ServiceItem itemInSnapshot = cache.findItem(srvcID,
matches.items);
if(itemInSnapshot != null) continue;//not an orphan
+ if (Thread.currentThread().isInterrupted()) continue; // skip
ServiceItemReg itemReg = e.getValue();
- UnmapProxyTask t = new UnmapProxyTask(reg,
+ CacheTask t =
+ new UnmapProxyTask(
+ reg,
itemReg,
srvcID,
- taskSeqN.getAndIncrement());
- cacheDependencyLinker.execute(t);
+ getSeqN(),
+ cache
+ );
+ ObservableFuture task = cache.taskQueue.submit(t);
+ synchronized (this){
+ if (subscriber != null) subscriber.reccommendedViewing(task);
+ }
}//end loop
/* 2. Handle "new" and "old" items from the given lookup */
for(int i=0; i<(matches.items).length; i++) {
/* Skip items with null service field (Bug 4378751) */
if( (matches.items[i]).service == null ) continue;
- NewOldServiceTask t =
- new NewOldServiceTask(reg,
+ if (Thread.currentThread().isInterrupted()) continue; // skip
+ CacheTask t =
+ new NewOldServiceTask(
+ reg,
matches.items[i],
false,
- taskSeqN.getAndIncrement());
- cacheDependencyLinker.execute(t);
+ getSeqN(),
+ cache
+ );
+
+ ObservableFuture task = cache.taskQueue.submit(t);
+ synchronized (this){
+ if (subscriber != null) subscriber.reccommendedViewing(task);
+ }
}//end loop
/* 3. Handle events that came in prior to lookup */
synchronized (eReg){
- eReg.lookupsPending--;
+ eReg.decrementLookupsPending();
+ if (eReg.getLookupsPending() == 0){
Iterator it = eReg.pending.iterator() ;
while (it.hasNext()) {
+ if (Thread.currentThread().isInterrupted()) continue; // skip
NotifyEventTask t = (NotifyEventTask) it.next();
- t.thisTaskSeqN = taskSeqN.getAndIncrement(); // assign new seqN
- cacheDependencyLinker.execute(t);
+ t.thisTaskSeqN = cache.taskSeqN.getAndIncrement(); // assign new seqN
+ cache.taskQueue.submit(t);
}
eReg.pending.clear();
+ }
}//end sync(eReg)
logger.finest("ServiceDiscoveryManager - LookupTask "
+"completed");
@@ -1154,21 +1281,27 @@ public class ServiceDiscoveryManager {
* @param tasks the tasks to consider.
* @param size elements with index less than size are considered.
*/
-// public boolean runAfter(List tasks, int size) {
-// for(int i=0; i<size; i++) {
-// CacheTask t = (CacheTask)tasks.get(i);
-// if( t instanceof RegisterListenerTask
-// || t instanceof LookupTask
-// || t instanceof NotifyEventTask )
-// {
-// ProxyReg otherReg = t.getProxyReg();
-// if( reg.equals(otherReg) ) {
-// if(thisTaskSeqN > t.getSeqN()) return true;
-// }//endif
-// }//endif
-// }//end loop
-// return false;
-// }//end runAfter
+ public boolean dependsOn(CacheTask t) {
+ if( t instanceof RegisterListenerTask
+ || t instanceof LookupTask
+ || t instanceof NotifyEventTask )
+ {
+ ProxyReg otherReg = t.getProxyReg();
+ if( reg.equals(otherReg) ) {
+ if(thisTaskSeqN > t.getSeqN()) return true;
+ }//endif
+ }//endif
+ return false;
+ }//end dependsOn
+
+ public boolean hasDeps(){
+ return true;
+ }
+
+ @Override
+ public synchronized void subscribe(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
}//end class LookupCacheImpl.LookupTask
@@ -1176,35 +1309,72 @@ public class ServiceDiscoveryManager {
* remove the registrar from the various maps maintained by this
* cache.
*/
- private final class ProxyRegDropTask extends CacheTask {
- public ProxyRegDropTask(ProxyReg reg, long seqN) {
+ private static final class ProxyRegDropTask extends CacheTask
+ implements Subscribeable {
+
+ private final LookupCacheImpl cache;
+ private final EventReg eReg;
+ private Subscriber subscriber;
+
+ public ProxyRegDropTask(
+ ProxyReg reg, EventReg eReg, long seqN, LookupCacheImpl cache)
+ {
super(reg, seqN);
+ this.cache = cache;
+ this.eReg = eReg;
}
public void run() {
logger.finest("ServiceDiscoveryManager - ProxyRegDropTask "
+"started");
//lease has already been cancelled by removeProxyReg
- eventRegMap.remove(reg);
+ cache.eventRegMap.remove(reg, eReg);
/* For each itemReg in the serviceIdMap, disassociate the
* lookup service referenced here from the itemReg; and
* if the itemReg then has no more lookup services associated
* with it, remove the itemReg from the map and send a
* service removed event.
*/
- Iterator iter = (serviceIdMap.entrySet()).iterator();
+ Iterator iter = (cache.serviceIdMap.entrySet()).iterator();
while(iter.hasNext()) {
Map.Entry e = (Map.Entry)iter.next();
ServiceID srvcID = (ServiceID)e.getKey();
ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
- UnmapProxyTask t = new UnmapProxyTask(reg,
+ UnmapProxyTask t =
+ new UnmapProxyTask(
+ reg,
itemReg,
srvcID,
- taskSeqN.getAndIncrement());
- cacheDependencyLinker.execute(t);
+ getSeqN(),
+ cache);
+ ObservableFuture task = cache.taskQueue.submit(t);
+ synchronized (this){
+ if (subscriber != null) subscriber.reccommendedViewing(task);
+ }
}//end loop
logger.finest("ServiceDiscoveryManager - ProxyRegDropTask "
+"completed");
}//end run
+
+ @Override
+ public boolean hasDeps(){
+ return true;
+ }
+
+ public boolean dependsOn(CacheTask t){
+ if (t instanceof RegisterListenerTask ){
+ ProxyReg r = getProxyReg();
+ if (r != null && r.equals(t.getProxyReg())){
+ if (t.getSeqN() < getSeqN()) return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized void subscribe(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
}//end class LookupCacheImpl.ProxyRegDropTask
/** Task class used to asynchronously notify service discard. */
@@ -1214,6 +1384,10 @@ public class ServiceDiscoveryManager {
super(null, 0);
this.item = item;
}
+
+ public boolean hasDeps(){
+ return false;
+ }
public void run() {
logger.finest("ServiceDiscoveryManager - DiscardServiceTask "
@@ -1228,11 +1402,15 @@ public class ServiceDiscoveryManager {
* discovery listeners of serviceAdded/serviceRemoved/serviceChanged
* events.
*/
- private final class NotifyEventTask extends ServiceIdTask {
+ private static class NotifyEventTask extends ServiceIdTask
+ implements Subscribeable {
private final ServiceID sid;
private final ServiceItem item;
private final int transition;
- public NotifyEventTask(ProxyReg reg,
+ private final LookupCacheImpl cache;
+ private Subscriber subscriber;
+ public NotifyEventTask(LookupCacheImpl cache,
+ ProxyReg reg,
ServiceID sid,
ServiceItem item,
int transition,
@@ -1242,6 +1420,7 @@ public class ServiceDiscoveryManager {
this.sid = sid;
this.item = item;
this.transition = transition;
+ this.cache = cache;
}//end constructor
public void run() {
@@ -1258,7 +1437,8 @@ public class ServiceDiscoveryManager {
* because the primary if-block will be unintentionally
* entered due to the null service field in the ServiceItem.
*/
- if( (item != null) && (item.service == null) ) {
+ if( ((item != null) && (item.service == null))
+ || Thread.currentThread().isInterrupted()) {
return;
}//endif
/* Handle the event by the transition type, and by whether
@@ -1266,11 +1446,14 @@ public class ServiceDiscoveryManager {
* item, or a newly discovered item.
*/
if(transition == ServiceRegistrar.TRANSITION_MATCH_NOMATCH) {
- handleMatchNoMatch(reg.getProxy(), sid, item);
+ cache.handleMatchNoMatch(reg.getProxy(), sid);
} else {//(transition == NOMATCH_MATCH or MATCH_MATCH)
- (new NewOldServiceTask(reg, item,
+ ObservableFuture task = cache.taskQueue.submit(new NewOldServiceTask(reg, item,
(transition == ServiceRegistrar.TRANSITION_MATCH_MATCH),
- thisTaskSeqN)).run();
+ thisTaskSeqN, cache));
+ synchronized (this){
+ if (subscriber != null) subscriber.reccommendedViewing(task);
+ }
}//endif(transition)
logger.finest("ServiceDiscoveryManager - NotifyEventTask "
+"completed");
@@ -1300,20 +1483,26 @@ public class ServiceDiscoveryManager {
* @param tasks the tasks to consider.
* @param size elements with index less than size are considered.
*/
-// public boolean runAfter(List tasks, int size) {
-// for(int i=0; i<size; i++) {
-// Runnable t = (Runnable)tasks.get(i);
-// if( t instanceof RegisterListenerTask
-// || t instanceof LookupTask )
-// {
-// ProxyReg otherReg = ((CacheTask)t).getProxyReg();
-// if( reg.equals(otherReg) ) {
-// if(thisTaskSeqN > ((CacheTask)t).getSeqN()) return true;
-// }//endif
-// }//endif
-// }//end loop
-// return super.runAfter(tasks, size);
-// }//end runAfter
+ public boolean dependsOn(CacheTask t) {
+ if( t instanceof RegisterListenerTask
+ || t instanceof LookupTask )
+ {
+ ProxyReg otherReg = t.getProxyReg();
+ if( reg.equals(otherReg) ) {
+ if(thisTaskSeqN > t.getSeqN()) return true;
+ }//endif
+ }//endif
+ return super.dependsOn(t);
+ }//end runAfter
+
+ public boolean hasDeps(){
+ return true;
+ }
+
+ @Override
+ public synchronized void subscribe(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
}//end class LookupCacheImpl.NotifyEventTask
@@ -1323,33 +1512,35 @@ public class ServiceDiscoveryManager {
* a filter retry on an item in which the cache's filter initially
* returned indefinite.
*/
- private final class ServiceDiscardTimerTask implements Runnable
+ private static final class ServiceDiscardTimerTask implements Runnable
{
private final ServiceID serviceID;
private final long endTime;
- public ServiceDiscardTimerTask(ServiceID serviceID) {
+ private final LookupCacheImpl cache;
+ public ServiceDiscardTimerTask( LookupCacheImpl cache, ServiceID serviceID) {
this.serviceID = serviceID;
- this.endTime = discardWait+System.currentTimeMillis();
+ this.cache = cache;
+ this.endTime = cache.sdm.discardWait + System.currentTimeMillis();
}//end constructor
public void run(){
logger.finest("ServiceDiscoveryManager - "
+"ServiceDiscardTimerTask started");
/* Exit if this cache has already been terminated. */
- if(bCacheTerminated) return;
+ if(cache.bCacheTerminated) return;
/* Simply return if a MATCH_NOMATCH event arrived for this
* item prior to this task running and as a result, the item
* was removed from the map.
*/
- if(!serviceIdMap.containsKey(serviceID)) return;
+ if(!cache.serviceIdMap.containsKey(serviceID)) return;
long curDur = endTime-System.currentTimeMillis();
- synchronized(serviceDiscardMutex) {
+ synchronized(cache.serviceDiscardMutex) {
/* Wait until interrupted or time expires */
while(curDur > 0) {
try {
- serviceDiscardMutex.wait(curDur);
+ cache.serviceDiscardMutex.wait(curDur);
} catch(InterruptedException e){ }
/* Exit if this cache was terminated while waiting. */
- if(bCacheTerminated) return;
+ if(cache.bCacheTerminated) return;
/* Either the wait period has completed or has been
* interrupted. If the service ID is no longer in
* in the serviceIdMap, then it's assumed that a
@@ -1360,7 +1551,7 @@ public class ServiceDiscoveryManager {
* re-discovered when it comes back on line. In that
* case, exit the thread.
*/
- if(!serviceIdMap.containsKey(serviceID)) return;
+ if(!cache.serviceIdMap.containsKey(serviceID)) return;
curDur = endTime-System.currentTimeMillis();
}//end loop
}//end sync
@@ -1386,10 +1577,11 @@ public class ServiceDiscoveryManager {
* event was already sent when the service was originally
* discarded.
*/
- ServiceItemReg itemReg = serviceIdMap.get(serviceID);
+ ServiceItemReg itemReg = cache.serviceIdMap.get(serviceID);
if(itemReg != null) {
ServiceItem item = null;
ServiceItem filteredItem = null;
+ ServiceItem itemToSend;
synchronized(itemReg) {
if(!itemReg.isDiscarded()) return;
if(itemReg.filteredItem == null) {
@@ -1402,29 +1594,26 @@ public class ServiceDiscoveryManager {
(itemReg.item).service,
(itemReg.item).attributeSets);
}//endif
- }//end sync(itemReg)
- if(filteredItem != null) {//retry the filter
- if( filterPassFail(filteredItem,filter) ) {
- addFilteredItemToMap(item,filteredItem);
- } else {//'quietly' remove the item
- removeServiceIdMapSendNoEvent(serviceID);
- return;
+ if(filteredItem != null) {//retry the filter
+ if( cache.sdm.filterPassFail(filteredItem,cache.filter) ) {
+ cache.addFilteredItemToMap(item,filteredItem);
+ } else {//'quietly' remove the item
+ cache.removeServiceIdMapSendNoEvent(serviceID, itemReg);
+ return;
+ }//endif
}//endif
- }//endif
- /* Either the filter was retried and passed, in which case,
- * the filtered itemCopy was placed in the map; or the
- * filter wasn't applied above (a non-null filteredItem
- * field in the itemReg in the map means that the filter
- * was applied at some previous time). In either case, the
- * service can now be "un-discarded", and a notification
- * that the service is now available can be sent.
- */
- ServiceItem itemToSend;
- synchronized(itemReg) {
+ /* Either the filter was retried and passed, in which case,
+ * the filtered itemCopy was placed in the map; or the
+ * filter wasn't applied above (a non-null filteredItem
+ * field in the itemReg in the map means that the filter
+ * was applied at some previous time). In either case, the
+ * service can now be "un-discarded", and a notification
+ * that the service is now available can be sent.
+ */
itemReg.setDiscarded(false);
itemToSend = itemReg.filteredItem;
}//end sync(itemReg)
- addServiceNotify(itemToSend);
+ cache.addServiceNotify(itemToSend);
}//endif
logger.finest("ServiceDiscoveryManager - "
@@ -1475,26 +1664,32 @@ public class ServiceDiscoveryManager {
* queue another filter attempt for later but
* send NO removed event
*/
- private final class NewOldServiceTask extends ServiceIdTask {
+ private static final class NewOldServiceTask extends ServiceIdTask {
private final ServiceItem srvcItem;
private final boolean matchMatchEvent;
- public NewOldServiceTask(ProxyReg reg,
+ private final LookupCacheImpl cache;
+ public NewOldServiceTask(
+ ProxyReg reg,
ServiceItem item,
boolean matchMatchEvent,
- long seqN)
+ long seqN,
+ LookupCacheImpl cache)
{
super(item.serviceID, reg, seqN);
this.srvcItem = item;
this.matchMatchEvent = matchMatchEvent;
+ this.cache = cache;
}//end constructor
public void run() {
logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
+"started");
boolean previouslyDiscovered = false;
- ServiceItemReg itemReg = serviceIdMap.get(thisTaskSid);
+
+ ServiceItemReg itemReg = null;
+ itemReg = cache.serviceIdMap.get(thisTaskSid);
if (itemReg == null) {
- if( !eventRegMap.containsKey(reg) ) {
+ if( !cache.eventRegMap.containsKey(reg) ) {
/* reg must have been discarded, simply return */
logger.finest("ServiceDiscoveryManager - "
+"NewOldServiceTask completed");
@@ -1502,23 +1697,34 @@ public class ServiceDiscoveryManager {
}//endif
// else
itemReg = new ServiceItemReg( reg.getProxy(), srvcItem );
- ServiceItemReg existed = serviceIdMap.putIfAbsent( thisTaskSid, itemReg );
- if (existed != null) {
+ ServiceItemReg existed = cache.serviceIdMap.putIfAbsent(thisTaskSid, itemReg);
+ if (existed != null){
itemReg = existed;
- // Probably changed while we were stuffing around.
previouslyDiscovered = true;
}
} else {
previouslyDiscovered = true;
}
+
if(previouslyDiscovered) {//a. old, previously discovered item
- itemMatchMatchChange(reg.getProxy(), srvcItem,
+// if (itemReg.hasNoProxys()) { // In danger of removal
+// newItemReg = (ServiceItemReg) itemReg.clone();
+// if ( serviceIdMap.replace(thisTaskSid, itemReg, newItemReg)){
+// itemReg = newItemReg;
+// }
+// }
+ // If it didn't get replaced the added sync is reentrant,
+ // otherwise we sync again in case it did get replaced.
+ synchronized (itemReg){
+ cache.itemMatchMatchChange(reg.getProxy(), srvcItem,
itemReg, matchMatchEvent);
+ }
+
} else {//b. newly discovered item
ServiceItem newFilteredItem =
- filterMaybeDiscard(srvcItem, reg.getProxy(),false);
+ cache.filterMaybeDiscard(srvcItem, false);
if(newFilteredItem != null) {
- addServiceNotify(newFilteredItem);
+ cache.addServiceNotify(newFilteredItem);
}//endif
}//endif
logger.finest("ServiceDiscoveryManager - NewOldServiceTask "
@@ -1557,15 +1763,14 @@ public class ServiceDiscoveryManager {
* service will not concurrently modify any state related to that
* service.
*/
- private final class UnmapProxyTask extends ServiceIdTask {
+ private static final class UnmapProxyTask extends ServiceIdTask {
private final ServiceItemReg itemReg;
- public UnmapProxyTask(ProxyReg reg,
- ServiceItemReg itemReg,
- ServiceID srvcId,
- long seqN)
+ private final LookupCacheImpl cache;
+ public UnmapProxyTask(ProxyReg reg, ServiceItemReg itemReg, ServiceID srvcId, long seqN, LookupCacheImpl cache)
{
super(srvcId, reg, seqN);
this.itemReg = itemReg;
+ this.cache = cache;
}//end constructor
public void run() {
@@ -1573,19 +1778,25 @@ public class ServiceDiscoveryManager {
+"started");
ServiceRegistrar proxy = null;
ServiceItem item;
+ boolean notify = false;
synchronized(itemReg) {
item = itemReg.removeProxy(reg.getProxy());//disassociate the LUS
if (item != null) {// new LUS chosen to track changes
proxy = itemReg.proxy;
- } else if( itemReg.hasNoProxys() ) {//no more LUSs, remove from map
+ } else if( itemReg.hasNoProxys()) {//no more LUSs, remove from map
item = itemReg.filteredItem;
+ boolean removed = false;
+ removed = (cache.removeServiceIdMapSendNoEvent(thisTaskSid, itemReg));
+ if (removed && !itemReg.isDiscarded()){
+ itemReg.setDiscarded(true);
+ notify = true;
+ }
}//endif
}//end sync(itemReg)
if(proxy != null) {
- itemMatchMatchChange(proxy, item, itemReg, false);
- } else if(item != null) {
- removeServiceIdMap(thisTaskSid,item);
+ cache.itemMatchMatchChange(proxy, item, itemReg, false);
}//endif
+ if (notify) cache.removeServiceNotify(item);
logger.finest("ServiceDiscoveryManager - UnmapProxyTask "
+"completed");
}//end run
@@ -1603,11 +1814,11 @@ public class ServiceDiscoveryManager {
private volatile RemoteEventListener lookupListenerProxy;
/** Task manager for the various tasks executed by this LookupCache */
private volatile ExecutorService cacheTaskMgr;
- private volatile DependencyLinker cacheDependencyLinker;
+ private volatile CacheTaskQueue taskQueue;
/* Flag that indicates if the LookupCache has been terminated. */
private volatile boolean bCacheTerminated = false;
/* Contains the ServiceDiscoveryListener's that receive local events */
- private final ArrayList<ServiceDiscoveryListener> sItemListeners = new ArrayList<ServiceDiscoveryListener>(1);
+ private final List<ServiceDiscoveryListener> sItemListeners = new ArrayList<ServiceDiscoveryListener>(1);
/* Map from ServiceID to ServiceItemReg */
private final ConcurrentMap<ServiceID,ServiceItemReg> serviceIdMap = new ConcurrentHashMap<ServiceID,ServiceItemReg>();
/* Map from ProxyReg to EventReg: (proxyReg, {source,id,seqNo,lease})*/
@@ -1636,10 +1847,14 @@ public class ServiceDiscoveryManager {
*/
private final AtomicLong taskSeqN = new AtomicLong();
- public LookupCacheImpl(ServiceTemplate tmpl,
+ private final ServiceDiscoveryManager sdm;
+
+ public LookupCacheImpl(
+ ServiceTemplate tmpl,
ServiceItemFilter filter,
ServiceDiscoveryListener sListener,
- long leaseDuration) throws RemoteException
+ long leaseDuration,
+ ServiceDiscoveryManager sdm) throws RemoteException
{
this.serviceDiscardFutures = RC.concurrentMap(new ConcurrentHashMap<Referrer<ServiceID>,Referrer<Future>>(), Ref.WEAK_IDENTITY, Ref.STRONG, 60000, 60000);
this.tmpl = copyServiceTemplate(tmpl);
@@ -1647,6 +1862,7 @@ public class ServiceDiscoveryManager {
this.filter = filter;
lookupListener = new LookupListener();
if(sListener != null ) sItemListeners.add(sListener);
+ this.sdm = sdm;
}//end constructor
// This method's javadoc is inherited from an interface of this class
@@ -1655,12 +1871,11 @@ public class ServiceDiscoveryManager {
if(bCacheTerminated) return;//allow for multiple terminations
bCacheTerminated = true;
}//end sync
- synchronized (caches){
- caches.remove(this);
+ synchronized (sdm.caches){
+ sdm.caches.remove(this);
}
/* Terminate all tasks: first, terminate this cache's TaskManager*/
cacheTaskMgr.shutdownNow();
- cacheTaskMgr = null;
/* Terminate ServiceDiscardTimerTasks running for this cache */
synchronized(serviceDiscardMutex) {
serviceDiscardTimerTaskMgr.shutdownNow();
@@ -1671,7 +1886,7 @@ public class ServiceDiscoveryManager {
while(iter.hasNext()) {
Map.Entry e = (Map.Entry)iter.next();
EventReg eReg = (EventReg)e.getValue();
- cancelLease(eReg.lease);
+ sdm.cancelLease(eReg.lease);
}//end loop
/* Un-export the remote listener for events from lookups. */
try {
@@ -1692,7 +1907,7 @@ public class ServiceDiscoveryManager {
if (ret.length == 0 ) return null;
// Maths.abs(Integer.MIN_VALUE) = -ve, so to avoid random
// hard to debug bugs, this has been changed.
- int rand = random.nextInt(ret.length);
+ int rand = sdm.random.nextInt(ret.length);
return ret[rand];
}//end LookupCacheImpl.lookup
@@ -1705,7 +1920,7 @@ public class ServiceDiscoveryManager {
ServiceItem[] sa = getServiceItems(myFilter);
int len = sa.length;
if (len == 0 ) return new ServiceItem[0];
- int rand = random.nextInt(Integer.MAX_VALUE) % len;
+ int rand = sdm.random.nextInt(Integer.MAX_VALUE) % len;
for(int i=0; i<len; i++) {
items.add(sa[(i+rand) % len ]);
if(items.size() == maxMatches)
@@ -1724,11 +1939,10 @@ public class ServiceDiscoveryManager {
* exists, and it's not already discarded, then queue a task
* to discard the given serviceReference.
*/
- boolean discardIt = false;
- Iterator iter = getServiceIdMapEntrySetIterator();
+ Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter = getServiceIdMapEntrySetIterator();
while(iter.hasNext()) {
- Map.Entry e = (Map.Entry)iter.next();
- ServiceItemReg itemReg = (ServiceItemReg)e.getValue();
+ Map.Entry<ServiceID,ServiceItemReg> e = iter.next();
+ ServiceItemReg itemReg = e.getValue();
ServiceItem filteredItem;
synchronized(itemReg) {
filteredItem = itemReg.filteredItem;
@@ -1736,22 +1950,19 @@ public class ServiceDiscoveryManager {
{
if( itemReg.isDiscarded() ) return;//already discarded
itemReg.setDiscarded(true);
- discardIt = true;
+ ServiceID sid = e.getKey();
+ Future f = serviceDiscardTimerTaskMgr.submit
+ ( new ServiceDiscardTimerTask(this, sid) );
+ serviceDiscardFutures.put(sid, f);
+ taskQueue.submit(new DiscardServiceTask(filteredItem));
+ return;
}//endif
}//end sync(itemReg)
- if(discardIt) {
- ServiceID sid = (ServiceID)e.getKey();
- Future f = serviceDiscardTimerTaskMgr.submit
- ( new ServiceDiscardTimerTask(sid) );
- serviceDiscardFutures.put(sid, f);
- cacheDependencyLinker.execute(new DiscardServiceTask(filteredItem));
- return;
- }//endif
}//end loop
}//end LookupCacheImpl.discard
/* Returns the iterator of entry set in the copy of the ServiceIdMap */
- private Iterator getServiceIdMapEntrySetIterator() {
+ private Iterator<Map.Entry<ServiceID,ServiceItemReg>> getServiceIdMapEntrySetIterator() {
return serviceIdMap.entrySet().iterator();
}//end LookupCacheImpl.getServiceIdMapEntrySetIterator
@@ -1826,7 +2037,8 @@ public class ServiceDiscoveryManager {
sItemListeners.add(listener);
}
ServiceItem[] items = getServiceItems(null);
- for(int i=0; i<items.length; i++) {
+ int l = items.length;
+ for(int i=0; i<l; i++) {
addServiceNotify(items[i],listener);
}//end loop
}//end LookupCacheImpl.addListener
@@ -1847,8 +2059,11 @@ public class ServiceDiscoveryManager {
*/
public void addProxyReg(ProxyReg reg) {
RegisterListenerTask treg =
- new RegisterListenerTask(reg, taskSeqN.getAndIncrement());
- cacheDependencyLinker.execute(treg);
+ new RegisterListenerTask(
+ reg,
+ taskSeqN.getAndIncrement(),
+ this);
+ taskQueue.submit(treg);
}//end LookupCacheImpl.addProxyReg
/** Remove a ProxyReg from the lookupCache. Called by DiscMgrListener's
@@ -1861,22 +2076,23 @@ public class ServiceDiscoveryManager {
EventReg eReg = eventRegMap.get(reg);
if(eReg != null) {
try {
- leaseRenewalMgr.remove(eReg.lease);
+ sdm.leaseRenewalMgr.remove(eReg.lease);
} catch(Exception e) {
logger.log(Level.FINER,
"exception occurred while removing an "
+"event registration lease", e);
}
}//endif
- t = new ProxyRegDropTask(reg, taskSeqN.getAndIncrement());
- cacheDependencyLinker.execute(t);
+ t = new ProxyRegDropTask(reg, eReg, taskSeqN.getAndIncrement(), this);
+ taskQueue.removeUselessTask(reg);
+ taskQueue.submit(t);
}//end LookupCacheImpl.removeProxyReg
/* Throws IllegalStateException if this lookup cache has been
* terminated
*/
private void checkCacheTerminated() {
- checkTerminated();
+ sdm.checkTerminated();
if(bCacheTerminated) {
throw new IllegalStateException
("this lookup cache was terminated");
@@ -1943,26 +2159,28 @@ public class ServiceDiscoveryManager {
break;
}//endif
}//end loop
- if(reg == null) return;//event arrived before eventReg in map
+ if(reg == null || eReg == null) return;//event arrived before eventReg in map
- /* Next, look for gaps in the event sequence. */
+ /* Next, look for any gaps in the event sequence. */
synchronized (eReg){
- long prevSeqNo = eReg.seqNo;
- eReg.seqNo = seqNo;
- CacheTask t;
- if(seqNo == (prevSeqNo+1)) {//no gap, handle current event
- t = new NotifyEventTask
- (reg, sid, item, transition, taskSeqN.getAndIncrement());
- if (eReg.lookupsPending > 0) {
- eReg.pending.add(t);
- return;
- }
- } else if (eReg.lookupsPending > 1) {
+ long delta = eReg.updateSeqNo(seqNo);
+ CacheTask t = null;
+ if(delta == 1) {//no gap, handle current event
+ t = new LookupCacheImpl.NotifyEventTask(
+ this,
+ reg,
+ sid,
+ item,
+ transition,
+ taskSeqN.getAndIncrement()
+ );
+ if (eReg.addIfLookupsPending(t)) return;
+ } else if (eReg.getLookupsPending() > 1) {
// gap in event sequence, but snapshot already pending
return;
} else {//gap in event sequence, request snapshot
- eReg.lookupsPending++;
- t = new LookupTask(reg, taskSeqN.getAndIncrement(), eReg);
+ eReg.incrementLookupsPending();
+ t = new LookupTask(reg, taskSeqN.getAndIncrement(), eReg, this);
if( logger.isLoggable(Levels.HANDLED) ) {
StringBuilder sb = new StringBuilder(300);
sb.append("notifyServiceMap - GAP in event sequence ")
@@ -1971,7 +2189,7 @@ public class ServiceDiscoveryManager {
.append("serviceID={2}], ")
.append("[eventSource={3}, ")
.append("eventID={4,number,#}, ")
- .append("oldSeqN={5,number,#}, ")
+ .append("delta={5,number,#}, ")
.append("newSeqN={6,number,#}]");
String msg = sb.toString();
Object[] params = new Object[] { reg !=null ? reg.getProxy() : "",
@@ -1979,24 +2197,18 @@ public class ServiceDiscoveryManager {
sid,
eventSource,
Long.valueOf(eventID),
- Long.valueOf(prevSeqNo),
+ Long.valueOf(delta),
Long.valueOf(seqNo) };
logger.log(Levels.HANDLED, msg, params);
}//endif
}//endif
- cacheDependencyLinker.execute(t);
+ if (t != null) taskQueue.submit(t);
} //end sync(eReg)
}//end LookupCacheImpl.notifyServiceMap
- /** Removes an entry from the serviceIdMap, and sends a notification.*/
- private void removeServiceIdMap(ServiceID sid, ServiceItem item) {
- removeServiceIdMapSendNoEvent(sid);
- removeServiceNotify(item);
- }//end LookupCacheImpl.removeServiceIdMap
-
/** Removes an entry in the serviceIdMap, but sends no notification. */
- private void removeServiceIdMapSendNoEvent(ServiceID sid) {
- serviceIdMap.remove(sid);
+ private boolean removeServiceIdMapSendNoEvent(ServiceID sid, ServiceItemReg itemReg) {
+ return serviceIdMap.remove(sid, itemReg);
}//end LookupCacheImpl.removeServiceIdMapSendNoEvent
/** Returns the element in the given items array having the given
@@ -2103,6 +2315,8 @@ public class ServiceDiscoveryManager {
ServiceItem oldItem;
ServiceItem oldFilteredItem;
boolean itemRegIsDiscarded;
+ boolean attrsChanged = false;
+ boolean versionChanged = false;
synchronized(itemReg) {
itemRegIsDiscarded = itemReg.isDiscarded();
if(!itemReg.addProxy(proxy, newItem)) { // not tracking
@@ -2116,34 +2330,31 @@ public class ServiceDiscoveryManager {
itemReg.filteredItem = null;//so filter will be retried
if(matchMatchEvent) return;
}//endif
- }//end sync(itemReg)
- /* For an explanation of the logic of the following if-else-block,
- * refer to the method description above.
- */
- boolean attrsChanged = false;
- boolean versionChanged = false;
- if( matchMatchEvent || sameVersion(newItem,oldItem) ) {
- if(itemRegIsDiscarded) return;
- /* Same version, determine if the attributes have changed.
- * But first, replace the new service proxy with the old
- * service proxy so the client always uses the old proxy
- * (at least, until the version is changed).
+
+ /* For an explanation of the logic of the following if-else-block,
+ * refer to the method description above.
*/
- synchronized(itemReg){
+
+ if( matchMatchEvent || sameVersion(newItem,oldItem) ) {
+ if(itemRegIsDiscarded) return;
+ /* Same version, determine if the attributes have changed.
+ * But first, replace the new service proxy with the old
+ * service proxy so the client always uses the old proxy
+ * (at least, until the version is changed).
+ */
newItem.service = oldItem.service;
/* Now compare attributes */
- attrsChanged =
- !LookupAttributes.equal(newItem.attributeSets,
- oldItem.attributeSets);
- }
-
- if(!attrsChanged) return;//no change, no need to filter
- } else {//(!matchMatchEvent && !same version) ==> re-registration
- versionChanged = true;
- }//endif
+ attrsChanged = !LookupAttributes.equal(newItem.attributeSets,
+ oldItem.attributeSets);
+
+ if(!attrsChanged) return;//no change, no need to filter
+ } else {//(!matchMatchEvent && !same version) ==> re-registration
+ versionChanged = true;
+ }//endif
+ }//end sync(itemReg)
/* Now apply the filter, and send events if appropriate */
ServiceItem newFilteredItem =
- filterMaybeDiscard(newItem, proxy, !itemRegIsDiscarded);
+ filterMaybeDiscard(newItem, !itemRegIsDiscarded);
if(newFilteredItem != null) {
/* Passed the filter, okay to send event(s). */
if(attrsChanged) changeServiceNotify(newFilteredItem,
@@ -2217,15 +2428,14 @@ public class ServiceDiscoveryManager {
ServiceItem item,
int action)
{
- ArrayList notifies;
+ List<ServiceDiscoveryListener> notifies;
synchronized(sItemListeners) {
if(sItemListeners.isEmpty()) return;
- notifies = (ArrayList)sItemListeners.clone();
+ notifies = new ArrayList<ServiceDiscoveryListener>(sItemListeners);
}
- Iterator iter = notifies.iterator();
+ Iterator<ServiceDiscoveryListener> iter = notifies.iterator();
while (iter.hasNext()) {
- ServiceDiscoveryListener sl
- = (ServiceDiscoveryListener)iter.next();
+ ServiceDiscoveryListener sl = iter.next();
serviceNotifyDo(oldItem,item,action,sl);
}//end loop
}//end LookupCacheImpl.serviceNotifyDo
@@ -2255,7 +2465,7 @@ public class ServiceDiscoveryManager {
new BasicILFactory(),
false, false);
lookupListenerExporter =
- (Exporter)thisConfig.getEntry(COMPONENT_NAME,
+ sdm.thisConfig.getEntry(COMPONENT_NAME,
"eventListenerExporter",
Exporter.class,
defaultExporter );
@@ -2272,7 +2482,7 @@ public class ServiceDiscoveryManager {
* various tasks executed by this instance of the lookup cache.
*/
try {
- cacheTaskMgr = thisConfig.getEntry(COMPONENT_NAME,
+ cacheTaskMgr = sdm.thisConfig.getEntry(COMPONENT_NAME,
"cacheExecutorService",
ExecutorService.class);
} catch(ConfigurationException e) { /* use default */
@@ -2282,7 +2492,7 @@ public class ServiceDiscoveryManager {
10, /* Ignored */
15,
TimeUnit.SECONDS,
- new PriorityBlockingQueue(100), /* Unbounded */
+ new LinkedBlockingQueue(), /* Unbounded */
new NamedThreadFactory(
"SDM lookup cache",
false
@@ -2290,7 +2500,26 @@ public class ServiceDiscoveryManager {
);
}
- cacheDependencyLinker = new DependencyLinker(cacheTaskMgr);
+ cacheTaskMgr = new ExtensibleExecutorService
+ (
+ cacheTaskMgr,
+ new RunnableFutureFactory(){
+
+ @Override
+ public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
+ if (r instanceof ObservableFutureTask) return (RunnableFuture<T>) r;
+ return new CacheTaskWrapper<T>(r, value);
+ }
+
+ @Override
+ public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
+ if (c instanceof ObservableFutureTask) return (RunnableFuture<T>) c;
+ return new CacheTaskWrapper<T>(c);
+ }
+ }
+ );
+
+ taskQueue = new CacheTaskQueue(cacheTaskMgr);
/* Get a special-purpose task manager for this cache from the
* configuration. That task manager will be used to manage the
* various instances of the special-purpose task, executed by
@@ -2299,7 +2528,7 @@ public class ServiceDiscoveryManager {
*/
try {
serviceDiscardTimerTaskMgr
- = thisConfig.getEntry(COMPONENT_NAME,
+ = sdm.thisConfig.getEntry(COMPONENT_NAME,
"discardExecutorService",
ExecutorService.class);
} catch(ConfigurationException e) { /* use default */
@@ -2319,7 +2548,7 @@ public class ServiceDiscoveryManager {
}
// Moved here from constructor to avoid publishing this reference
lookupListenerProxy = lookupListener.export();
- Iterator<ProxyReg> it = proxyRegSet.iterator();
+ Iterator<ProxyReg> it = sdm.proxyRegSet.iterator();
while (it.hasNext()){
addProxyReg(it.next());
}
@@ -2351,9 +2580,7 @@ public class ServiceDiscoveryManager {
* <code>ServiceItemReg</code> element of the
* <code>serviceIdMap</code> is left unchanged.
*/
- private ServiceItem filterMaybeDiscard(ServiceItem item,
- ServiceRegistrar proxy,
- boolean sendEvent)
+ private ServiceItem filterMaybeDiscard(ServiceItem item, boolean sendEvent)
{
if( (item == null) || (item.service == null) ) return null;
if(filter == null) {
@@ -2369,21 +2596,22 @@ public class ServiceDiscoveryManager {
if(!pass) {
ServiceID srvcID = item.serviceID;
ServiceItemReg itemReg = serviceIdMap.get(srvcID);
+ boolean notify = false;
+ boolean itemRegIsDiscarded = false;
+ ServiceItem oldFilteredItem = null;
if(itemReg != null) {
- if(sendEvent) {
- ServiceItem oldFilteredItem;
- synchronized(itemReg) {
+ synchronized (itemReg){
+ if(sendEvent) {
oldFilteredItem = itemReg.filteredItem;
- }//end sync(itemReg)
- removeServiceIdMap(srvcID, oldFilteredItem);
- } else {
- boolean itemRegIsDiscarded;
- synchronized(itemReg) {
- itemRegIsDiscarded = itemReg.isDiscarded();
- }//end sync(itemReg)
- removeServiceIdMapSendNoEvent(srvcID);
- if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
- }//endif
+ notify = removeServiceIdMapSendNoEvent(srvcID, itemReg);
+ itemReg.setDiscarded(notify);
+ } else {
+ itemRegIsDiscarded = itemReg.isDiscarded();
+ removeServiceIdMapSendNoEvent(srvcID, itemReg);
+ }//endif
+ }// end sync(itemReg)
+ if (notify) removeServiceNotify(oldFilteredItem);
+ if(itemRegIsDiscarded) cancelDiscardTask(srvcID);
}//endif
return null;
}//endif(fail)
@@ -2393,7 +2621,7 @@ public class ServiceDiscoveryManager {
return filteredItem;
}//endif(pass)
/* Handle filter indefinite */
- discardRetryLater(item, proxy, sendEvent);
+ discardRetryLater(item, sendEvent);
return null;
}//end LookupCacheImpl.filterMaybeDiscard
@@ -2432,9 +2660,7 @@ public class ServiceDiscoveryManager {
* contain a <code>ServiceItemReg</code> corresponding to the
* given <code>ServiceItem</code>, then this method simply returns.
*/
- private void discardRetryLater(ServiceItem item,
- ServiceRegistrar proxy,
- boolean sendEvent) {
+ private void discardRetryLater(ServiceItem item, boolean sendEvent) {
ServiceItemReg itemReg = serviceIdMap.get(item.serviceID);
if(itemReg == null) return;
ServiceItem oldFilteredItem;
@@ -2453,7 +2679,7 @@ public class ServiceDiscoveryManager {
itemReg.setDiscarded(true);
}//end sync(itemReg)
Future f = serviceDiscardTimerTaskMgr.submit
- ( new ServiceDiscardTimerTask(item.serviceID) );
+ ( new ServiceDiscardTimerTask(this, item.serviceID) );
serviceDiscardFutures.put(item.serviceID, f);
if(sendEvent) removeServiceNotify(oldFilteredItem);
}//end LookupCacheImpl.discardRetryLater
@@ -2464,34 +2690,35 @@ public class ServiceDiscoveryManager {
* and wakes up the <code>ServiceDiscardTimerTask</code> if the given
* <code>item</code> is discarded; otherwise, sends a removed event.
*/
- private void handleMatchNoMatch(ServiceRegistrar proxy,
- ServiceID srvcID,
- ServiceItem item)
+ private void handleMatchNoMatch(ServiceRegistrar proxy, ServiceID srvcID)
{
ServiceItemReg itemReg = serviceIdMap.get(srvcID);
if(itemReg != null) {
ServiceItem newItem;
- boolean itemRegHasNoProxys;
- boolean itemRegIsDiscarded;
ServiceItem filteredItem;
+ boolean notify = false;
+ ServiceRegistrar itemRegProxy = null;
synchronized(itemReg) {
newItem = itemReg.removeProxy(proxy);
- itemRegHasNoProxys = itemReg.hasNoProxys();
- itemRegIsDiscarded = itemReg.isDiscarded();
filteredItem = itemReg.filteredItem;
- }//end sync(itemReg)
- if(newItem != null) {
- itemMatchMatchChange(itemReg.proxy, newItem, itemReg,
- false);
- } else if(itemRegHasNoProxys) {
- if(itemRegIsDiscarded) {
- /* Remove item from map and wake up the discard task */
- removeServiceIdMapSendNoEvent(srvcID);
- cancelDiscardTask(srvcID);
- } else {//remove item from map and send removed event
- removeServiceIdMap(srvcID, filteredItem);
+ if(newItem != null) {
+ itemRegProxy = itemReg.proxy;
+ } else if(itemReg.hasNoProxys()) {
+ if(itemReg.isDiscarded()) {
+ /* Remove item from map and wake up the discard task */
+ removeServiceIdMapSendNoEvent(srvcID, itemReg);
+ cancelDiscardTask(srvcID);
+ } else {//remove item from map and send removed event
+ notify = removeServiceIdMapSendNoEvent(srvcID, itemReg);
+ itemReg.setDiscarded(notify);
+ }//endif
}//endif
- }//endif
+ }//end sync(itemReg)
+ if (itemRegProxy != null) {
+ itemMatchMatchChange(itemRegProxy, newItem, itemReg, false);
+ } else if (notify){
+ removeServiceNotify(filteredItem);
+ }
}//endif
}//end LookupCacheImpl.handleMatchNoMatch
@@ -2530,11 +2757,12 @@ public class ServiceDiscoveryManager {
private final List<LookupCache> caches;
/* Flag to indicate if the ServiceDiscoveryManager has been terminated. */
- private volatile boolean bTerminated = false;
+ private boolean bTerminated = false; //sync on this
+ private final Thread terminatorThread;
private final LookupCacheTerminator terminator;
-
- private volatile boolean started = false;
+ /* Flag to indicate LookupCacheTerminator has been started */
+ private boolean started = false; // sync on terminatorThread
/* Object used to obtain the configuration items for this utility. */
private final Configuration thisConfig;
/* Preparer for the proxies to the lookup services that are discovered
@@ -2899,6 +3127,8 @@ public class ServiceDiscoveryManager {
discMgrListener = new DiscMgrListener();
discMgr.addDiscoveryListener(discMgrListener);
terminator = new LookupCacheTerminator();
+ terminatorThread = new Thread(terminator, "SDM lookup cache terminator");
+ terminatorThread.setDaemon(false);
}
/** Sends discarded event to each listener waiting for discarded lookups.*/
@@ -3307,7 +3537,7 @@ public class ServiceDiscoveryManager {
discMgr.removeDiscoveryListener(discMgrListener);
if(discMgrInternal) discMgr.terminate();
}//end sync
- terminator.interrupt();
+ terminatorThread.interrupt();
/* Terminate all caches: cancel event leases, un-export listeners */
List<LookupCache> terminate;
synchronized (caches){
@@ -3695,9 +3925,13 @@ public class ServiceDiscoveryManager {
long leaseDuration)
throws RemoteException
{
- if (!started) terminator.start();
+ /* Atomic start of terminator */
+ synchronized (terminatorThread){
+ if (!started) terminatorThread.start();
+ started = true;
+ }
if(tmpl == null) tmpl = new ServiceTemplate(null, null, null);
- LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration);
+ LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration, this);
cache.initCache();
synchronized (caches){
caches.add(cache);
@@ -3841,7 +4075,7 @@ public class ServiceDiscoveryManager {
/** Throws an IllegalStateException if the current instance of the
* ServiceDiscoveryManager has been terminated.
*/
- private void checkTerminated() {
+ private synchronized void checkTerminated() {
if(bTerminated) {
throw new IllegalStateException
("service discovery manager was terminated");
Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/util/FutureObserver.java Sun Feb 2 12:31:09 2014
@@ -39,4 +39,21 @@ public interface FutureObserver<T> exten
public boolean addObserver(FutureObserver<T> observer);
}
+ public interface Subscriber<T> {
+
+ /**
+ * Prior to completion, if a Future sets off additional
+ * ObservableFuture tasks, these may be recommended for
+ * Subscribers to Observe.
+ *
+ * @param e
+ */
+ public void reccommendedViewing(ObservableFuture<T> e);
+ }
+
+ public interface Subscribeable<T>{
+
+ public void subscribe(Subscriber<T> subscriber);
+ }
+
}
Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1563596&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Sun Feb 2 12:31:09 2014
@@ -0,0 +1,47 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import org.apache.river.api.util.FutureObserver;
+
+/**
+ *
+ * @author peter
+ */
+public class DependencyLinker implements FutureObserver {
+ private final ExecutorService executor;
+ private final List<ObservableFuture> tasks;
+ private final FutureTask dependant;
+
+ public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, FutureTask dep) {
+ executor = ex;
+ this.tasks = new ArrayList<ObservableFuture>(tasks);
+ dependant = dep;
+ }
+
+ public synchronized void register() {
+ Iterator<ObservableFuture> it = tasks.iterator();
+ while (it.hasNext()) {
+ it.next().addObserver(this);
+ }
+ }
+
+ @Override
+ public synchronized void futureCompleted(Future e) {
+ tasks.remove(e);
+ if (tasks.isEmpty()) {
+ executor.submit(dependant);
+ }
+ }
+
+}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java?rev=1563596&r1=1563595&r2=1563596&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ObservableFutureTask.java Sun Feb 2 12:31:09 2014
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import org.apache.river.api.util.FutureObserver;
+import org.apache.river.api.util.FutureObserver.Subscribeable;
import org.apache.river.api.util.FutureObserver.ObservableFuture;
+import org.apache.river.api.util.FutureObserver.Subscriber;
/**
*
@@ -37,13 +39,20 @@ public class ObservableFutureTask<T> ext
public ObservableFutureTask(Callable<T> callable) {
super(callable);
listeners = new LinkedList<FutureObserver<T>>();
+ if (callable instanceof Subscribeable){
+ ((Subscribeable<T>) callable).subscribe(new FutureSubscriber(listeners));
+ }
}
public ObservableFutureTask(Runnable r, T result){
super(r,result);
listeners = new LinkedList<FutureObserver<T>>();
+ if (r instanceof Subscribeable){
+ ((Subscribeable<T>) r).subscribe(new FutureSubscriber(listeners));
+ }
}
+ @Override
protected void done() {
done = true;
synchronized (listeners){
@@ -65,4 +74,26 @@ public class ObservableFutureTask<T> ext
return listeners.add(l);
}
}
+
+ private static final class FutureSubscriber<T> implements Subscriber<T> {
+ /* Shared list of subscribers */
+ public final List<FutureObserver<T>> listeners;
+
+ FutureSubscriber(List<FutureObserver<T>> listeners){
+ this.listeners = listeners;
+ }
+ @Override
+ public void reccommendedViewing(ObservableFuture<T> e) {
+ synchronized (listeners){
+ Iterator<FutureObserver<T>> it = listeners.iterator();
+ while (it.hasNext()){
+ FutureObserver<T> future = it.next();
+ if (future instanceof Subscriber){
+ Subscriber<T> subscriber = (Subscriber<T>) future;
+ subscriber.reccommendedViewing(e);
+ }
+ }
+ }
+ }
+ }
}