You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/02/18 11:43:00 UTC

svn commit: r1569259 - in /river/jtsk/skunk/qa_refactor/trunk: qa/src/com/sun/jini/test/impl/servicediscovery/event/ src/com/sun/jini/reggie/ src/net/jini/lookup/ src/org/apache/river/impl/thread/

Author: peter_firmstone
Date: Tue Feb 18 10:42:59 2014
New Revision: 1569259

URL: http://svn.apache.org/r1569259
Log:
Add some latency into ServiceDiscoveryManager's ExecutorService to simulate TaskManager's contention, sufficient to allow wiring up of dependencies between tasks.

Modified:
    river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java
    river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java

Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java Tue Feb 18 10:42:59 2014
@@ -32,6 +32,7 @@ import java.util.logging.Level;
 
 import com.sun.jini.qa.harness.QAConfig;
 import com.sun.jini.qa.harness.Test;
+import com.sun.jini.test.share.LookupServices;
 
 /**
  * This class verifies that bug 4712396 has been fixed. As stated in the
@@ -62,8 +63,8 @@ public class AddListenerEvent extends Ab
     protected SDMListener[] sdmListener = new SDMListener[nListeners];
 
     public static class SDMListener extends AbstractBaseTest.SrvcListener {
-        String testName;
-        int listenerIndx;
+        final String testName;
+        final int listenerIndx;
         public SDMListener(QAConfig config, String classname, int listenerIndx) {
             super(config,classname);
             this.testName = classname;
@@ -117,9 +118,10 @@ public class AddListenerEvent extends Ab
      */
     protected void applyTestDef() throws Exception {
 	/* Register new proxies */
-        int nServices = getLookupServices().getnServices();
-        int nAttributes = getLookupServices().getnAttributes();
-        int nSecsServiceDiscovery = getLookupServices().getnSecsServiceDiscovery();
+        LookupServices lookupServices = getLookupServices();
+        int nServices = lookupServices.getnServices();
+        int nAttributes = lookupServices.getnAttributes();
+        int nSecsServiceDiscovery = lookupServices.getnSecsServiceDiscovery();
 	registerServices(0,nServices,nAttributes,testServiceType);
 	/* Create a cache for the service that was registered; register
              * the first listener to receive service discovery events.

Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java Tue Feb 18 10:42:59 2014
@@ -68,10 +68,11 @@ public class ReRegisterGoodEquals extend
      */
     public Test construct(QAConfig config) throws Exception {
         super.construct(config);
-        testDesc = ""+getnLookupServices()+" lookup service(s), "+getnServices()
+        int nServices = getnServices();
+        testDesc = ""+getnLookupServices()+" lookup service(s), " + nServices
                        +" service(s) with well-defined equals() method";
-        nAddedExpected   = getnServices()*2;
-        nRemovedExpected = nAddedExpected-getnServices();
+        nAddedExpected   = nServices*2;
+        nRemovedExpected = nAddedExpected-nServices;
         testServiceType  = AbstractBaseTest.TEST_SERVICE;
         return this;
     }//end construct

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Tue Feb 18 10:42:59 2014
@@ -544,24 +544,6 @@ class RegistrarImpl implements Registrar
         unicastDiscoveryConstraints = init.unicastDiscoveryConstraints;
         context = init.context;
         eventNotifierExec = new SynchronousExecutors(init.scheduledExecutor);
-                
-//                new ExtensibleExecutorService(init.executor,
-//            new RunnableFutureFactory(){
-//
-//                @Override
-//                public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
-//                     if (r instanceof ObservableFutureTask) return (RunnableFuture<T>) r;
-//                     return new ObservableFutureTask(r, value);
-//                }
-//
-//                @Override
-//                public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
-//                    if (c instanceof ObservableFutureTask) return (RunnableFuture<T>) c;
-//                    return new ObservableFutureTask(c);
-//                }
-//
-//            });
-//        eventTaskQueue = new EventTaskQueue(eventNotifierExec);
         eventTaskMap = new TreeMap<EventReg,ExecutorService>();
         discoveryResponseExec = init.executor;
         ReliableLog log = null;
@@ -2049,109 +2031,7 @@ class RegistrarImpl implements Registrar
 	    reg = null;
 	}
     }
