You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/10/25 09:38:24 UTC

svn commit: r1634188 - in /manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch: AmazonCloudSearchConnector.java DocumentChunkManager.java DocumentRecord.java

Author: kwright
Date: Sat Oct 25 07:38:23 2014
New Revision: 1634188

URL: http://svn.apache.org/r1634188
Log:
Fix CONNECTORS-1077 for Amazon Cloud Search connector.

Modified:
    manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
    manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
    manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java

Modified: manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1634188&r1=1634187&r2=1634188&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Sat Oct 25 07:38:23 2014
@@ -53,6 +53,7 @@ import org.apache.manifoldcf.agents.inte
 import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
 import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
 import org.apache.manifoldcf.agents.interfaces.IOutputCheckActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputHistoryActivity;
 import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
 import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
 import org.apache.manifoldcf.agents.output.BaseOutputConnector;
@@ -70,6 +71,7 @@ import org.apache.manifoldcf.core.interf
 import org.apache.manifoldcf.core.interfaces.BinaryInput;
 import org.apache.manifoldcf.core.interfaces.TempFileInput;
 import org.apache.manifoldcf.core.interfaces.VersionContext;
+import org.apache.manifoldcf.core.common.DateParser;
 import org.apache.manifoldcf.agents.system.ManifoldCF;
 import org.apache.manifoldcf.agents.system.Logging;
 
@@ -113,6 +115,9 @@ public class AmazonCloudSearchConnector 
   /** cloudsearch field name for file body text. */
   private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
   
+  /** Field name we use for document's URI. */
+  private static final String DOCUMENT_URI_FIELDNAME = "document_URI";
+  
   /** Constructor.
    */
   public AmazonCloudSearchConnector(){
@@ -305,6 +310,7 @@ public class AmazonCloudSearchConnector 
   private final static Set<String> acceptableMimeTypes = new HashSet<String>();
   static
   {
+    // We presume input can be decoded using UTF-8, so we can accept only UTF-8 and others for which this also applies
     acceptableMimeTypes.add("text/plain;charset=utf-8");
     acceptableMimeTypes.add("text/plain;charset=ascii");
     acceptableMimeTypes.add("text/plain;charset=us-ascii");
@@ -321,6 +327,8 @@ public class AmazonCloudSearchConnector 
   public boolean checkMimeTypeIndexable(VersionContext outputDescription, String mimeType, IOutputCheckActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    if (mimeType == null)
+      return false;
     return acceptableMimeTypes.contains(mimeType.toLowerCase(Locale.ROOT));
   }
 
@@ -374,7 +382,7 @@ public class AmazonCloudSearchConnector 
       {
         for (int i = 0; i < elements.length; i++)
         {
-          elements[i] = new JSONStringReader(((Date)fieldValues[i]).toString());
+          elements[i] = new JSONStringReader(DateParser.formatISO8601Date((Date)fieldValues[i]));
         }
       }
       else if (fieldValues instanceof String[])
@@ -390,12 +398,16 @@ public class AmazonCloudSearchConnector 
       fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(fieldName),new JSONArrayReader(elements)));
     }
     
+    // Add in the original URI
+    fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(DOCUMENT_URI_FIELDNAME),
+      new JSONStringReader(documentURI)));
+
     // Add the primary content data in.
     fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(FILE_BODY_TEXT_FIELDNAME),
       new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
     
-    documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
-    conditionallyFlushDocuments();
+    documentChunkManager.recordDocument(uid, serverHost, serverPath, documentURI, INGEST_ACTIVITY, new Long(document.getBinaryLength()), new ReaderInputStream(objectReader, Consts.UTF_8));
+    conditionallyFlushDocuments(activities);
     return DOCUMENTSTATUS_ACCEPTED;
   }
   
@@ -423,32 +435,32 @@ public class AmazonCloudSearchConnector 
 
     try
     {
-      documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
+      documentChunkManager.recordDocument(uid, serverHost, serverPath, documentURI, REMOVE_ACTIVITY, null, new ReaderInputStream(objectReader, Consts.UTF_8));
     }
     catch (IOException e)
     {
       handleIOException(e);
     }
-    conditionallyFlushDocuments();
+    conditionallyFlushDocuments(activities);
   }
   
   @Override
   public void noteJobComplete(IOutputNotifyActivity activities)
       throws ManifoldCFException, ServiceInterruption {
     getSession();
-    flushDocuments();
+    flushDocuments(activities);
   }
   
   protected static final int CHUNK_SIZE = 1000;
 
-  protected void conditionallyFlushDocuments()
+  protected void conditionallyFlushDocuments(IOutputHistoryActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
     if (documentChunkManager.equalOrMoreThan(serverHost, serverPath, CHUNK_SIZE))
-      flushDocuments();
+      flushDocuments(activities);
   }
   
