You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/06/25 15:55:48 UTC

svn commit: r1496474 - /manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java

Author: kwright
Date: Tue Jun 25 13:55:48 2013
New Revision: 1496474

URL: http://svn.apache.org/r1496474
Log:
Implement mapper invocation logic.

Modified:
    manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java

Modified: manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java?rev=1496474&r1=1496473&r2=1496474&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java (original)
+++ manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java Tue Jun 25 13:55:48 2013
@@ -24,6 +24,7 @@ import org.apache.manifoldcf.authorities
 import org.apache.manifoldcf.authorities.system.Logging;
 import org.apache.manifoldcf.authorities.system.RequestQueue;
 import org.apache.manifoldcf.authorities.system.AuthRequest;
+import org.apache.manifoldcf.authorities.system.MappingRequest;
 
 import java.io.*;
 import java.util.*;
@@ -171,6 +172,13 @@ public class UserACLServlet extends Http
         Logging.authorityService.debug("Received authority request for user '"+userRecord.toString()+"'");
       }
 
+      RequestQueue<MappingRequest> mappingQueue = ManifoldCF.getMappingRequestQueue();
+      if (mappingQueue == null)
+      {
+        // System wasn't started; return unauthorized
+        throw new ManifoldCFException("System improperly initialized");
+      }
+
       RequestQueue<AuthRequest> queue = ManifoldCF.getRequestQueue();
       if (queue == null)
       {
@@ -178,16 +186,67 @@ public class UserACLServlet extends Http
         throw new ManifoldCFException("System improperly initialized");
       }
 
+      
       IThreadContext itc = ThreadContextFactory.make();
+      
+      IMappingConnectionManager mappingConnManager = MappingConnectionManagerFactory.make(itc);
+      
+      IMappingConnection[] mappingConnections = mappingConnManager.getAllConnections();
+      // One thread per connection, which is responsible for starting the mapping process when it is ready.
+      MappingOrderThread[] mappingThreads = new MappingOrderThread[mappingConnections.length];
+      // Requests that exist but may not yet have been queued
+      Map<String,MappingRequest> mappingRequests = new HashMap<String,MappingRequest>();
+      
+      for (int i = 0; i < mappingConnections.length; i++)
+      {
+        IMappingConnection thisConnection = mappingConnections[i];
+        String identifyingString = thisConnection.getDescription();
+        
+        // Create a record and add it to the queue
+        MappingRequest mr = new MappingRequest(userRecord,
+          thisConnection.getClassName(),identifyingString,thisConnection.getConfigParams(),thisConnection.getMaxConnections());
+        
+        mappingRequests.put(thisConnection.getName(), mr);
+        mappingThreads[i] = new MappingOrderThread(mappingQueue, mappingRequests, thisConnection);
+      }
+      
+      // Start the threads!
+      for (int i = 0; i < mappingConnections.length; i++)
+      {
+        mappingThreads[i].start();
+      }
+      
+      // Wait for the threads to finish up.  This will guarantee that all mappers have been started.
+      for (int i = 0;  i < mappingConnections.length; i++)
+      {
+        mappingThreads[i].finishUp();
+      }
+      
+      // Wait for everything to finish.
+      for (MappingRequest mr : mappingRequests.values())
+      {
+        mr.waitForComplete();
+      }
+      
+      // Handle all exceptions thrown during mapping.  In general this just means logging them, because
+      // the downstream authorities will presumably not find what they are looking for and error out that way.
+      for (MappingRequest mr : mappingRequests.values())
+      {
+        Throwable exception = mr.getAnswerException();
+        if (exception != null)
+        {
+          Logging.authorityService.warn("Mapping exception logged from "+mr.getIdentifyingString()+": "+exception.getMessage()+"; mapper aborted", exception);
+        }
+      }
+      
       IAuthorityConnectionManager authConnManager = AuthorityConnectionManagerFactory.make(itc);
 
       IAuthorityConnection[] connections = authConnManager.getAllConnections();
