You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/02/03 12:56:33 UTC
svn commit: r1563851 - in /river/jtsk/skunk/qa_refactor/trunk/src:
net/jini/lookup/JoinManager.java net/jini/lookup/ServiceDiscoveryManager.java
org/apache/river/impl/thread/DependencyLinker.java
Author: peter_firmstone
Date: Mon Feb 3 11:56:32 2014
New Revision: 1563851
URL: http://svn.apache.org/r1563851
Log:
New dependency linking and callbacks for JoinManager ProxyRegTask. Same dependency code.
Modified:
river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Mon Feb 3 11:56:32 2014
@@ -53,9 +53,11 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.river.api.util.FutureObserver;
+import org.apache.river.impl.thread.DependencyLinker;
import org.apache.river.impl.thread.ExtensibleExecutorService;
import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
import org.apache.river.impl.thread.NamedThreadFactory;
@@ -688,22 +691,17 @@ public class JoinManager {
* @param tasks the tasks with which to compare the current task
* @param size elements with index less than size are considered
*/
-// public boolean runAfter(List tasks, int size) {
-// /* If the service's ID has already been set, then it's okay
-// * to run all ProxyRegTask's in parallel, otherwise, the
-// * ProxyRegTask with the lowest sequence number should be run.
-// */
-// if(serviceItem.serviceID != null) return false;
-// /* For task with lowest seq #, run it now; else run it later */
-// for(int i=0; i<size; i++) {
-// Object t = tasks.get(i);
-// if (t instanceof ProxyRegTask){
-// int nextTaskSeqN = ((ProxyRegTask)t).getSeqN();
-// if( seqN > nextTaskSeqN ) return true;
-// }
-// }//end loop
-// return false;
-// }//end runAfter
+ public boolean dependsOn(ProxyRegTask t) {
+ return seqN > t.getSeqN();
+ }
+
+ /* If the service's ID has already been set, then it's okay
+ * to run all ProxyRegTask's in parallel, otherwise, the
+ * ProxyRegTask with the lowest sequence number should be run.
+ */
+ public boolean hasDeps(){
+ return serviceItem.serviceID == null;
+ }
/** Accessor method that returns the instance of <code>ProxyReg</code>
* (the lookup service) associated with the task represented by
@@ -755,6 +753,46 @@ public class JoinManager {
return 0;
}
}//end class ProxyRegTask
+
+ private static final class ProxyRegTaskQueue implements FutureObserver {
+ // ProxyRegTasks pending completion.
+ private final ConcurrentLinkedQueue<ProxyRegTask> pending;
+ private final ExecutorService executor;
+
+ private ProxyRegTaskQueue(ExecutorService e){
+ this.pending = new ConcurrentLinkedQueue<ProxyRegTask>();
+ executor = e;
+ }
+
+ private Future submit(ProxyRegTask t){
+ pending.offer(t);
+ t.addObserver(this);
+ if (t.hasDeps()) {
+ List<ObservableFuture> deps = new LinkedList<ObservableFuture>();
+ Iterator<ProxyRegTask> it = pending.iterator();
+ while (it.hasNext()){
+ ProxyRegTask c = it.next();
+ if (t.dependsOn(c)) {
+ deps.add(c);
+ }
+ }
+ if (deps.isEmpty()){
+ executor.submit(t);
+ } else {
+ DependencyLinker linker = new DependencyLinker(executor, deps, t);
+ linker.register();
+ }
+ } else {
+ executor.submit(t);
+ }
+ return t;
+ }
+
+ @Override
+ public void futureCompleted(Future e) {
+ pending.remove(e);
+ }
+ }
/** Abstract base class from which all the sub-task classes are derived. */
private static abstract class JoinTask {
@@ -1234,7 +1272,7 @@ public class JoinManager {
if(this.proxyRegTask == null) {
this.proxyRegTask = new ProxyRegTask(this,taskSeqN++);
this.proxyRegTask.addObserver(this);
- future = taskMgr.submit(this.proxyRegTask);
+ future = proxyRegTaskQueue.submit(this.proxyRegTask);
}//endif
}//end sync(taskList)
synchronized (runningTasks){
@@ -1480,6 +1518,7 @@ public class JoinManager {
* <code>TaskManager</code>.
*/
private final ExecutorService taskMgr;
+ private final ProxyRegTaskQueue proxyRegTaskQueue;
/** Maximum number of times a failed task is allowed to be re-executed. */
private final int maxNRetries;
/** Wakeup manager for the various tasks executed by this join manager.
@@ -2580,7 +2619,7 @@ public class JoinManager {
MAX_N_TASKS, /* Ignored */
15,
TimeUnit.SECONDS,
- new PriorityBlockingQueue(100), /* Unbounded Queue */
+ new LinkedBlockingQueue(), /* Unbounded Queue */
new NamedThreadFactory("JoinManager executor thread", false)
);
}
@@ -2670,6 +2709,7 @@ public class JoinManager {
}
});
+ proxyRegTaskQueue = new ProxyRegTaskQueue(taskMgr);
wakeupMgr = conf.wakeupManager;
maxNRetries = conf.maxNretrys;
leaseRenewalMgr = conf.leaseRenewalManager;
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Mon Feb 3 11:56:32 2014
@@ -1022,7 +1022,7 @@ public class ServiceDiscoveryManager {
CacheTask t = w.getTask();
if(t.isFromProxy(reg)) {
w.cancel(true); // Also causes task to be removed
- }
+ }
}//end loop
}//end LookupCacheImpl.removeUselessTask
}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Mon Feb 3 11:56:32 2014
@@ -12,6 +12,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
import org.apache.river.api.util.FutureObserver;
/**
@@ -21,9 +22,9 @@ import org.apache.river.api.util.FutureO
public class DependencyLinker implements FutureObserver {
private final ExecutorService executor;
private final List<ObservableFuture> tasks;
- private final FutureTask dependant;
+ private final RunnableFuture dependant;
- public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, FutureTask dep) {
+ public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, RunnableFuture dep) {
executor = ex;
this.tasks = new ArrayList<ObservableFuture>(tasks);
dependant = dep;