You are viewing a plain-text version of this content. The canonical link referred to here was not preserved in this plain-text copy.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 02:01:11 UTC

svn commit: r1604051 - in /manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch: AmazonCloudSearchConnector.java DocumentChunkManager.java DocumentRecord.java

Author: kwright
Date: Fri Jun 20 00:01:11 2014
New Revision: 1604051

URL: http://svn.apache.org/r1604051
Log:
Almost done; just have to write the composite input stream class.

Modified:
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java

Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Fri Jun 20 00:01:11 2014
@@ -32,6 +32,8 @@ import java.util.Locale;
 import java.util.Set;
 import java.util.HashSet;
 
+import org.apache.commons.io.input.ReaderInputStream;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.Consts;
 import org.apache.http.HttpEntity;
@@ -64,14 +66,6 @@ import org.apache.manifoldcf.core.interf
 import org.apache.manifoldcf.core.interfaces.SpecificationNode;
 import org.apache.manifoldcf.core.system.ManifoldCF;
 import org.apache.manifoldcf.crawler.system.Logging;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.AutoDetectParser;
-import org.apache.tika.sax.BodyContentHandler;
-import org.xml.sax.ContentHandler;
-import org.xml.sax.SAXException;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
@@ -82,6 +76,8 @@ import com.fasterxml.jackson.databind.Js
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import org.apache.manifoldcf.core.jsongen.*;
+
 public class AmazonCloudSearchConnector extends BaseOutputConnector {
 
   /** Ingestion activity */
@@ -109,8 +105,12 @@ public class AmazonCloudSearchConnector 
   /** Local connection */
   protected HttpPost poster = null;
   
+  // What we need for database keys
+  protected String serverHost = null;
+  protected String serverPath = null;
+  
   /** Document Chunk Manager */
-  private DocumentChunkManager documentChunkManager;
+  private DocumentChunkManager documentChunkManager = null;
   
   /** cloudsearch field name for file body text. */
   private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
@@ -139,7 +139,7 @@ public class AmazonCloudSearchConnector 
       ManifoldCF.getMasterDatabaseUsername(),
       ManifoldCF.getMasterDatabasePassword());
     
-    DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+    DocumentChunkManager dcmanager = new DocumentChunkManager(mainDatabase);
     dcmanager.install();
   }
 
@@ -192,10 +192,9 @@ public class AmazonCloudSearchConnector 
   public void disconnect()
     throws ManifoldCFException
   {
-    if (poster != null)
-    {
-      poster = null;
-    }
+    serverHost = null;
+    serverPath = null;
+    poster = null;
     super.disconnect();
   }
 
@@ -212,10 +211,10 @@ public class AmazonCloudSearchConnector 
       documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
     }
 
-    String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
+    serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
     if (serverHost == null)
       throw new ManifoldCFException("Server host parameter required");
