You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/10/25 09:40:17 UTC
svn commit: r1634189 - in /manifoldcf/branches/dev_1x: ./
connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/
Author: kwright
Date: Sat Oct 25 07:40:17 2014
New Revision: 1634189
URL: http://svn.apache.org/r1634189
Log:
Pull up more changes for CONNECTORS-1077 from trunk
Modified:
manifoldcf/branches/dev_1x/ (props changed)
manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
Propchange: manifoldcf/branches/dev_1x/
------------------------------------------------------------------------------
Merged /manifoldcf/trunk:r1634188
Modified: manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1634189&r1=1634188&r2=1634189&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Sat Oct 25 07:40:17 2014
@@ -53,6 +53,7 @@ import org.apache.manifoldcf.agents.inte
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputCheckActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputHistoryActivity;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
@@ -70,6 +71,7 @@ import org.apache.manifoldcf.core.interf
import org.apache.manifoldcf.core.interfaces.BinaryInput;
import org.apache.manifoldcf.core.interfaces.TempFileInput;
import org.apache.manifoldcf.core.interfaces.VersionContext;
+import org.apache.manifoldcf.core.common.DateParser;
import org.apache.manifoldcf.agents.system.ManifoldCF;
import org.apache.manifoldcf.agents.system.Logging;
@@ -113,6 +115,9 @@ public class AmazonCloudSearchConnector
/** cloudsearch field name for file body text. */
private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
+ /** Field name we use for document's URI. */
+ private static final String DOCUMENT_URI_FIELDNAME = "document_URI";
+
/** Constructor.
*/
public AmazonCloudSearchConnector(){
@@ -305,6 +310,7 @@ public class AmazonCloudSearchConnector
private final static Set<String> acceptableMimeTypes = new HashSet<String>();
static
{
+ // We presume input can be decoded using UTF-8, so we can accept only UTF-8 and others for which this also applies
acceptableMimeTypes.add("text/plain;charset=utf-8");
acceptableMimeTypes.add("text/plain;charset=ascii");
acceptableMimeTypes.add("text/plain;charset=us-ascii");
@@ -321,6 +327,8 @@ public class AmazonCloudSearchConnector
public boolean checkMimeTypeIndexable(VersionContext outputDescription, String mimeType, IOutputCheckActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ if (mimeType == null)
+ return false;
return acceptableMimeTypes.contains(mimeType.toLowerCase(Locale.ROOT));
}
@@ -374,7 +382,7 @@ public class AmazonCloudSearchConnector
{
for (int i = 0; i < elements.length; i++)
{
- elements[i] = new JSONStringReader(((Date)fieldValues[i]).toString());
+ elements[i] = new JSONStringReader(DateParser.formatISO8601Date((Date)fieldValues[i]));
}
}
else if (fieldValues instanceof String[])
@@ -390,12 +398,16 @@ public class AmazonCloudSearchConnector
fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(fieldName),new JSONArrayReader(elements)));
}
+ // Add in the original URI
+ fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(DOCUMENT_URI_FIELDNAME),
+ new JSONStringReader(documentURI)));
+
// Add the primary content data in.
fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(FILE_BODY_TEXT_FIELDNAME),
new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
- documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
- conditionallyFlushDocuments();
+ documentChunkManager.recordDocument(uid, serverHost, serverPath, documentURI, INGEST_ACTIVITY, new Long(document.getBinaryLength()), new ReaderInputStream(objectReader, Consts.UTF_8));
+ conditionallyFlushDocuments(activities);
return DOCUMENTSTATUS_ACCEPTED;
}
@@ -423,32 +435,32 @@ public class AmazonCloudSearchConnector
try
{
- documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
+ documentChunkManager.recordDocument(uid, serverHost, serverPath, documentURI, REMOVE_ACTIVITY, null, new ReaderInputStream(objectReader, Consts.UTF_8));
}
catch (IOException e)
{
handleIOException(e);
}
- conditionallyFlushDocuments();
+ conditionallyFlushDocuments(activities);
}
@Override
public void noteJobComplete(IOutputNotifyActivity activities)
throws ManifoldCFException, ServiceInterruption {
getSession();
- flushDocuments();
+ flushDocuments(activities);
}
protected static final int CHUNK_SIZE = 1000;
- protected void conditionallyFlushDocuments()
+ protected void conditionallyFlushDocuments(IOutputHistoryActivity activities)
throws ManifoldCFException, ServiceInterruption
{
if (documentChunkManager.equalOrMoreThan(serverHost, serverPath, CHUNK_SIZE))
- flushDocuments();
+ flushDocuments(activities);
}
- protected void flushDocuments()
+ protected void flushDocuments(IOutputHistoryActivity activities)
throws ManifoldCFException, ServiceInterruption
{
Logging.ingest.info("AmazonCloudSearch: Starting flush to Amazon");
@@ -476,15 +488,43 @@ public class AmazonCloudSearchConnector
String status = getStatusFromJsonResponse(responsbody);
if("success".equals(status))
{
+ // Activity-log the individual documents we sent
+ for (DocumentRecord dr : records)
+ {
+ activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),"OK",null);
+ }
Logging.ingest.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
//remove documents from table..
documentChunkManager.deleteChunk(records);
}
else
{
- Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
- throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+ // Activity-log the individual documents that failed
+ for (DocumentRecord dr : records)
+ {
+ activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),"FAILED",responsbody);
+ }
+ Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": '"+ responsbody + "'");
+ throw new ManifoldCFException("Received error status from service after feeding document. Response body: '" + responsbody +"'");
+ }
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ throw e;
+ for (DocumentRecord dr : records)
+ {
+ activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),e.getClass().getSimpleName().toUpperCase(Locale.ROOT),e.getMessage());
+ }
+ throw e;
+ }
+ catch (ServiceInterruption e)
+ {
+ for (DocumentRecord dr : records)
+ {
+ activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),e.getClass().getSimpleName().toUpperCase(Locale.ROOT),e.getMessage());
}
+ throw e;
}
finally
{
Modified: manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1634189&r1=1634188&r2=1634189&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Sat Oct 25 07:40:17 2014
@@ -42,6 +42,9 @@ public class DocumentChunkManager extend
{
// Database fields
private final static String UID_FIELD = "uid"; // This is the document key, which is a dochash value
+ private final static String URI_FIELD = "documenturi"; // This is the document URI in plain text
+ private final static String ACTIVITY_FIELD = "activity"; // This is a flag, for activity logging, describing whether this is an indexing operation or a delete
+ private final static String LENGTH_FIELD = "doclength"; // Binary length of original document
private final static String HOST_FIELD = "serverhost"; // The host and path are there to make sure we don't collide between connections
private final static String PATH_FIELD = "serverpath";
private final static String SDF_DATA_FIELD = "sdfdata";
@@ -68,12 +71,24 @@ public class DocumentChunkManager extend
map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+ map.put(URI_FIELD,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(ACTIVITY_FIELD,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
+ map.put(LENGTH_FIELD,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
performCreate(map,null);
}
else
{
// Upgrade code, if needed, goes here
+ if (existing.get(URI_FIELD) == null)
+ {
+ // Add the new columns
+ HashMap map = new HashMap();
+ map.put(URI_FIELD,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(ACTIVITY_FIELD,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
+ map.put(LENGTH_FIELD,new ColumnDescription("BIGINT",false,true,null,null,false));
+ performAlter(map,null,null,null);
+ }
}
// Handle indexes, if needed
@@ -116,7 +131,7 @@ public class DocumentChunkManager extend
* @param sdfData document SDF data.
* @throws ManifoldCFException
*/
- public void recordDocument(String uid, String host, String path, InputStream sdfData)
+ public void recordDocument(String uid, String host, String path, String uri, String activity, Long length, InputStream sdfData)
throws ManifoldCFException, IOException
{
TempFileInput tfi = null;
@@ -154,6 +169,10 @@ public class DocumentChunkManager extend
Map<String,Object> parameterMap = new HashMap<String,Object>();
parameterMap.put(SDF_DATA_FIELD, tfi);
+ parameterMap.put(URI_FIELD, uri);
+ parameterMap.put(ACTIVITY_FIELD, activity);
+ if (length != null)
+ parameterMap.put(LENGTH_FIELD, length);
//if record exists on table, update record.
if(set.getRowCount() > 0)
@@ -250,6 +269,9 @@ public class DocumentChunkManager extend
IResultRow row = set.getRow(i);
rval[i] = new DocumentRecord(host,path,
(String)row.getValue(UID_FIELD),
+ (String)row.getValue(URI_FIELD),
+ (String)row.getValue(ACTIVITY_FIELD),
+ (Long)row.getValue(LENGTH_FIELD),
(BinaryInput)row.getValue(SDF_DATA_FIELD));
}
return rval;
Modified: manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1634189&r1=1634188&r2=1634189&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (original)
+++ manifoldcf/branches/dev_1x/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Sat Oct 25 07:40:17 2014
@@ -27,13 +27,19 @@ public class DocumentRecord {
protected final String host;
protected final String path;
protected final String uid;
+ protected final String uri;
+ protected final String activity;
+ protected final Long dataSize;
protected final BinaryInput data;
- public DocumentRecord(String host, String path, String uid, BinaryInput data)
+ public DocumentRecord(String host, String path, String uid, String uri, String activity, Long dataSize, BinaryInput data)
{
this.host = host;
this.path = path;
this.uid = uid;
+ this.uri = uri;
+ this.activity = activity;
+ this.dataSize = dataSize;
this.data = data;
}
@@ -52,6 +58,21 @@ public class DocumentRecord {
return uid;
}
+ public String getUri()
+ {
+ return uri;
+ }
+
+ public String getActivity()
+ {
+ return activity;
+ }
+
+ public Long getDataSize()
+ {
+ return dataSize;
+ }
+
public long getStreamLength()
throws ManifoldCFException
{