You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@manifoldcf.apache.org by ml...@apache.org on 2013/06/24 15:26:36 UTC
svn commit: r1496030 -
/manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
Author: mlizewski
Date: Mon Jun 24 13:26:35 2013
New Revision: 1496030
URL: http://svn.apache.org/r1496030
Log:
refactored processing documents in separate thread
Modified:
manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
Modified: manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java?rev=1496030&r1=1496029&r2=1496030&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-727/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java Mon Jun 24 13:26:35 2013
@@ -48,6 +48,7 @@ import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.auth.BasicScheme;
@@ -65,30 +66,30 @@ import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;
public class GenericConnector extends BaseRepositoryConnector {
-
+
public static final String _rcsid = "@(#)$Id: GenericConnector.java 994959 2010-09-08 10:04:42Z redguy $";
/**
* Deny access token for default authority
*/
private final static String defaultAuthorityDenyToken = "DEAD_AUTHORITY";
-
+
private final static String ACTION_PARAM_NAME = "action";
-
+
private final static String ACTION_CHECK = "check";
-
+
private final static String ACTION_SEED = "seed";
-
+
private final static String ACTION_ITEMS = "items";
-
+
private final static String ACTION_ITEM = "item";
-
+
private String genericLogin = null;
-
+
private String genericPassword = null;
-
+
private String genericEntryPoint = null;
-
+
private ConcurrentHashMap<String, Item> documentCache = new ConcurrentHashMap<String, Item>(10);
/**
@@ -96,12 +97,12 @@ public class GenericConnector extends Ba
*/
public GenericConnector() {
}
-
+
@Override
public int getMaxDocumentRequest() {
return 10;
}
-
+
@Override
public int getConnectorModel() {
return GenericConnector.MODEL_ADD_CHANGE;
@@ -136,7 +137,7 @@ public class GenericConnector extends Ba
} catch (ManifoldCFException ignore) {
}
}
-
+
protected DefaultHttpClient getClient() throws ManifoldCFException {
DefaultHttpClient cl = new DefaultHttpClient();
if (genericLogin != null && !genericLogin.isEmpty()) {
@@ -151,7 +152,7 @@ public class GenericConnector extends Ba
}
return cl;
}
-
+
@Override
public String check() throws ManifoldCFException {
HttpClient client = getClient();
@@ -168,15 +169,15 @@ public class GenericConnector extends Ba
return "Check exception: " + ex.getMessage();
}
}
-
+
@Override
public void addSeedDocuments(ISeedingActivity activities, DocumentSpecification spec,
long startTime, long endTime)
throws ManifoldCFException, ServiceInterruption {
-
+
HttpClient client = getClient();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
+
try {
StringBuilder url = new StringBuilder(genericEntryPoint);
url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_SEED);
@@ -219,7 +220,7 @@ public class GenericConnector extends Ba
throw new ManifoldCFException("addSeedDocuments error: " + ex.getMessage(), ex);
}
}
-
+
@Override
public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
DocumentSpecification spec, int jobType, boolean usesDefaultAuthority)
@@ -230,7 +231,7 @@ public class GenericConnector extends Ba
// Sort it,
java.util.Arrays.sort(acls);
String rights = java.util.Arrays.toString(acls);
-
+
String genericAuthMode = "provided";
for (int i = 0; i < spec.getChildCount(); i++) {
SpecificationNode sn = spec.getChild(i);
@@ -239,7 +240,7 @@ public class GenericConnector extends Ba
break;
}
}
-
+
HttpClient client = getClient();
StringBuilder url = new StringBuilder(genericEntryPoint);
try {
@@ -283,7 +284,7 @@ public class GenericConnector extends Ba
throw new ManifoldCFException("getDocumentVersions error: " + ex.getMessage(), ex);
}
}
-
+
@Override
public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities,
DocumentSpecification spec, boolean[] scanOnly, int jobType)
@@ -291,7 +292,7 @@ public class GenericConnector extends Ba
// Forced acls
String[] acls = getAcls(spec);
-
+
String genericAuthMode = "provided";
for (int i = 0; i < spec.getChildCount(); i++) {
SpecificationNode sn = spec.getChild(i);
@@ -300,7 +301,7 @@ public class GenericConnector extends Ba
break;
}
}
-
+
HttpClient client = getClient();
for (int i = 0; i < documentIdentifiers.length; i++) {
if (scanOnly[i]) {
@@ -310,7 +311,7 @@ public class GenericConnector extends Ba
if (item == null) {
throw new ManifoldCFException("processDocuments error - no cache entry for: " + documentIdentifiers[i]);
}
-
+
RepositoryDocument doc = new RepositoryDocument();
if (item.mimeType != null) {
doc.setMimeType(item.mimeType);
@@ -390,28 +391,35 @@ public class GenericConnector extends Ba
url.append("&").append(URLEncoder.encode(paramName, "UTF-8")).append("=").append(URLEncoder.encode(paramValue, "UTF-8"));
}
}
- HttpGet method = new HttpGet(url.toString());
- HttpResponse response = client.execute(method);
- try {
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new ManifoldCFException("processDocuments error - interface returned incorrect return code for: " + documentIdentifiers[i]);
+ // Fetch + ingest runs in a worker thread; any failure it records is rethrown here after join(), ingested under the framework's identifier.
+ ExecuteProcessThread ingestThread = new ExecuteProcessThread(client, activities, doc, item.url, url.toString(), documentIdentifiers[i], versions[i]);
+ ingestThread.start();
+ ingestThread.join();
+ if (ingestThread.getException() != null) {
+ Throwable thr = ingestThread.getException();
+ if (thr instanceof ManifoldCFException) {
+ if (((ManifoldCFException) thr).getErrorCode() == ManifoldCFException.INTERRUPTED) {
+ throw new InterruptedException(thr.getMessage());
+ }
+ throw (ManifoldCFException) thr;
+ } else if (thr instanceof ServiceInterruption) {
+ throw (ServiceInterruption) thr;
+ } else if (thr instanceof IOException) {
+ handleIOException((IOException) thr);
+ } else if (thr instanceof RuntimeException) {
+ throw (RuntimeException) thr;
}
-
- doc.setBinary(response.getEntity().getContent(), response.getEntity().getContentLength());
- activities.ingestDocument(documentIdentifiers[i], versions[i], item.url, doc);
- } finally {
- EntityUtils.consume(response.getEntity());
- method.releaseConnection();
+ throw new ManifoldCFException("processDocuments error: " + thr.getMessage(), thr);
}
+ } catch (InterruptedException ex) {
+ throw new ManifoldCFException("processDocuments interrupted: " + ex.getMessage(), ex, ManifoldCFException.INTERRUPTED);
} catch (UnsupportedEncodingException ex) {
throw new ManifoldCFException("processDocuments error - invalid chars in id: " + ex.getMessage(), ex);
- } catch (IOException ex) {
- handleIOException(ex);
}
}
}
}
-
+
@Override
public void releaseDocumentVersions(String[] documentIdentifiers, String[] versions) throws ManifoldCFException {
for (int i = 0; i < documentIdentifiers.length; i++) {
@@ -421,13 +429,13 @@ public class GenericConnector extends Ba
}
super.releaseDocumentVersions(documentIdentifiers, versions);
}
-
+
@Override
public void outputConfigurationHeader(IThreadContext threadContext, IHTTPOutput out,
Locale locale, ConfigParams parameters, List<String> tabsArray)
throws ManifoldCFException, IOException {
tabsArray.add(Messages.getString(locale, "generic.EntryPoint"));
-
+
out.print(
"<script type=\"text/javascript\">\n"
+ "<!--\n"
@@ -441,12 +449,12 @@ public class GenericConnector extends Ba
+ "//-->\n"
+ "</script>\n");
}
-
+
@Override
public void outputConfigurationBody(IThreadContext threadContext, IHTTPOutput out,
Locale locale, ConfigParams parameters, String tabName)
throws ManifoldCFException, IOException {
-
+
String server = getParam(parameters, "genericEntryPoint", "");
String login = getParam(parameters, "genericLogin", "");
String password = "";
@@ -454,7 +462,7 @@ public class GenericConnector extends Ba
password = ManifoldCF.deobfuscate(getParam(parameters, "genericPassword", ""));
} catch (ManifoldCFException ignore) {
}
-
+
if (tabName.equals(Messages.getString(locale, "generic.EntryPoint"))) {
out.print(
"<table class=\"displaytable\">\n"
@@ -478,15 +486,15 @@ public class GenericConnector extends Ba
out.print("<input type=\"hidden\" name=\"genericPassword\" value=\"" + Encoder.attributeEscape(password) + "\"/>\n");
}
}
-
+
@Override
public String processConfigurationPost(IThreadContext threadContext, IPostParameters variableContext,
Locale locale, ConfigParams parameters)
throws ManifoldCFException {
-
+
copyParam(variableContext, parameters, "genericLogin");
copyParam(variableContext, parameters, "genericEntryPoint");
-
+
String password = variableContext.getParameter("genericPassword");
if (password == null) {
password = "";
@@ -494,14 +502,14 @@ public class GenericConnector extends Ba
parameters.setParameter("genericPassword", ManifoldCF.obfuscate(password));
return null;
}
-
+
@Override
public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out,
Locale locale, ConfigParams parameters)
throws ManifoldCFException, IOException {
String login = getParam(parameters, "genericLogin", "");
String server = getParam(parameters, "genericEntryPoint", "");
-
+
out.print(
"<table class=\"displaytable\">\n"
+ " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n"
@@ -519,13 +527,13 @@ public class GenericConnector extends Ba
+ " </tr>\n"
+ "</table>\n");
}
-
+
@Override
public void outputSpecificationHeader(IHTTPOutput out, Locale locale, DocumentSpecification ds, List<String> tabsArray)
throws ManifoldCFException, IOException {
tabsArray.add(Messages.getString(locale, "generic.Parameters"));
tabsArray.add(Messages.getString(locale, "generic.Security"));
-
+
out.print(
"<script type=\"text/javascript\">\n"
+ "<!--\n"
@@ -559,26 +567,26 @@ public class GenericConnector extends Ba
+ "//-->\n"
+ "</script>\n");
}
-
+
@Override
public void outputSpecificationBody(IHTTPOutput out, Locale locale, DocumentSpecification ds, String tabName)
throws ManifoldCFException, IOException {
-
+
int k, i;
-
+
if (tabName.equals(Messages.getString(locale, "generic.Parameters"))) {
-
+
out.print("<table class=\"displaytable\">"
+ "<tr><td class=\"description\"><nobr>" + Messages.getBodyString(locale, "generic.ParametersColon") + "</nobr></td>"
+ "<td class=\"value\">");
-
+
out.print("<table class=\"formtable\">\n"
+ "<tr class=\"formheaderrow\">"
+ "<td class=\"formcolumnheader\"></td>"
+ "<td class=\"formcolumnheader\">" + Messages.getBodyString(locale, "generic.ParameterName") + "</td>"
+ "<td class=\"formcolumnheader\">" + Messages.getBodyString(locale, "generic.ParameterValue") + "</td>"
+ "</tr>");
-
+
i = 0;
k = 0;
while (i < ds.getChildCount()) {
@@ -661,7 +669,7 @@ public class GenericConnector extends Ba
out.print(
"<table class=\"displaytable\">\n"
+ " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n");
-
+
out.print(" <tr>\n"
+ " <td class=\"description\">" + Messages.getBodyString(locale, "generic.AuthMode") + "</td>\n"
+ " <td class=\"value\" >\n"
@@ -741,11 +749,11 @@ public class GenericConnector extends Ba
out.print("<input type=\"hidden\" name=\"genericAuthMode\" value=\"" + Encoder.attributeEscape(genericAuthMode) + "\"/>\n");
}
}
-
+
@Override
public String processSpecificationPost(IPostParameters variableContext, Locale locale, DocumentSpecification ds)
throws ManifoldCFException {
-
+
String xc = variableContext.getParameter("paramcount");
if (xc != null) {
// Delete all tokens first
@@ -758,7 +766,7 @@ public class GenericConnector extends Ba
i++;
}
}
-
+
int accessCount = Integer.parseInt(xc);
i = 0;
while (i < accessCount) {
@@ -779,7 +787,7 @@ public class GenericConnector extends Ba
ds.addChild(ds.getChildCount(), node);
i++;
}
-
+
String op = variableContext.getParameter("paramop");
if (op != null && op.equals("Add")) {
String paramName = variableContext.getParameter("specparamname");
@@ -790,7 +798,7 @@ public class GenericConnector extends Ba
ds.addChild(ds.getChildCount(), node);
}
}
-
+
String redmineAuthMode = variableContext.getParameter("genericAuthMode");
if (redmineAuthMode != null) {
// Delete existing seeds record first
@@ -807,7 +815,7 @@ public class GenericConnector extends Ba
cn.setValue(redmineAuthMode);
ds.addChild(ds.getChildCount(), cn);
}
-
+
xc = variableContext.getParameter("tokencount");
if (xc != null) {
// Delete all tokens first
@@ -820,7 +828,7 @@ public class GenericConnector extends Ba
i++;
}
}
-
+
int accessCount = Integer.parseInt(xc);
i = 0;
while (i < accessCount) {
@@ -839,7 +847,7 @@ public class GenericConnector extends Ba
ds.addChild(ds.getChildCount(), node);
i++;
}
-
+
String op = variableContext.getParameter("accessop");
if (op != null && op.equals("Add")) {
String accessspec = variableContext.getParameter("spectoken");
@@ -848,16 +856,16 @@ public class GenericConnector extends Ba
ds.addChild(ds.getChildCount(), node);
}
}
-
+
return null;
}
-
+
@Override
public void viewSpecification(IHTTPOutput out, Locale locale, DocumentSpecification ds)
throws ManifoldCFException, IOException {
boolean seenAny;
int i;
-
+
i = 0;
seenAny = false;
while (i < ds.getChildCount()) {
@@ -875,7 +883,7 @@ public class GenericConnector extends Ba
out.print(Encoder.bodyEscape(paramName) + " = " + Encoder.bodyEscape(paramValue) + "<br/>\n");
}
}
-
+
if (seenAny) {
out.print(
" </td>\n"
@@ -884,7 +892,7 @@ public class GenericConnector extends Ba
out.print(
" <tr><td class=\"message\" colspan=\"4\"><nobr>" + Messages.getBodyString(locale, "generic.NoParametersSpecified") + "</nobr></td></tr>\n");
}
-
+
out.print(
" <tr><td class=\"separator\" colspan=\"4\"><hr/></td></tr>\n");
@@ -905,7 +913,7 @@ public class GenericConnector extends Ba
out.print(Encoder.bodyEscape(token) + "<br/>\n");
}
}
-
+
if (seenAny) {
out.print(
" </td>\n"
@@ -917,11 +925,11 @@ public class GenericConnector extends Ba
out.print(
" <tr><td class=\"separator\" colspan=\"4\"><hr/></td></tr>\n");
}
-
+
private String getParam(ConfigParams parameters, String name, String def) {
return parameters.getParameter(name) != null ? parameters.getParameter(name) : def;
}
-
+
private boolean copyParam(IPostParameters variableContext, ConfigParams parameters, String name) {
String val = variableContext.getParameter(name);
if (val == null) {
@@ -930,7 +938,7 @@ public class GenericConnector extends Ba
parameters.setParameter(name, val);
return true;
}
-
+
protected static String[] getAcls(DocumentSpecification spec) {
HashMap map = new HashMap();
int i = 0;
@@ -941,7 +949,7 @@ public class GenericConnector extends Ba
map.put(token, token);
}
}
-
+
String[] rval = new String[map.size()];
Iterator iter = map.keySet().iterator();
i = 0;
@@ -950,7 +958,7 @@ public class GenericConnector extends Ba
}
return rval;
}
-
+
protected static void handleIOException(IOException e)
throws ManifoldCFException, ServiceInterruption {
if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
@@ -961,38 +969,38 @@ public class GenericConnector extends Ba
throw new ServiceInterruption("IO exception: " + e.getMessage(), e, currentTime + 300000L,
currentTime + 3 * 60 * 60000L, -1, false);
}
-
+
static class PreemptiveAuth implements HttpRequestInterceptor {
-
+
private Credentials credentials;
-
+
public PreemptiveAuth(Credentials creds) {
this.credentials = creds;
}
-
+
@Override
public void process(final HttpRequest request, final HttpContext context) throws HttpException, IOException {
request.addHeader(BasicScheme.authenticate(credentials, "US-ASCII", false));
}
}
-
+
protected static class CheckThread extends Thread {
-
+
protected HttpClient client;
-
+
protected String url;
-
+
protected Throwable exception = null;
-
+
protected String result = "Unknown";
-
+
public CheckThread(HttpClient client, String url) {
super();
setDaemon(true);
this.client = client;
this.url = url;
}
-
+
@Override
public void run() {
HttpGet method = new HttpGet(url);
@@ -1013,26 +1021,26 @@ public class GenericConnector extends Ba
exception = ex;
}
}
-
+
public Throwable getException() {
return exception;
}
-
+
public String getResult() {
return result;
}
}
-
+
protected static class ExecuteSeedingThread extends Thread {
-
+
protected HttpClient client;
-
+
protected String url;
-
+
protected ISeedingActivity activities;
-
+
protected Throwable exception = null;
-
+
public ExecuteSeedingThread(HttpClient client, ISeedingActivity activities, String url) {
super();
setDaemon(true);
@@ -1040,11 +1048,11 @@ public class GenericConnector extends Ba
this.url = url;
this.activities = activities;
}
-
+
@Override
public void run() {
HttpGet method = new HttpGet(url.toString());
-
+
try {
HttpResponse response = client.execute(method);
try {
@@ -1052,7 +1060,7 @@ public class GenericConnector extends Ba
exception = new ManifoldCFException("addSeedDocuments error - interface returned incorrect return code");
return;
}
-
+
try {
SAXParserFactory factory = SAXParserFactory.newInstance();
factory.setNamespaceAware(true);
@@ -1074,30 +1082,30 @@ public class GenericConnector extends Ba
exception = ex;
}
}
-
+
public Throwable getException() {
return exception;
}
}
-
+
protected static class DocumentVersionThread extends Thread {
-
+
protected HttpClient client;
-
+
protected String url;
-
+
protected Throwable exception = null;
-
+
protected String[] versions;
-
+
protected ConcurrentHashMap<String, Item> documentCache;
-
+
protected String[] documentIdentifiers;
-
+
protected String genericAuthMode;
-
+
protected String defaultRights;
-
+
public DocumentVersionThread(HttpClient client, String url, String[] documentIdentifiers, String genericAuthMode, String defaultRights, ConcurrentHashMap<String, Item> documentCache) {
super();
setDaemon(true);
@@ -1112,12 +1120,12 @@ public class GenericConnector extends Ba
versions[i] = null;
}
}
-
+
@Override
public void run() {
try {
HttpGet method = new HttpGet(url.toString());
-
+
HttpResponse response = client.execute(method);
try {
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
@@ -1153,24 +1161,84 @@ public class GenericConnector extends Ba
exception = ex;
}
}
-
+
public Throwable getException() {
return exception;
}
-
+
public String[] getVersions() {
return versions;
}
}
-
+
+ protected static class ExecuteProcessThread extends Thread {
+ // Fetches one document from sourceUrl and ingests it; any failure is stored in 'exception' for the joining caller to rethrow.
+ protected HttpClient client;
+ // URL the document is ingested under (passed to ingestDocument)
+ protected String url;
+ // URL the document content is fetched from
+ protected String sourceUrl;
+ protected IProcessActivity activities;
+ // First failure recorded by run(); stays null when the document was ingested
+ protected Throwable exception = null;
+ protected String id;
+ protected String version;
+ protected RepositoryDocument doc;
+
+ public ExecuteProcessThread(HttpClient client, IProcessActivity activities, RepositoryDocument doc, String url, String sourceUrl, String id, String version) {
+ super();
+ setDaemon(true);
+ this.client = client;
+ this.url = url;
+ this.sourceUrl = sourceUrl;
+ this.activities = activities;
+ this.id = id;
+ this.version = version;
+ this.doc = doc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ HttpGet method = new HttpGet(sourceUrl);
+ HttpResponse response = client.execute(method);
+ try {
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ exception = new ManifoldCFException("processDocuments error - interface returned incorrect return code for: " + id);
+ } else {
+ // Only a 200 response is ingested; an error body must not be indexed as document content.
+ doc.setBinary(response.getEntity().getContent(), response.getEntity().getContentLength());
+ activities.ingestDocument(id, version, url, doc);
+ }
+ } catch (Throwable ex) {
+ // Relay checked, runtime and Error alike so the caller never mistakes a dead thread for success.
+ exception = ex;
+ } finally {
+ try {
+ EntityUtils.consume(response.getEntity());
+ } finally {
+ method.releaseConnection();
+ }
+ }
+ } catch (Throwable ex) {
+ // execute() or cleanup failed; keep an earlier failure rather than masking it with the cleanup error.
+ if (exception == null) { exception = ex; }
+ }
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
+ }
+
static public class SAXSeedingHandler extends DefaultHandler {
-
+
protected ISeedingActivity activities;
-
+
public SAXSeedingHandler(ISeedingActivity activities) {
this.activities = activities;
}
-
+
@Override
public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
if ("seed".equals(localName) && attributes.getValue("id") != null) {
@@ -1182,56 +1250,56 @@ public class GenericConnector extends Ba
}
}
}
-
+
@XmlRootElement(name = "meta")
public static class Meta {
-
+
@XmlAttribute(name = "name")
String name;
-
+
@XmlValue
String value;
}
-
+
@XmlRootElement(name = "item")
public static class Item {
-
+
@XmlAttribute(name = "id", required = true)
String id;
-
+
@XmlElement(name = "url", required = true)
String url;
-
+
@XmlElement(name = "version", required = true)
String version;
-
+
@XmlElement(name = "content")
String content;
-
+
@XmlElement(name = "mimetype")
String mimeType;
-
+
@XmlElement(name = "created")
@XmlJavaTypeAdapter(DateAdapter.class)
Date created;
-
+
@XmlElement(name = "updated")
@XmlJavaTypeAdapter(DateAdapter.class)
Date updated;
-
+
@XmlElement(name = "filename")
String fileName;
-
+
@XmlElementWrapper(name = "metadata")
@XmlElements({
@XmlElement(name = "meta", type = Meta.class)})
List<Meta> metadata;
-
+
@XmlElementWrapper(name = "auth")
@XmlElements({
@XmlElement(name = "token", type = String.class)})
List<String> auth;
-
+
public String getVersionString() {
if (version == null) {
return "";
@@ -1245,10 +1313,10 @@ public class GenericConnector extends Ba
return sb.toString();
}
}
-
+
@XmlRootElement(name = "items")
public static class Items {
-
+
@XmlElements({
@XmlElement(name = "item", type = Item.class)})
List<Item> items;