-  protected void flushDocuments()
+  protected void flushDocuments(IOutputHistoryActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
     Logging.ingest.info("AmazonCloudSearch: Starting flush to Amazon");
@@ -476,15 +488,43 @@ public class AmazonCloudSearchConnector 
         String status = getStatusFromJsonResponse(responsbody);
         if("success".equals(status))
         {
+          // Activity-log the individual documents we sent
+          for (DocumentRecord dr : records)
+          {
+            activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),"OK",null);
+          }
           Logging.ingest.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
           //remove documents from table..
           documentChunkManager.deleteChunk(records);
         }
         else
         {
-          Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
-          throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+          // Activity-log the individual documents that failed
+          for (DocumentRecord dr : records)
+          {
+            activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),"FAILED",responsbody);
+          }
+          Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": '"+ responsbody + "'");
+          throw new ManifoldCFException("Received error status from service after feeding document.  Response body: '" + responsbody +"'");
+        }
+      }
+      catch (ManifoldCFException e)
+      {
+        if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+          throw e;
+        for (DocumentRecord dr : records)
+        {
+          activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),e.getClass().getSimpleName().toUpperCase(Locale.ROOT),e.getMessage());
+        }
+        throw e;
+      }
+      catch (ServiceInterruption e)
+      {
+        for (DocumentRecord dr : records)
+        {
+          activities.recordActivity(null,dr.getActivity(),dr.getDataSize(),dr.getUri(),e.getClass().getSimpleName().toUpperCase(Locale.ROOT),e.getMessage());
         }
+        throw e;
       }
       finally
       {

Modified: manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1634188&r1=1634187&r2=1634188&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Sat Oct 25 07:38:23 2014
@@ -42,6 +42,9 @@ public class DocumentChunkManager extend
 {
   // Database fields
   private final static String UID_FIELD = "uid";                        // This is the document key, which is a dochash value
+  private final static String URI_FIELD = "documenturi";            // This is the document URI in plain text
+  private final static String ACTIVITY_FIELD = "activity";          //  This is a flag, for activity logging, describing whether this is an indexing operation or a delete
+  private final static String LENGTH_FIELD = "doclength";          // Binary length of original document
   private final static String HOST_FIELD = "serverhost";            // The host and path are there to make sure we don't collide between connections
   private final static String PATH_FIELD = "serverpath";
   private final static String SDF_DATA_FIELD = "sdfdata";
@@ -68,12 +71,24 @@ public class DocumentChunkManager extend
         map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
         map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
         map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+        map.put(URI_FIELD,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+        map.put(ACTIVITY_FIELD,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
+        map.put(LENGTH_FIELD,new ColumnDescription("BIGINT",false,true,null,null,false));
         map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
         performCreate(map,null);
       }
       else
       {
         // Upgrade code, if needed, goes here
+        if (existing.get(URI_FIELD) == null)
+        {
+          // Add the new columns
+          HashMap map = new HashMap();
+          map.put(URI_FIELD,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+          map.put(ACTIVITY_FIELD,new ColumnDescription("VARCHAR(255)",false,true,null,null,false));
+          map.put(LENGTH_FIELD,new ColumnDescription("BIGINT",false,true,null,null,false));
+          performAlter(map,null,null,null);
+        }
       }
 
       // Handle indexes, if needed
@@ -116,7 +131,7 @@ public class DocumentChunkManager extend
    * @param sdfData document SDF data.
    * @throws ManifoldCFException
    */
-  public void recordDocument(String uid, String host, String path, InputStream sdfData) 
+  public void recordDocument(String uid, String host, String path, String uri, String activity, Long length, InputStream sdfData) 
       throws ManifoldCFException, IOException
   {
     TempFileInput tfi = null;
@@ -154,6 +169,10 @@ public class DocumentChunkManager extend
             
             Map<String,Object> parameterMap = new HashMap<String,Object>();
             parameterMap.put(SDF_DATA_FIELD, tfi);
+            parameterMap.put(URI_FIELD, uri);
+            parameterMap.put(ACTIVITY_FIELD, activity);
+            if (length != null)
+              parameterMap.put(LENGTH_FIELD, length);
             
             //if record exists on table, update record.
             if(set.getRowCount() > 0)
@@ -250,6 +269,9 @@ public class DocumentChunkManager extend
       IResultRow row = set.getRow(i);
       rval[i] = new DocumentRecord(host,path,
         (String)row.getValue(UID_FIELD),
+        (String)row.getValue(URI_FIELD),
+        (String)row.getValue(ACTIVITY_FIELD),
+        (Long)row.getValue(LENGTH_FIELD),
         (BinaryInput)row.getValue(SDF_DATA_FIELD));
     }
     return rval;

Modified: manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1634188&r1=1634187&r2=1634188&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (original)
+++ manifoldcf/trunk/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Sat Oct 25 07:38:23 2014
@@ -27,13 +27,19 @@ public class DocumentRecord {
   protected final String host;
   protected final String path;
   protected final String uid;
+  protected final String uri;
+  protected final String activity;
+  protected final Long dataSize;
   protected final BinaryInput data;
   
-  public DocumentRecord(String host, String path, String uid, BinaryInput data)
+  public DocumentRecord(String host, String path, String uid, String uri, String activity, Long dataSize, BinaryInput data)
   {
     this.host = host;
     this.path = path;
     this.uid = uid;
+    this.uri = uri;
+    this.activity = activity;
+    this.dataSize = dataSize;
     this.data = data;
   }
 
@@ -52,6 +58,21 @@ public class DocumentRecord {
     return uid;
   }
   
+  public String getUri()
+  {
+    return uri;
+  }
+  
+  public String getActivity()
+  {
+    return activity;
+  }
+  
+  public Long getDataSize()
+  {
+    return dataSize;
+  }
+  
   public long getStreamLength()
     throws ManifoldCFException
   {