You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 02:01:11 UTC
svn commit: r1604051 - in
/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch:
AmazonCloudSearchConnector.java DocumentChunkManager.java DocumentRecord.java
Author: kwright
Date: Fri Jun 20 00:01:11 2014
New Revision: 1604051
URL: http://svn.apache.org/r1604051
Log:
Almost done; just have to write composite input stream class
Modified:
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Fri Jun 20 00:01:11 2014
@@ -32,6 +32,8 @@ import java.util.Locale;
import java.util.Set;
import java.util.HashSet;
+import org.apache.commons.io.input.ReaderInputStream;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
@@ -64,14 +66,6 @@ import org.apache.manifoldcf.core.interf
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
import org.apache.manifoldcf.core.system.ManifoldCF;
import org.apache.manifoldcf.crawler.system.Logging;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.AutoDetectParser;
-import org.apache.tika.sax.BodyContentHandler;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.SAXException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
@@ -82,6 +76,8 @@ import com.fasterxml.jackson.databind.Js
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.manifoldcf.core.jsongen.*;
+
public class AmazonCloudSearchConnector extends BaseOutputConnector {
/** Ingestion activity */
@@ -109,8 +105,12 @@ public class AmazonCloudSearchConnector
/** Local connection */
protected HttpPost poster = null;
+ // What we need for database keys
+ protected String serverHost = null;
+ protected String serverPath = null;
+
/** Document Chunk Manager */
- private DocumentChunkManager documentChunkManager;
+ private DocumentChunkManager documentChunkManager = null;
/** cloudsearch field name for file body text. */
private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
@@ -139,7 +139,7 @@ public class AmazonCloudSearchConnector
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
- DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+ DocumentChunkManager dcmanager = new DocumentChunkManager(mainDatabase);
dcmanager.install();
}
@@ -192,10 +192,9 @@ public class AmazonCloudSearchConnector
public void disconnect()
throws ManifoldCFException
{
- if (poster != null)
- {
- poster = null;
- }
+ serverHost = null;
+ serverPath = null;
+ poster = null;
super.disconnect();
}
@@ -212,10 +211,10 @@ public class AmazonCloudSearchConnector
documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
}
- String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
+ serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
if (serverHost == null)
throw new ManifoldCFException("Server host parameter required");
- String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+ serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
if (serverPath == null)
throw new ManifoldCFException("Server path parameter required");
String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -252,7 +251,7 @@ public class AmazonCloudSearchConnector
public String check() throws ManifoldCFException {
try {
getSession();
- String responsbody = postData("[]");
+ String responsbody = postData(new InputStreamReader(new StringReader("[]"),Consts.UTF_8));
String status = "";
try
{
@@ -401,98 +400,60 @@ public class AmazonCloudSearchConnector
SpecPacker sp = new SpecPacker(outputDescription);
- String jsondata = "";
- try {
- //build json..
- SDFModel model = new SDFModel();
- Document doc = model.new Document();
- doc.setType("add");
- doc.setId(ManifoldCF.hash(documentURI));
-
- HashMap fields = new HashMap();
- Metadata metadata = extractBinaryFile(document, fields);
-
- Iterator<String> itr = document.getFields();
- while(itr.hasNext())
+ String uid = ManifoldCF.hash(documentURI);
+
+ // Build a JSON generator
+ JSONObjectReader objectReader = new JSONObjectReader();
+ // Build the metadata field part
+ JSONObjectReader fieldReader = new JSONObjectReader();
+ // Add the type and ID
+ objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringValue("id"),new JSONStringValue(uid)))
+ .addNameValuePair(new JSONNameValueReader(new JSONStringValue("type"),new JSONStringValue("add")))
+ .addNameValuePair(new JSONNameValueReader(new JSONStringValue("fields"),fieldReader));
+
+ // Populate the fields...
+ Iterator<String> itr = document.getFields();
+ while (itr.hasNext())
+ {
+ String fieldName = itr.next();
+ Object[] fieldValues = document.getField(fieldName);
+ JSONReader[] elements = new JSONReader[fieldValues.length];
+ if (fieldValues instanceof Reader[])
{
- String fName = itr.next();
- Object[] value = document.getField(fName);
- String target = sp.getMapping(fName);
- if(target!=null)
+ for (int i = 0; i < elements.length; i++)
{
- fields.put(target, value);
- }
- else
- {
- if(sp.keepAllMetadata())
- {
- fields.put(fName, value);
- }
+ elements[i] = new JSONStringReader((Reader)fieldValues[i]);
}
}
-
- //metadata of binary files.
- String[] metaNames = metadata.names();
- for(String mName : metaNames){
- String value = metadata.get(mName);
- String target = sp.getMapping(mName);
- if(target!=null)
+ else if (fieldValues instanceof Date[])
+ {
+ for (int i = 0; i < elements.length; i++)
{
- fields.put(target, value);
+ elements[i] = new JSONStringReader(((Date)fieldValues[i]).toString());
}
- else
+ }
+ else if (fieldValues instanceof String[])
+ {
+ for (int i = 0; i < elements.length; i++)
{
- if(sp.keepAllMetadata())
- {
- fields.put(mName, value);
- }
+ elements[i] = new JSONStringReader((String)fieldValues[i]);
}
}
- doc.setFields(fields);
- model.addDocument(doc);
-
- //generate json data.
- jsondata = model.toJSON();
+ else
+ throw IllegalStateException("Unexpected metadata type: "+fieldValues.getClass().getName());
- documentChunkManager.addDocument(doc.getId(), false, jsondata);
- return DOCUMENTSTATUS_ACCEPTED;
- }
- catch (SAXException e) {
- // if document data could not be converted to JSON by jackson.
- Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
- } catch (JsonProcessingException e) {
- // if document data could not be converted to JSON by jackson.
- Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
- } catch (TikaException e) {
- // if document could not be parsed by tika.
- Logging.connectors.debug(e);
- return DOCUMENTSTATUS_REJECTED;
- } catch (IOException e) {
- // if document data could not be read when the document parsing by tika.
- Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
+ fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(fieldName),new JSONArrayReader(elements)));
}
- }
-
- private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
- throws IOException, SAXException, TikaException {
- //extract body text and metadata fields from binary file.
- InputStream is = document.getBinaryStream();
- Parser parser = new AutoDetectParser();
- ContentHandler handler = new BodyContentHandler();
- Metadata metadata = new Metadata();
- parser.parse(is, handler, metadata, new ParseContext());
- String bodyStr = handler.toString();
- if(bodyStr != null){
- bodyStr = handler.toString().replaceAll("\\n", "").replaceAll("\\t", "");
- fields.put(FILE_BODY_TEXT_FIELDNAME, bodyStr);
- }
- return metadata;
+ // Add the primary content data in.
+ fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(FILE_BODY_TEXT_FIELDNAME),
+ new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
+
+ documentChunkManager.addOrReplaceDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
+
+ return DOCUMENTSTATUS_ACCEPTED;
}
-
+
/** Remove a document using the connector.
* Note that the last outputDescription is included, since it may be necessary for the connector to use such information to know how to properly remove the document.
*@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
@@ -507,20 +468,15 @@ public class AmazonCloudSearchConnector
// Establish a session
getSession();
- String jsonData = "";
- try {
- SDFModel model = new SDFModel();
- SDFModel.Document doc = model.new Document();
- doc.setType("delete");
- doc.setId(ManifoldCF.hash(documentURI));
- model.addDocument(doc);
- jsonData = model.toJSON();
-
- documentChunkManager.addDocument(doc.getId(), false, jsonData);
-
- } catch (JsonProcessingException e) {
- throw new ManifoldCFException(e);
- }
+ String uid = ManifoldCF.hash(documentURI);
+
+ // Build a JSON generator
+ JSONObjectReader objectReader = new JSONObjectReader();
+ // Add the type and ID
+ objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringValue("id"),new JSONStringValue(uid)))
+ .addNameValuePair(new JSONNameValueReader(new JSONStringValue("type"),new JSONStringValue("delete")))
+
+ documentChunkManager.removeDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
}
@Override
@@ -528,32 +484,83 @@ public class AmazonCloudSearchConnector
throws ManifoldCFException, ServiceInterruption {
getSession();
- final int chunkNum = documentChunkManager.getDocumentChunkNum();
-
- for(int i = 0;i<chunkNum;i++)
+ // Repeat until we are empty of cached stuff
+ int chunkNumber = 0;
+ while (true)
{
- String chunk = documentChunkManager.buildDocumentChunk(String.valueOf(i), i);
-
- //post data..
- String responsbody = postData(chunk);
- // check status
- String status = getStatusFromJsonResponse(responsbody);
- if("success".equals(status))
+ DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, 1000);
+ try
{
- int n = i + 1;
- Logging.connectors.info("successfully send document chunk " + n + " of " + chunkNum);
+ if (records.length == 0)
+ break;
+ // The records consist of up to 1000 individual input streams, which must all be concatenated together into the post.
+ // To do that, we glue all of them together with a composite input stream object.
+ CompositeInputStream cis = new CompositeInputStream();
+ for (DocumentRecord dr : records)
+ {
+ cis.addStream(dr.getDataStream());
+ }
- //remove documents from table..
- documentChunkManager.removeDocumentsInChunk(String.valueOf(i));
+ //post data..
+ String responsbody = postData(cis);
+ // check status
+ String status = getStatusFromJsonResponse(responsbody);
+ if("success".equals(status))
+ {
+ Logging.connectors.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
+ //remove documents from table..
+ documentChunkManager.deleteChunk(records);
+ }
+ else
+ {
+ Logging.connectors.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responseBody);
+ throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+ }
}
- else
+ finally
{
- throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+ Throwable exception = null;
+ for (DocumentRecord dr : records)
+ {
+ try
+ {
+ dr.close();
+ }
+ catch (Throwable e)
+ {
+ exception = e;
+ }
+ }
+ if (exception != null)
+ {
+ if (exception instanceof ManifoldCFException)
+ throw (ManifoldCFException)exception;
+ else if (exception instanceof Error)
+ throw (Error)exception;
+ else if (exception instanceof RuntimeException)
+ throw (RuntimeException)exception;
+ else
+ throw new RuntimeException("Unknown exception class thrown: "+exception.getClass().getName()+": "+exception.getMessage(),e);
+ }
}
-
}
}
+ protected static class CompositeInputStream extends InputStream
+ {
+ // Placeholder for a stream that presents several added input streams as one; nothing is implemented yet.
+ public CompositeInputStream()
+ {
+ }
+
+ public void addStream(InputStream stream)
+ {
+ // MHL: not implemented yet - the stream argument is currently ignored.
+ }
+
+ // MHL: no read() override is present yet; per the commit log this class still has to be written.
+ }
+
/**
* Fill in a Server tab configuration parameter map for calling a Velocity
* template.
@@ -699,10 +706,10 @@ public class AmazonCloudSearchConnector
return null;
}
- private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
+ private String postData(InputStream jsonData) throws ServiceInterruption, ManifoldCFException {
CloseableHttpClient httpclient = HttpClients.createDefault();
try {
- poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
+ poster.setEntity(new InputStreamEntity(jsonData));
HttpResponse res = httpclient.execute(poster);
HttpEntity resEntity = res.getEntity();
Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Fri Jun 20 00:01:11 2014
@@ -44,7 +44,6 @@ public class DocumentChunkManager extend
private final static String UID_FIELD = "uid"; // This is the document key, which is a dochash value
private final static String HOST_FIELD = "serverhost"; // The host and path are there to make sure we don't collide between connections
private final static String PATH_FIELD = "serverpath";
- private final static String ON_DELETE_FIELD = "ondelete";
private final static String SDF_DATA_FIELD = "sdfdata";
public DocumentChunkManager(
@@ -69,7 +68,6 @@ public class DocumentChunkManager extend
map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
- map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
performCreate(map,null);
}
@@ -113,89 +111,12 @@ public class DocumentChunkManager extend
}
/**
- * Remove document information in table (and make a delete marker).
- * @param uid document uid
- */
- public void removeDocument(String uid, String host, String path)
- throws ManifoldCFException
- {
- while (true)
- {
- long sleepAmt = 0L;
- try
- {
- beginTransaction();
- try
- {
-
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(HOST_FIELD,host),
- new UnitaryClause(PATH_FIELD,path),
- new UnitaryClause(UID_FIELD,uid)});
-
- IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
-
- Map<String,String> parameterMap = new HashMap<String,String>();
- parameterMap.put(ON_DELETE_FIELD, "1");
- parameterMap.put(SDF_DATA_FIELD, null);
-
- //if record exists on table, update record.
- if(set.getRowCount() > 0)
- {
- performUpdate(parameterMap, " WHERE "+query, whereParameters, null);
- }
- else
- {
- parameterMap.put(UID_FIELD, uid);
- parameterMap.put(HOST_FIELD, host);
- parameterMap.put(PATH_FIELD, path);
- performInsert(parameterMap, null);
- }
-
- break;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (RuntimeException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
- catch (ManifoldCFException e)
- {
- // Look for deadlock and retry if so
- if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
- {
- sleepAmt = getSleepAmt();
- continue;
- }
- throw e;
- }
- }
- }
-
- /**
- * Add/replace document information in table.
+ * Record document information for later transmission to Amazon.
* @param uid documentuid
* @param sdfData document SDF data.
* @throws ManifoldCFException
*/
- public void addOrReplaceDocument(String uid, String host, String path, InputStream sdfData)
+ public void recordDocument(String uid, String host, String path, InputStream sdfData)
throws ManifoldCFException, IOException
{
TempFileInput tfi = null;
@@ -232,7 +153,6 @@ public class DocumentChunkManager extend
query+" FOR UPDATE",params,null,null);
Map<String,String> parameterMap = new HashMap<String,String>();
- parameterMap.put(ON_DELETE_FIELD, "0");
parameterMap.put(SDF_DATA_FIELD, tfi);
//if record exists on table, update record.
@@ -307,7 +227,6 @@ public class DocumentChunkManager extend
IResultRow row = set.getRow(i);
rval[i] = new DocumentRecord(host,path,
(String)row.getValue(UID_FIELD),
- ((String)row.getValue(ON_DELETE_FIELD)).equals("1"),
(BinaryInput)row.getValue(SDF_DATA_FIELD));
}
return rval;
Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Fri Jun 20 00:01:11 2014
@@ -27,15 +27,13 @@ public class DocumentRecord {
protected final String host;
protected final String path;
protected final String uid;
- protected final boolean delete;
protected final BinaryInput data;
- public DocumentRecord(String host, String path, String uid, boolean delete, BinaryInput data)
+ public DocumentRecord(String host, String path, String uid, BinaryInput data)
{
this.host = host;
this.path = path;
this.uid = uid;
- this.delete = delete;
this.data = data;
}
@@ -54,11 +52,6 @@ public class DocumentRecord {
return uid;
}
- public boolean getDelete()
- {
- return delete;
- }
-
public long getStreamLength()
throws ManifoldCFException
{