-    String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+    serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
     if (serverPath == null)
       throw new ManifoldCFException("Server path parameter required");
     String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -252,7 +251,7 @@ public class AmazonCloudSearchConnector 
   public String check() throws ManifoldCFException {
     try {
       getSession();
-      String responsbody = postData("[]");
+      String responsbody = postData(new InputStreamReader(new StringReader("[]"),Consts.UTF_8));
       String status = "";
       try
       {
@@ -401,98 +400,60 @@ public class AmazonCloudSearchConnector 
     
     SpecPacker sp = new SpecPacker(outputDescription);
     
-    String jsondata = "";
-    try {
-      //build json..
-      SDFModel model = new SDFModel();
-      Document doc = model.new Document();
-      doc.setType("add");
-      doc.setId(ManifoldCF.hash(documentURI));
-      
-      HashMap fields = new HashMap();
-      Metadata metadata = extractBinaryFile(document, fields);
-      
-      Iterator<String> itr = document.getFields();
-      while(itr.hasNext())
+    String uid = ManifoldCF.hash(documentURI);
+
+    // Build a JSON generator
+    JSONObjectReader objectReader = new JSONObjectReader();
+    // Build the metadata field part
+    JSONObjectReader fieldReader = new JSONObjectReader();
+    // Add the type and ID
+    objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringValue("id"),new JSONStringValue(uid)))
+      .addNameValuePair(new JSONNameValueReader(new JSONStringValue("type"),new JSONStringValue("add")))
+      .addNameValuePair(new JSONNameValueReader(new JSONStringValue("fields"),fieldReader));
+    
+    // Populate the fields...
+    Iterator<String> itr = document.getFields();
+    while (itr.hasNext())
+    {
+      String fieldName = itr.next();
+      Object[] fieldValues = document.getField(fieldName);
+      JSONReader[] elements = new JSONReader[fieldValues.length];
+      if (fieldValues instanceof Reader[])
       {
-        String fName = itr.next();
-        Object[] value = document.getField(fName);
-        String target = sp.getMapping(fName);
-        if(target!=null)
+        for (int i = 0; i < elements.length; i++)
         {
-          fields.put(target, value);
-        }
-        else
-        {
-          if(sp.keepAllMetadata())
-          {
-            fields.put(fName, value);
-          }
+          elements[i] = new JSONStringReader((Reader)fieldValues[i]);
         }
       }
-      
-      //metadata of binary files.
-      String[] metaNames = metadata.names();
-      for(String mName : metaNames){
-        String value = metadata.get(mName);
-        String target = sp.getMapping(mName);
-        if(target!=null)
+      else if (fieldValues instanceof Date[])
+      {
+        for (int i = 0; i < elements.length; i++)
         {
-          fields.put(target, value);
+          elements[i] = new JSONStringReader(((Date)fieldValues[i]).toString());
         }
-        else
+      }
+      else if (fieldValues instanceof String[])
+      {
+        for (int i = 0; i < elements.length; i++)
         {
-          if(sp.keepAllMetadata())
-          {
-            fields.put(mName, value);
-          }
+          elements[i] = new JSONStringReader((String)fieldValues[i]);
         }
       }
-      doc.setFields(fields);
-      model.addDocument(doc);
-      
-      //generate json data.
-      jsondata = model.toJSON();
+      else
+        throw IllegalStateException("Unexpected metadata type: "+fieldValues.getClass().getName());
       
-      documentChunkManager.addDocument(doc.getId(), false, jsondata);
-      return DOCUMENTSTATUS_ACCEPTED;
-    } 
-    catch (SAXException e) {
-      // if document data could not be converted to JSON by jackson.
-      Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
-    } catch (JsonProcessingException e) {
-      // if document data could not be converted to JSON by jackson.
-      Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
-    } catch (TikaException e) {
-      // if document could not be parsed by tika.
-      Logging.connectors.debug(e);
-      return DOCUMENTSTATUS_REJECTED;
-    } catch (IOException e) {
-      // if document data could not be read when the document parsing by tika.
-      Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
+      fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(fieldName),new JSONArrayReader(elements)));
     }
-  }
-  
-  private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
-      throws IOException, SAXException, TikaException {
     
-    //extract body text and metadata fields from binary file.
-    InputStream is = document.getBinaryStream();
-    Parser parser = new AutoDetectParser();
-    ContentHandler handler = new BodyContentHandler();
-    Metadata metadata = new Metadata();
-    parser.parse(is, handler, metadata, new ParseContext());
-    String bodyStr = handler.toString();
-    if(bodyStr != null){
-      bodyStr = handler.toString().replaceAll("\\n", "").replaceAll("\\t", "");
-      fields.put(FILE_BODY_TEXT_FIELDNAME, bodyStr);
-    }
-    return metadata;
+    // Add the primary content data in.
+    fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(FILE_BODY_TEXT_FIELDNAME),
+      new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
+    
+    documentChunkManager.addOrReplaceDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
+    
+    return DOCUMENTSTATUS_ACCEPTED;
   }
