You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/06/27 15:25:53 UTC

svn commit: r1497348 - /manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java

Author: kwright
Date: Thu Jun 27 13:25:53 2013
New Revision: 1497348

URL: http://svn.apache.org/r1497348
Log:
Hook up prereqs for authorities

Modified:
    manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java

Modified: manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java?rev=1497348&r1=1497347&r2=1497348&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java (original)
+++ manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java Thu Jun 27 13:25:53 2013
@@ -190,17 +190,24 @@ public class UserACLServlet extends Http
       IThreadContext itc = ThreadContextFactory.make();
       
       IMappingConnectionManager mappingConnManager = MappingConnectionManagerFactory.make(itc);
-      
+      IAuthorityConnectionManager authConnManager = AuthorityConnectionManagerFactory.make(itc);
+
       IMappingConnection[] mappingConnections = mappingConnManager.getAllConnections();
+      IAuthorityConnection[] connections = authConnManager.getAllConnections();
+      
       // One thread per connection, which is responsible for starting the mapping process when it is ready.
       MappingOrderThread[] mappingThreads = new MappingOrderThread[mappingConnections.length];
-      // Requests that exist but may not yet have been queued
+      // One thread per authority, which is responsible for starting the auth request when it is ready.
+      AuthOrderThread[] authThreads = new AuthOrderThread[connections.length];
+
+      // Create mapping requests
       Map<String,MappingRequest> mappingRequests = new HashMap<String,MappingRequest>();
-      
       for (int i = 0; i < mappingConnections.length; i++)
       {
         IMappingConnection thisConnection = mappingConnections[i];
         String identifyingString = thisConnection.getDescription();
+        if (identifyingString == null || identifyingString.length() == 0)
+          identifyingString = thisConnection.getName();
         
         // Create a record and add it to the queue
         MappingRequest mr = new MappingRequest(userRecord,
@@ -210,17 +217,42 @@ public class UserACLServlet extends Http
         mappingThreads[i] = new MappingOrderThread(mappingQueue, mappingRequests, thisConnection);
       }
       
+      // Create auth requests
+      Map<String,AuthRequest> authRequests = new HashMap<String,AuthRequest>();
+      for (int i = 0; i < connections.length; i++)
+      {
+        IAuthorityConnection thisConnection = connections[i];
+        String identifyingString = thisConnection.getDescription();
+        if (identifyingString == null || identifyingString.length() == 0)
+          identifyingString = thisConnection.getName();
+        
+        // Create a record and add it to the queue
+        AuthRequest ar = new AuthRequest(userRecord,
+          thisConnection.getClassName(),identifyingString,thisConnection.getConfigParams(),thisConnection.getMaxConnections());
+        
+        authRequests.put(thisConnection.getName(), ar);
+        authThreads[i] = new AuthOrderThread(queue, authRequests, thisConnection);
+      }
+      
       // Start the threads!
       for (int i = 0; i < mappingConnections.length; i++)
       {
         mappingThreads[i].start();
       }
+      for (int i = 0; i < connections.length; i++)
+      {
+        authThreads[i].start();
+      }
       
       // Wait for the threads to finish up.  This will guarantee that all mappers have been started.
       for (int i = 0;  i < mappingConnections.length; i++)
       {
         mappingThreads[i].finishUp();
       }
+      for (int i = 0;  i < connections.length; i++)
+      {
+        authThreads[i].finishUp();
+      }
       
       // Wait for everything to finish.
       for (MappingRequest mr : mappingRequests.values())
@@ -239,27 +271,6 @@ public class UserACLServlet extends Http
         }
       }
       
-      IAuthorityConnectionManager authConnManager = AuthorityConnectionManagerFactory.make(itc);
-
-      IAuthorityConnection[] connections = authConnManager.getAllConnections();
-      
-      AuthRequest[] requests = new AuthRequest[connections.length];
-
-      // Queue up all the requests
-      for (int i = 0; i < connections.length; i++)
-      {
-        IAuthorityConnection ac = connections[i];
-
-        String identifyingString = ac.getDescription();
-        if (identifyingString == null || identifyingString.length() == 0)
-          identifyingString = ac.getName();
-
-        AuthRequest ar = new AuthRequest(userRecord,ac.getClassName(),identifyingString,ac.getConfigParams(),ac.getMaxConnections());
-        queue.addRequest(ar);
-
-        requests[i] = ar;
-      }
-
       // Now, work through the returning answers.
 
       // Ask all the registered authorities for their ACLs, and merge the final list together.
@@ -272,7 +283,7 @@ public class UserACLServlet extends Http
         for (int i = 0; i < connections.length; i++)
         {
           IAuthorityConnection ac = connections[i];
-          AuthRequest ar = requests[i];
+          AuthRequest ar = authRequests.get(ac.getName());
 
           if (Logging.authorityService.isDebugEnabled())
             Logging.authorityService.debug("Waiting for answer from connector class '"+ac.getClassName()+"' for user '"+userRecord.toString()+"'");
@@ -427,5 +438,69 @@ public class UserACLServlet extends Http
     }
     
   }
+
+  /** This thread is responsible for making sure that the constraints for a given authority connection
+  * are met, and then when they are, firing off that connection's AuthRequest exactly once.  One of
+  * these threads is spun up for every IAuthorityConnection being handled.
+  * NOTE WELL: The number of threads this might require is worrisome.  It is essentially
+  * <number_of_app_server_threads> * <number_of_authorities>.  I will try later to see if I can find
+  * a way of limiting this to sane numbers.
+  */
+  protected static class AuthOrderThread extends Thread
+  {
+    protected final Map<String,AuthRequest> requests;
+    protected final RequestQueue<AuthRequest> authRequestQueue;
+    protected final IAuthorityConnection authConnection;
+    /** Failure captured by run(), if any; inspected by finishUp() after the join. */
+    protected Throwable exception = null;
+    
+    public AuthOrderThread(RequestQueue<AuthRequest> authRequestQueue,
+      Map<String, AuthRequest> requests,
+      IAuthorityConnection authConnection)
+    {
+      super();
+      this.authRequestQueue = authRequestQueue;
+      this.authConnection = authConnection;
+      this.requests = requests;
+      setName("Constraint matcher for authority "+authConnection.getName());
+      setDaemon(true);
+    }
+    /** Waits for each prerequisite request, then queues this connection's request once and exits so join() can return. */
+    public void run()
+    {
+      try
+      {
+        Set<String> prereqs = authConnection.getPrerequisites();
+        for (String x : prereqs)
+        {
+          AuthRequest mr = requests.get(x);
+          mr.waitForComplete();
+        }
+        // Constraints are met.  Fire off the request.
+        authRequestQueue.addRequest(requests.get(authConnection.getName()));
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+
+    /** Joins this thread, then rethrows any Error or RuntimeException that run() captured.
+    * Any other captured Throwable is not rethrown.
+    */
+    public void finishUp()
+      throws InterruptedException
+    {
+      join();
+      if (exception != null)
+      {
+        if (exception instanceof Error)
+          throw (Error)exception;
+        else if (exception instanceof RuntimeException)
+          throw (RuntimeException)exception;
+      }
+    }
+    
+  }
   
 }