-    
-//    private static class DependencyLinker implements FutureObserver {
-//        private final ExecutorService executor;
-//        private final ObservableFuture precedent;
-//        private final FutureTask dependant;
-//
-//        public DependencyLinker(ExecutorService ex, ObservableFuture precedent, FutureTask dep) {
-//            executor = ex;
-//            this.precedent = precedent;
-//            dependant = dep;
-//        }
-//
-//        public synchronized void register() {
-//            precedent.addObserver(this);
-//        }
-//
-//        @Override
-//        public synchronized void futureCompleted(Future e) {
-//            if (precedent.equals(e)) executor.submit(dependant);
-//        }
-//
-//    }
-//     /**
-//     * 
-//     */
-//    static final class EventTaskWrapper extends ObservableFutureTask 
-//    {
-//        private final EventTask task;
-//
-//        private EventTaskWrapper(Runnable r, Object result) {
-//            super(r, result);
-//            if (r instanceof EventTask) task = (EventTask) r;
-//            else task = null;
-//        }
-//
-//        private EventTaskWrapper(Callable c)
-//        {
-//            super(c);
-//            task = null;
-//        }
-//        
-//        private EventTask getTask(){
-//            return task;
-//        }
-//    }
-//    
-//    private static class EventTaskWrapperComparator 
-//        implements Comparator<EventTaskWrapper> {
-//
-//        @Override
-//        public int compare(EventTaskWrapper o1, EventTaskWrapper o2) {
-//            EventTask t1 = o1.getTask();
-//            EventTask t2 = o2.getTask();
-//            if (t1.seqNo < t2.seqNo) return -1;
-//            if (t1.seqNo > t2.seqNo) return 1;
-//            return 0;
-//        }
-//        
-//    }
-//    
-//    private static final class EventTaskQueue implements FutureObserver {
-//        // CacheTasks pending completion.
-//        private final List<EventTaskWrapper> pending;
-//        private final ExecutorService executor;
-//        
-//        private EventTaskQueue(ExecutorService e){
-//            this.pending = new ArrayList<EventTaskWrapper>(200);
-//            executor = e;
-//        }
-//        
-//        private EventTaskWrapper submit(EventTask t){
-//            EventTaskWrapper future = new EventTaskWrapper(t, null);
-//            EventTaskWrapper precedent = null;
-//            synchronized (pending){
-//                pending.add(future);
-//                future.addObserver(this);
-//                Iterator<EventTaskWrapper> it = pending.iterator();
-//                while (it.hasNext()){
-//                    EventTaskWrapper w = it.next();
-//                    EventTask c = w.getTask();
-//                    if (t.dependsOn(c)) {
-//                        precedent = w;
-//                        break;
-//                    }
-//                }
-//            }
-//            if (precedent == null){
-//                executor.submit(future);
-//            } else {
-//                DependencyLinker linker = new DependencyLinker(executor, precedent, future);
-//                linker.register();
-//            }
-//            return future;
-//        }
-//
-//        @Override
-//        public void futureCompleted(Future e) {
-//            synchronized (pending){
-//                pending.remove(e);
-//            }
-//        }
-//    }
-//    
+
     /** An event to be sent, and the listener to send it to. */
     private static final class EventTask implements Callable<Boolean>, Comparable<EventTask> {
 

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Tue Feb 18 10:42:59 2014
@@ -984,6 +984,11 @@ public class ServiceDiscoveryManager {
             CacheTaskWrapper future = new CacheTaskWrapper(t, null);
             pending.offer(future);
             future.addObserver(this);
+            try {
+                Thread.sleep(100L); //Brief sleep to allow deps to find each other.
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // restore.
+            }
             if (t.hasDeps()) {
                 List<ObservableFuture> deps = new ArrayList<ObservableFuture>();
                 Iterator<CacheTaskWrapper> it = pending.iterator();

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Tue Feb 18 10:42:59 2014
@@ -31,9 +31,14 @@ public class DependencyLinker implements
     }
 
     public synchronized void register() {
-        Iterator<ObservableFuture> it = tasks.iterator();
-        while (it.hasNext()) {
-            it.next().addObserver(this);
+        for (int i = 0; i < tasks.size(); i++){
+            ObservableFuture f = null;
+            try {
+                f = tasks.get(i);
+            } catch (IndexOutOfBoundsException e){
+                continue;
+            }
+            if (f != null) f.addObserver(this);
         }
     }
 

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1569259&r1=1569258&r2=1569259&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Tue Feb 18 10:42:59 2014
@@ -440,17 +440,20 @@ public class SynchronousExecutors implem
                 reschedule = true;
                 throw e;
             } finally {
-                if (reschedule) {
-                    attempt++;
-                    queue.peek = null; // set peek to null to unblock queue.
-                    executorLock.lock();
-                    try {
-                        waiting.signalAll();
-                    } finally {
-                        executorLock.unlock();
+                try {
+                    if (reschedule) {
+                        attempt++;
+                        queue.peek = null; // set peek to null to unblock queue.
+                        executorLock.lock();
+                        try {
+                            waiting.signalAll();
+                        } finally {
+                            executorLock.unlock();
+                        }
                     }
+                } finally {
+                    queue.lock.unlock();
                 }
-                queue.lock.unlock();
             }
         }