You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/06/25 15:55:48 UTC
svn commit: r1496474 -
/manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java
Author: kwright
Date: Tue Jun 25 13:55:48 2013
New Revision: 1496474
URL: http://svn.apache.org/r1496474
Log:
Implement mapper invocation logic.
Modified:
manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java
Modified: manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java?rev=1496474&r1=1496473&r2=1496474&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java (original)
+++ manifoldcf/branches/CONNECTORS-703/framework/authority-servlet/src/main/java/org/apache/manifoldcf/authorityservlet/UserACLServlet.java Tue Jun 25 13:55:48 2013
@@ -24,6 +24,7 @@ import org.apache.manifoldcf.authorities
import org.apache.manifoldcf.authorities.system.Logging;
import org.apache.manifoldcf.authorities.system.RequestQueue;
import org.apache.manifoldcf.authorities.system.AuthRequest;
+import org.apache.manifoldcf.authorities.system.MappingRequest;
import java.io.*;
import java.util.*;
@@ -171,6 +172,13 @@ public class UserACLServlet extends Http
Logging.authorityService.debug("Received authority request for user '"+userRecord.toString()+"'");
}
+ RequestQueue<MappingRequest> mappingQueue = ManifoldCF.getMappingRequestQueue();
+ if (mappingQueue == null)
+ {
+ // System wasn't started; return unauthorized
+ throw new ManifoldCFException("System improperly initialized");
+ }
+
RequestQueue<AuthRequest> queue = ManifoldCF.getRequestQueue();
if (queue == null)
{
@@ -178,16 +186,67 @@ public class UserACLServlet extends Http
throw new ManifoldCFException("System improperly initialized");
}
+
IThreadContext itc = ThreadContextFactory.make();
+
+ IMappingConnectionManager mappingConnManager = MappingConnectionManagerFactory.make(itc);
+
+ IMappingConnection[] mappingConnections = mappingConnManager.getAllConnections();
+ // One thread per connection, which is responsible for starting the mapping process when it is ready.
+ MappingOrderThread[] mappingThreads = new MappingOrderThread[mappingConnections.length];
+ // Requests that exist but may not yet have been queued
+ Map<String,MappingRequest> mappingRequests = new HashMap<String,MappingRequest>();
+
+ for (int i = 0; i < mappingConnections.length; i++)
+ {
+ IMappingConnection thisConnection = mappingConnections[i];
+ String identifyingString = thisConnection.getDescription();
+
+ // Create a record and add it to the queue
+ MappingRequest mr = new MappingRequest(userRecord,
+ thisConnection.getClassName(),identifyingString,thisConnection.getConfigParams(),thisConnection.getMaxConnections());
+
+ mappingRequests.put(thisConnection.getName(), mr);
+ mappingThreads[i] = new MappingOrderThread(mappingQueue, mappingRequests, thisConnection);
+ }
+
+ // Start the threads!
+ for (int i = 0; i < mappingConnections.length; i++)
+ {
+ mappingThreads[i].start();
+ }
+
+ // Wait for the threads to finish up. This will guarantee that all mappers have been started.
+ for (int i = 0; i < mappingConnections.length; i++)
+ {
+ mappingThreads[i].finishUp();
+ }
+
+ // Wait for everything to finish.
+ for (MappingRequest mr : mappingRequests.values())
+ {
+ mr.waitForComplete();
+ }
+
+ // Handle all exceptions thrown during mapping. In general this just means logging them, because
+ // the downstream authorities will presumably not find what they are looking for and error out that way.
+ for (MappingRequest mr : mappingRequests.values())
+ {
+ Throwable exception = mr.getAnswerException();
+ if (exception != null)
+ {
+ Logging.authorityService.warn("Mapping exception logged from "+mr.getIdentifyingString()+": "+exception.getMessage()+"; mapper aborted", exception);
+ }
+ }
+
IAuthorityConnectionManager authConnManager = AuthorityConnectionManagerFactory.make(itc);
IAuthorityConnection[] connections = authConnManager.getAllConnections();
- int i = 0;
-
+
AuthRequest[] requests = new AuthRequest[connections.length];
// Queue up all the requests
- while (i < connections.length)
+ for (int i = 0; i < connections.length; i++)
{
IAuthorityConnection ac = connections[i];
@@ -198,11 +257,10 @@ public class UserACLServlet extends Http
AuthRequest ar = new AuthRequest(userRecord,ac.getClassName(),identifyingString,ac.getConfigParams(),ac.getMaxConnections());
queue.addRequest(ar);
- requests[i++] = ar;
+ requests[i] = ar;
}
// Now, work through the returning answers.
- i = 0;
// Ask all the registered authorities for their ACLs, and merge the final list together.
StringBuilder sb = new StringBuilder();
@@ -211,10 +269,10 @@ public class UserACLServlet extends Http
ServletOutputStream out = response.getOutputStream();
try
{
- while (i < connections.length)
+ for (int i = 0; i < connections.length; i++)
{
IAuthorityConnection ac = connections[i];
- AuthRequest ar = requests[i++];
+ AuthRequest ar = requests[i];
if (Logging.authorityService.isDebugEnabled())
Logging.authorityService.debug("Waiting for answer from connector class '"+ac.getClassName()+"' for user '"+userRecord.toString()+"'");
@@ -306,4 +364,68 @@ public class UserACLServlet extends Http
}
}
+ /** This thread is responsible for making sure that the constraints for a given mapping connection
+  * are met, and then when they are, firing off a MappingRequest. One of these threads is spun up
+  * for every IMappingConnection being handled.
+  * <p>
+  * The thread waits for the request of every prerequisite connection to complete, queues the
+  * request for its own connection exactly once, and then exits so that {@link #finishUp()} can
+  * join it. Any Throwable raised along the way is captured and surfaced by {@link #finishUp()}.
+  * <p>
+  * NOTE WELL: The number of threads this might require is worrisome. It is essentially
+  * {@code number_of_app_server_threads * number_of_mappers}. I will try later to see if I can find
+  * a way of limiting this to sane numbers.
+  */
+ protected static class MappingOrderThread extends Thread
+ {
+   /** All mapping requests for the current user, keyed by mapping connection name. */
+   protected final Map<String,MappingRequest> requests;
+   /** The queue onto which this thread places its request once prerequisites are complete. */
+   protected final RequestQueue<MappingRequest> mappingRequestQueue;
+   /** The mapping connection whose request this thread is responsible for queuing. */
+   protected final IMappingConnection mappingConnection;
+
+   /** Throwable captured by run(), if any. Written only by this thread and read by
+    * finishUp() after join(), so the join provides the necessary visibility. */
+   protected Throwable exception = null;
+
+   /** Constructor.
+    *@param mappingRequestQueue is the queue that receives the request when it is ready to run.
+    *@param requests is the map of connection name to request; it must already contain an entry
+    * for this connection and for each of its prerequisites before the thread is started.
+    *@param mappingConnection is the connection this thread orders.
+    */
+   public MappingOrderThread(RequestQueue<MappingRequest> mappingRequestQueue,
+     Map<String, MappingRequest> requests,
+     IMappingConnection mappingConnection)
+   {
+     super();
+     this.mappingRequestQueue = mappingRequestQueue;
+     this.mappingConnection = mappingConnection;
+     this.requests = requests;
+     setName("Constraint matcher for mapper "+mappingConnection.getName());
+     setDaemon(true);
+   }
+
+   /** Wait for all prerequisite requests to complete, then queue this connection's request.
+    * This runs exactly once; looping here would re-queue the same request forever and the
+    * thread would never terminate, blocking finishUp() indefinitely.
+    */
+   @Override
+   public void run()
+   {
+     try
+     {
+       Set<String> prereqs = mappingConnection.getPrerequisites();
+       for (String prereqName : prereqs)
+       {
+         MappingRequest prereqRequest = requests.get(prereqName);
+         if (prereqRequest == null)
+         {
+           // Fail with a diagnosable message rather than a bare NullPointerException.
+           throw new IllegalStateException("Mapper '"+mappingConnection.getName()+
+             "' has prerequisite '"+prereqName+"' for which no mapping request exists");
+         }
+         prereqRequest.waitForComplete();
+       }
+       // Constraints are met. Fire off the request.
+       mappingRequestQueue.addRequest(requests.get(mappingConnection.getName()));
+     }
+     catch (Throwable e)
+     {
+       // Remember the failure; finishUp() rethrows it on the joining thread.
+       exception = e;
+     }
+   }
+
+   /** Wait for this thread to finish and rethrow anything it failed with.
+    * Errors and RuntimeExceptions are rethrown as-is, an InterruptedException is rethrown
+    * under this method's own throws clause, and any other checked Throwable is wrapped in a
+    * RuntimeException with the cause preserved, so that a failure to queue the request is
+    * never silently dropped.
+    *@throws InterruptedException if the join is interrupted, or if the thread itself was
+    * interrupted while waiting.
+    */
+   public void finishUp()
+     throws InterruptedException
+   {
+     join();
+     if (exception == null)
+       return;
+     if (exception instanceof Error)
+       throw (Error)exception;
+     if (exception instanceof RuntimeException)
+       throw (RuntimeException)exception;
+     if (exception instanceof InterruptedException)
+       throw (InterruptedException)exception;
+     throw new RuntimeException("Mapping order thread '"+getName()+"' failed: "+exception.getMessage(), exception);
+   }
+
+ }
+
}