You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/02/03 12:56:33 UTC

svn commit: r1563851 - in /river/jtsk/skunk/qa_refactor/trunk/src: net/jini/lookup/JoinManager.java net/jini/lookup/ServiceDiscoveryManager.java org/apache/river/impl/thread/DependencyLinker.java

Author: peter_firmstone
Date: Mon Feb  3 11:56:32 2014
New Revision: 1563851

URL: http://svn.apache.org/r1563851
Log:
New dependency linking and callbacks for JoinManager ProxyRegTask.  Same dependency code.

Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Mon Feb  3 11:56:32 2014
@@ -53,9 +53,11 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.river.api.util.FutureObserver;
+import org.apache.river.impl.thread.DependencyLinker;
 import org.apache.river.impl.thread.ExtensibleExecutorService;
 import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
 import org.apache.river.impl.thread.NamedThreadFactory;
@@ -688,22 +691,17 @@ public class JoinManager {
          *  @param tasks the tasks with which to compare the current task
          *  @param size  elements with index less than size are considered
          */
-//        public boolean runAfter(List tasks, int size) {
-//            /* If the service's ID has already been set, then it's okay
-//             * to run all ProxyRegTask's in parallel, otherwise, the
-//             * ProxyRegTask with the lowest sequence number should be run.
-//             */
-//            if(serviceItem.serviceID != null)  return false;
-//            /* For task with lowest seq #, run it now; else run it later */
-//            for(int i=0; i<size; i++) {
-//                Object t = tasks.get(i);
-//                if (t instanceof ProxyRegTask){
-//                    int nextTaskSeqN = ((ProxyRegTask)t).getSeqN();
-//                    if( seqN > nextTaskSeqN )  return true;
-//                }
-//            }//end loop
-//            return false;
-//        }//end runAfter
+        public boolean dependsOn(ProxyRegTask t) {
+            return seqN > t.getSeqN();
+        }
+        
+        /* If the service's ID has already been set, then it's okay
+         * to run all ProxyRegTasks in parallel; otherwise, the
+         * ProxyRegTask with the lowest sequence number must run first.
+         */
+        public boolean hasDeps(){
+            return serviceItem.serviceID == null;
+        }
 
         /** Accessor method that returns the instance of <code>ProxyReg</code>
          *  (the lookup service) associated with the task represented by
@@ -755,6 +753,46 @@ public class JoinManager {
             return 0;
         }
     }//end class ProxyRegTask
+    
+        private static final class ProxyRegTaskQueue implements FutureObserver {
+        // ProxyRegTasks pending completion.
+        private final ConcurrentLinkedQueue<ProxyRegTask> pending;
+        private final ExecutorService executor;
+        
+        private ProxyRegTaskQueue(ExecutorService e){
+            this.pending = new ConcurrentLinkedQueue<ProxyRegTask>();
+            executor = e;
+        }
+        
+        private Future submit(ProxyRegTask t){
+            pending.offer(t);
+            t.addObserver(this);
+            if (t.hasDeps()) {
+                List<ObservableFuture> deps = new LinkedList<ObservableFuture>();
+                Iterator<ProxyRegTask> it = pending.iterator();
+                while (it.hasNext()){
+                    ProxyRegTask c = it.next();
+                    if (t.dependsOn(c)) {
+                        deps.add(c);
+                    }
+                }
+                if (deps.isEmpty()){
+                    executor.submit(t);
+                } else {
+                    DependencyLinker linker = new DependencyLinker(executor, deps, t);
+                    linker.register();
+                }
+            } else {
+                executor.submit(t);
+            }
+            return t;
+        }
+
+        @Override
+        public void futureCompleted(Future e) {
+            pending.remove(e);
+        }
+    }
 
     /** Abstract base class from which all the sub-task classes are derived. */
     private static abstract class JoinTask {
@@ -1234,7 +1272,7 @@ public class JoinManager {
                 if(this.proxyRegTask == null) {
                     this.proxyRegTask = new ProxyRegTask(this,taskSeqN++);
                     this.proxyRegTask.addObserver(this);
-                    future = taskMgr.submit(this.proxyRegTask);
+                    future = proxyRegTaskQueue.submit(this.proxyRegTask);
                 }//endif
             }//end sync(taskList)
             synchronized (runningTasks){
@@ -1480,6 +1518,7 @@ public class JoinManager {
      *  <code>TaskManager</code>.
      */
     private final ExecutorService taskMgr;
+    private final ProxyRegTaskQueue proxyRegTaskQueue;
     /** Maximum number of times a failed task is allowed to be re-executed. */
     private final int maxNRetries;
     /** Wakeup manager for the various tasks executed by this join manager.
@@ -2580,7 +2619,7 @@ public class JoinManager {
                 MAX_N_TASKS, /* Ignored */
                 15,
                 TimeUnit.SECONDS,
-                new PriorityBlockingQueue(100), /* Unbounded Queue */
+                new LinkedBlockingQueue(), /* Unbounded Queue */
                 new NamedThreadFactory("JoinManager executor thread", false)
             );
         }
@@ -2670,6 +2709,7 @@ public class JoinManager {
             }
             
         });
+        proxyRegTaskQueue = new ProxyRegTaskQueue(taskMgr);
         wakeupMgr = conf.wakeupManager;
         maxNRetries = conf.maxNretrys;
         leaseRenewalMgr = conf.leaseRenewalManager;

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Mon Feb  3 11:56:32 2014
@@ -1022,7 +1022,7 @@ public class ServiceDiscoveryManager {
                 CacheTask t = w.getTask();
                 if(t.isFromProxy(reg)) {
                     w.cancel(true); // Also causes task to be removed
-    }
+                }
             }//end loop
 	}//end LookupCacheImpl.removeUselessTask
     }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1563851&r1=1563850&r2=1563851&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Mon Feb  3 11:56:32 2014
@@ -12,6 +12,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
 import org.apache.river.api.util.FutureObserver;
 
 /**
@@ -21,9 +22,9 @@ import org.apache.river.api.util.FutureO
 public class DependencyLinker implements FutureObserver {
     private final ExecutorService executor;
     private final List<ObservableFuture> tasks;
-    private final FutureTask dependant;
+    private final RunnableFuture dependant;
 
-    public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, FutureTask dep) {
+    public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, RunnableFuture dep) {
         executor = ex;
         this.tasks = new ArrayList<ObservableFuture>(tasks);
         dependant = dep;