-
+  
   /** Remove a document using the connector.
   * Note that the last outputDescription is included, since it may be necessary for the connector to use such information to know how to properly remove the document.
   *@param documentURI is the URI of the document.  The URI is presumed to be the unique identifier which the output data store will use to process
@@ -507,20 +468,15 @@ public class AmazonCloudSearchConnector 
     // Establish a session
     getSession();
     
-    String jsonData = "";
-    try {
-      SDFModel model = new SDFModel();
-      SDFModel.Document doc = model.new Document();
-      doc.setType("delete");
-      doc.setId(ManifoldCF.hash(documentURI));
-      model.addDocument(doc);
-      jsonData = model.toJSON();
-      
-      documentChunkManager.addDocument(doc.getId(), false, jsonData);
-      
-    } catch (JsonProcessingException e) {
-      throw new ManifoldCFException(e);
-    }
+    String uid = ManifoldCF.hash(documentURI);
+
+    // Build a JSON generator
+    JSONObjectReader objectReader = new JSONObjectReader();
+    // Add the type and ID
+    objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringValue("id"),new JSONStringValue(uid)))
+      .addNameValuePair(new JSONNameValueReader(new JSONStringValue("type"),new JSONStringValue("delete")))
+
+    documentChunkManager.removeDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
   }
   
   @Override
@@ -528,32 +484,83 @@ public class AmazonCloudSearchConnector 
       throws ManifoldCFException, ServiceInterruption {
     getSession();
     
-    final int chunkNum = documentChunkManager.getDocumentChunkNum();
-    
-    for(int i = 0;i<chunkNum;i++)
+    // Repeat until we are empty of cached stuff
+    int chunkNumber = 0;
+    while (true)
     {
-      String chunk = documentChunkManager.buildDocumentChunk(String.valueOf(i), i);
-      
-      //post data..
-      String responsbody = postData(chunk);
-      // check status
-      String status = getStatusFromJsonResponse(responsbody);
-      if("success".equals(status))
+      DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, 1000);
+      try
       {
-        int n = i + 1;
-        Logging.connectors.info("successfully send document chunk " + n + " of " + chunkNum);
+        if (records.length == 0)
+          break;
+        // The records consist of up to 1000 individual input streams, which must be all concatenated together into the post
+        // To do that, we've got a composite input stream object we glue all these together with.
+        CompositeInputStream cis = new CompositeInputStream();
+        for (DocumentRecord dr : records)
+        {
+          cis.addStream(dr.getDataStream());
+        }
         
-        //remove documents from table..
-        documentChunkManager.removeDocumentsInChunk(String.valueOf(i));
+        //post data..
+        String responsbody = postData(cis);
+        // check status
+        String status = getStatusFromJsonResponse(responsbody);
+        if("success".equals(status))
+        {
+          Logging.connectors.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
+          //remove documents from table..
+          documentChunkManager.deleteChunk(records);
+        }
+        else
+        {
+          Logging.connectors.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responseBody);
+          throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+        }
       }
-      else
+      finally
       {
-        throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+        Throwable exception = null;
+        for (DocumentRecord dr : records)
+        {
+          try
+          {
+            dr.close();
+          }
+          catch (Throwable e)
+          {
+            exception = e;
+          }
+        }
+        if (exception != null)
+        {
+          if (exception instanceof ManifoldCFException)
+            throw (ManifoldCFException)exception;
+          else if (exception instanceof Error)
+            throw (Error)exception;
+          else if (exception instanceof RuntimeException)
+            throw (RuntimeException)exception;
+          else
+            throw new RuntimeException("Unknown exception class thrown: "+exception.getClass().getName()+": "+exception.getMessage(),e);
+        }
       }
-      
     }
   }
 
+  protected static class CompositeInputStream extends InputStream
+  {
+    
+    public CompositeInputStream()
+    {
+    }
+    
+    public void addStream(InputStream stream)
+    {
+      // MHL
+    }
+    
+    // MHL
+  }
+  
   /**
    * Fill in a Server tab configuration parameter map for calling a Velocity
    * template.
@@ -699,10 +706,10 @@ public class AmazonCloudSearchConnector 
     return null;
   }
 
-  private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
+  private String postData(InputStream jsonData) throws ServiceInterruption, ManifoldCFException {
     CloseableHttpClient httpclient = HttpClients.createDefault();
     try {
-      poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
+      poster.setEntity(new InputStreamEntity(jsonData));
       HttpResponse res = httpclient.execute(poster);
       
       HttpEntity resEntity = res.getEntity();

Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Fri Jun 20 00:01:11 2014
@@ -44,7 +44,6 @@ public class DocumentChunkManager extend
   private final static String UID_FIELD = "uid";                        // This is the document key, which is a dochash value
   private final static String HOST_FIELD = "serverhost";            // The host and path are there to make sure we don't collide between connections
   private final static String PATH_FIELD = "serverpath";
-  private final static String ON_DELETE_FIELD = "ondelete";
   private final static String SDF_DATA_FIELD = "sdfdata";
   
   public DocumentChunkManager(
@@ -69,7 +68,6 @@ public class DocumentChunkManager extend
         map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
         map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
         map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
-        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
         map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
         performCreate(map,null);
       }
@@ -113,89 +111,12 @@ public class DocumentChunkManager extend
   }
   
   /**
-  * Remove document information in table (and make a delete marker).
-  * @param uid document uid
-  */
-  public void removeDocument(String uid, String host, String path)
-    throws ManifoldCFException
-  {
-    while (true)
-    {
-      long sleepAmt = 0L;
-      try
-      {
-        beginTransaction();
-        try
-        {
-
-          ArrayList params = new ArrayList();
-          String query = buildConjunctionClause(params,new ClauseDescription[]{
-            new UnitaryClause(HOST_FIELD,host),
-            new UnitaryClause(PATH_FIELD,path),
-            new UnitaryClause(UID_FIELD,uid)});
-
-          IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
-            query+" FOR UPDATE",params,null,null);
-            
-          Map<String,String> parameterMap = new HashMap<String,String>();
-          parameterMap.put(ON_DELETE_FIELD, "1");
-          parameterMap.put(SDF_DATA_FIELD, null);
-            
-          //if record exists on table, update record.
-          if(set.getRowCount() > 0)
-          {
-            performUpdate(parameterMap, " WHERE "+query, whereParameters, null);
-          }
-          else
-          {
-            parameterMap.put(UID_FIELD, uid);
-            parameterMap.put(HOST_FIELD, host);
-            parameterMap.put(PATH_FIELD, path);
-            performInsert(parameterMap, null);
-          }
-
-          break;
-        }
-        catch (ManifoldCFException e)
-        {
-          signalRollback();
-          throw e;
-        }
-        catch (RuntimeException e)
-        {
-          signalRollback();
-          throw e;
-        }
-        catch (Error e)
-        {
-          signalRollback();
-          throw e;
-        }
-        finally
-        {
-          endTransaction();
-        }
-      }
-      catch (ManifoldCFException e)
-      {
-        // Look for deadlock and retry if so
-        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
-        {
-          sleepAmt = getSleepAmt();
-          continue;
-        }
-        throw e;
-      }
-    }
-  }
-  
-  /**
-   * Add/replace document information in table.
+   * Record document information for later trasmission to Amazon.
    * @param uid documentuid
    * @param sdfData document SDF data.
    * @throws ManifoldCFException
    */