-      int i = 0;
-
+      
       AuthRequest[] requests = new AuthRequest[connections.length];
 
       // Queue up all the requests
-      while (i < connections.length)
+      for (int i = 0; i < connections.length; i++)
       {
         IAuthorityConnection ac = connections[i];
 
@@ -198,11 +257,10 @@ public class UserACLServlet extends Http
         AuthRequest ar = new AuthRequest(userRecord,ac.getClassName(),identifyingString,ac.getConfigParams(),ac.getMaxConnections());
         queue.addRequest(ar);
 
-        requests[i++] = ar;
+        requests[i] = ar;
       }
 
       // Now, work through the returning answers.
-      i = 0;
 
       // Ask all the registered authorities for their ACLs, and merge the final list together.
       StringBuilder sb = new StringBuilder();
@@ -211,10 +269,10 @@ public class UserACLServlet extends Http
       ServletOutputStream out = response.getOutputStream();
       try
       {
-        while (i < connections.length)
+        for (int i = 0; i < connections.length; i++)
         {
           IAuthorityConnection ac = connections[i];
-          AuthRequest ar = requests[i++];
+          AuthRequest ar = requests[i];
 
           if (Logging.authorityService.isDebugEnabled())
             Logging.authorityService.debug("Waiting for answer from connector class '"+ac.getClassName()+"' for user '"+userRecord.toString()+"'");
@@ -306,4 +364,68 @@ public class UserACLServlet extends Http
     }
   }
 
+  /** This thread waits until every prerequisite mapping request of a given mapping connection
+  * has completed, and then fires off that connection's own MappingRequest exactly once before
+  * exiting.  One of these threads is spun up for every IMappingConnection being handled.
+  * NOTE WELL: The number of threads this might require is worrisome.  It is essentially
+  * <number_of_app_server_threads> * <number_of_mappers>.  I will try later to see if I can find
+  * a way of limiting this to sane numbers.
+  */
+  protected static class MappingOrderThread extends Thread
+  {
+    protected final Map<String,MappingRequest> requests;
+    protected final RequestQueue<MappingRequest> mappingRequestQueue;
+    protected final IMappingConnection mappingConnection;
+    /** Throwable captured by run(), if any; examined by finishUp() after the join. */
+    protected Throwable exception = null;
+    /** Creates the thread; it is named after the mapping connection and marked as a daemon. */
+    public MappingOrderThread(RequestQueue<MappingRequest> mappingRequestQueue,
+      Map<String, MappingRequest> requests,
+      IMappingConnection mappingConnection)
+    {
+      super();
+      this.mappingRequestQueue = mappingRequestQueue;
+      this.mappingConnection = mappingConnection;
+      this.requests = requests;
+      setName("Constraint matcher for mapper "+mappingConnection.getName());
+      setDaemon(true);
+    }
+    /** Waits for each prerequisite request, then queues this connection's request a single time. */
+    public void run()
+    {
+      try
+      {
+        // Deliberately not a loop: queue the request once, then exit so that finishUp()'s join() returns.
+        Set<String> prereqs = mappingConnection.getPrerequisites();
+        for (String x : prereqs)
+        {
+          MappingRequest mr = requests.get(x);
+          mr.waitForComplete();
+        }
+        // Constraints are met.  Fire off the request.
+        mappingRequestQueue.addRequest(requests.get(mappingConnection.getName()));
+      }
+      catch (Throwable e)
+      {
+        exception = e;
+      }
+    }
+
+    /** Joins this thread, then rethrows an Error or RuntimeException captured by run().
+    * Any other captured Throwable is left in the exception field and is not rethrown.
+    */
+    public void finishUp()
+      throws InterruptedException
+    {
+      join();
+      if (exception != null)
+      {
+        if (exception instanceof Error)
+          throw (Error)exception;
+        else if (exception instanceof RuntimeException)
+          throw (RuntimeException)exception;
+      }
+    }
+  }
+  
 }