-  public void addOrReplaceDocument(String uid, String host, String path, InputStream sdfData) 
+  public void recordDocument(String uid, String host, String path, InputStream sdfData) 
       throws ManifoldCFException, IOException
   {
     TempFileInput tfi = null;
@@ -232,7 +153,6 @@ public class DocumentChunkManager extend
               query+" FOR UPDATE",params,null,null);
             
             Map<String,String> parameterMap = new HashMap<String,String>();
-            parameterMap.put(ON_DELETE_FIELD, "0");
             parameterMap.put(SDF_DATA_FIELD, tfi);
             
             //if record exists on table, update record.
@@ -307,7 +227,6 @@ public class DocumentChunkManager extend
       IResultRow row = set.getRow(i);
       rval[i] = new DocumentRecord(host,path,
         (String)row.getValue(UID_FIELD),
-        ((String)row.getValue(ON_DELETE_FIELD)).equals("1"),
         (BinaryInput)row.getValue(SDF_DATA_FIELD));
     }
     return rval;

Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1604051&r1=1604050&r2=1604051&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Fri Jun 20 00:01:11 2014
@@ -27,15 +27,13 @@ public class DocumentRecord {
   protected final String host;
   protected final String path;
   protected final String uid;
-  protected final boolean delete;
   protected final BinaryInput data;
   
-  public DocumentRecord(String host, String path, String uid, boolean delete, BinaryInput data)
+  public DocumentRecord(String host, String path, String uid, BinaryInput data)
   {
     this.host = host;
     this.path = path;
     this.uid = uid;
-    this.delete = delete;
     this.data = data;
   }
 
@@ -54,11 +52,6 @@ public class DocumentRecord {
     return uid;
   }
   
-  public boolean getDelete()
-  {
-    return delete;
-  }
-  
   public long getStreamLength()
     throws ManifoldCFException
   {