You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/05/27 15:44:13 UTC

svn commit: r1597785 - in /manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch: AmazonCloudSearchConnector.java DocumentChunkManager.java

Author: kwright
Date: Tue May 27 13:44:12 2014
New Revision: 1597785

URL: http://svn.apache.org/r1597785
Log:
Patch from Takumi

Added:
    manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
Modified:
    manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java

Modified: manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1597785&r1=1597784&r2=1597785&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Tue May 27 13:44:12 2014
@@ -42,21 +42,9 @@ import org.apache.http.client.config.Req
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
-import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
-import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
-import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
-import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
+import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
 import org.apache.manifoldcf.agents.output.BaseOutputConnector;
-import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
-import org.apache.manifoldcf.core.interfaces.ConfigParams;
-import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
-import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
-import org.apache.manifoldcf.core.interfaces.IThreadContext;
-import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
-import org.apache.manifoldcf.core.interfaces.IPostParameters;
+import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
 import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
 import org.apache.manifoldcf.core.interfaces.SpecificationNode;
 import org.apache.manifoldcf.core.system.ManifoldCF;
@@ -103,21 +91,9 @@ public class AmazonCloudSearchConnector 
   
   private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
   
-  /** Local connection */
-  protected HttpPost poster = null;
-  
-  /** cloudsearch field name for file body text. */
-  private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
-  
-  /** Constructor.
+  /** Local connection */
  protected HttpPost poster = null;
  
  /** Document Chunk Manager */
  private DocumentChunkManager documentChunkManager;
  
  /** cloudsearch field name for file body text. */
  private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
  
  /** Constructor.
    */
-  public AmazonCloudSearchConnector(){
-  }
-  
-  /** Return the list of activities that this connector supports (i.e. writes into the log).
-  *@return the list.
-  */
-  @Override
+  public AmazonCloudSearchConnector(){
  }
  
  @Override
  public void install(IThreadContext threadContext) 
      throws ManifoldCFException
  {
    IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
      ManifoldCF.getMasterDatabaseName(),
      ManifoldCF.getMasterDatabaseUsername(),
      ManifoldCF.getMasterDatabasePassword());
    
    DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
    dcmanager.install();
  }

  @Override
  public void deinstall(IThreadContext threadContext)
      throws ManifoldCFException
  {
    IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
        ManifoldCF.getMasterDatabaseName(),
        ManifoldCF.getMasterDatabaseUsername(),
        ManifoldCF.getMasterDatabasePassword());
      
    DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
    dcmanager.deinstall();
  }

  /** Return the list of activities that this connector supports (i.e. writes into the log).
  *@return the list.
  */
  @Override
   public String[] getActivitiesList()
   {
     return new String[]{INGEST_ACTIVITY,REMOVE_ACTIVITY};
@@ -158,13 +134,7 @@ public class AmazonCloudSearchConnector 
   }
 
   /** Set up a session */
-  protected void getSession()
-    throws ManifoldCFException
-  {
-    String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
-    if (serverHost == null)
-      throw new ManifoldCFException("Server host parameter required");
-    String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+  protected void getSession()
    throws ManifoldCFException
  {
    if (documentChunkManager == null)
    {
      IDBInterface databaseHandle = DBInterfaceFactory.make(currentContext,
        ManifoldCF.getMasterDatabaseName(),
        ManifoldCF.getMasterDatabaseUsername(),
        ManifoldCF.getMasterDatabasePassword());
      documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
    }
    
    String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
    if (serverHost == null)
      throw new ManifoldCFException("Server host parameter required");
    String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
     if (serverPath == null)
       throw new ManifoldCFException("Server path parameter required");
     String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -190,14 +160,7 @@ public class AmazonCloudSearchConnector 
         throw new ManifoldCFException("Number format exception: "+e.getMessage(),e);
       }
     }
-    
-    poster.addHeader("Content-Type", "application/json");
-  }
-
-  /** Test the connection.  Returns a string describing the connection integrity.
-  *@return the connection's status as a displayable string.
-  */
-  @Override
+    
    poster.addHeader("Content-Type", "application/json");
  }
  
  /** Test the connection.  Returns a string describing the connection integrity.
  *@return the connection's status as a displayable string.
  */
  @Override
   public String check() throws ManifoldCFException {
     try {
       getSession();
@@ -398,14 +361,7 @@ public class AmazonCloudSearchConnector 
         }
       }
       doc.setFields(fields);
-      model.addDocument(doc);
-      
-      //generate json data.
-      jsondata = model.toJSON();
-    } 
-    catch (SAXException e) {
-      // if document data could not be converted to JSON by jackson.
-      Logging.connectors.debug(e);
+      model.addDocument(doc);
      
      //generate json data.
      jsondata = model.toJSON();
      
      documentChunkManager.addDocument(doc.getId(), false, jsondata);
      return DOCUMENTSTATUS_ACCEPTED;
    } 
    catch (SAXException e) {
      // if document data could not be converted to JSON by jackson.
      Logging.connectors.debug(e);
       throw new ManifoldCFException(e);
     } catch (JsonProcessingException e) {
       // if document data could not be converted to JSON by jackson.
@@ -417,29 +373,7 @@ public class AmazonCloudSearchConnector 
       return DOCUMENTSTATUS_REJECTED;
     } catch (IOException e) {
       // if document data could not be read when the document parsing by tika.
-      Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
-    }
-    
-    //post data..
-    String responsbody = postData(jsondata);
-    
-    // check status
-    String status = getStatusFromJsonResponse(responsbody);
-    if("success".equals(status))
-    {
-      activities.recordActivity(null,INGEST_ACTIVITY,new Long(document.getBinaryLength()),documentURI,"OK",null);
-      return DOCUMENTSTATUS_ACCEPTED;
-    }
-    else {
-      throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
-    }
-  }
-
-  private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
-      throws IOException, SAXException, TikaException {
-    
-    //extract body text and metadata fields from binary file.
+      Logging.connectors.debug(e);
      throw new ManifoldCFException(e);
    }
  }
  
  private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
      throws IOException, SAXException, TikaException {
    
    //extract body text and metadata fields from binary file.
     InputStream is = document.getBinaryStream();
     Parser parser = new AutoDetectParser();
     ContentHandler handler = new BodyContentHandler();
@@ -469,31 +403,7 @@ public class AmazonCloudSearchConnector 
     
     String jsonData = "";
     try {
-      SDFModel model = new SDFModel();
-      SDFModel.Document doc = model.new Document();
-      doc.setType("delete");
-      doc.setId(documentURI);
-      model.addDocument(doc);
-      jsonData = model.toJSON();
-    } catch (JsonProcessingException e) {
-      throw new ManifoldCFException(e);
-    }
-    String responsbody = postData(jsonData);
-    
-    // check status
-    String status = getStatusFromJsonResponse(responsbody);
-    if("success".equals(status))
-    {
-      activities.recordActivity(null,REMOVE_ACTIVITY,null,documentURI,"OK",null);
-    }
-    else {
-      throw new ManifoldCFException("recieved error status from service after feeding document.");
-    }
-  }
-
-  /**
-   * Fill in a Server tab configuration parameter map for calling a Velocity
-   * template.
+      SDFModel model = new SDFModel();
      SDFModel.Document doc = model.new Document();
      doc.setType("delete");
      doc.setId(ManifoldCF.hash(documentURI));
      model.addDocument(doc);
      jsonData = model.toJSON();
      
      documentChunkManager.addDocument(doc.getId(), false, jsonData);
      
    } catch (JsonProcessingException e) {
      throw new ManifoldCFException(e);
    }
  }
  
  @Override
  public void noteJobComplete(IOutputNotifyActivity activities)
      throws ManifoldCFException, ServiceInterruption {
    getSession();
    
    final int chunkNum = documentChunkManager.getDocumentChunkNum();
    
    for(int i = 0;i<chunkNum;i++)
    {
      String chunk = documentChunkManager.buildDocumentChunk(String.valueOf(i), i);
      
      //post data..
      String responsbody = postData(chunk);
      // check status
      String status = getStatusFromJsonResponse(responsbody);
      if("success".equals(status))
      {
        int n = i + 1;
        Logging.connectors.info("successfully send document chunk " + n + " of " + chunkNum);
        
        //remove documents from table..
        documentChunkManager.removeDocumentsInChunk(String.valueOf(i));
      }
      else
      {
        throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
      }
      
    }
  }

  /**
   * Fill in a Server tab configuration parameter map for calling a Velocity
   * template.
    *
    * @param newMap is the map to fill in
    * @param parameters is the current set of configuration parameters

Added: manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1597785&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (added)
+++ manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Tue May 27 13:44:12 2014
@@ -0,0 +1 @@
+/* $Id$ */

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.manifoldcf.core.interfaces.ColumnDescription;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.IResultRow;
import org.apache.manifoldcf.core.interfaces.IResultSet;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.StringSet;
import org.exolab.castor.util.Iterator;

public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
{
  // Database fields
  private final static String UID_FIELD = "uid";
  private final static String ON_DELETE_FIELD = "ondelete";
  private final static String SDF_DATA_FIELD = "sdfdata";
  
  private final int MAX_DOCNUM_IN_CHUNK = 5000;
  private final int MAX_DOCBYTES_IN_CHUNK = 5242880; //5MB
  
  Map<String, List<String>> uidMap = new HashMap<String, List<String>>();
  
  public DocumentChunkManager(IThreadContext threadContext,
      IDBInterface database)
  {
    super(database, "amazoncloudsearch_documentdata");
  }

  /** Install the manager 
   * @throws ManifoldCFException 
   */
  public void install() throws ManifoldCFException
  {
    // Standard practice: outer loop on install methods, no transactions
    while (true)
    {
      Map existing = getTableSchema(null,null);
      if (existing == null)
      {
        // Install the table.
        HashMap map = new HashMap();
        map.put(UID_FIELD,new ColumnDescription("VARCHAR(255)",true,false,null,null,false));
        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
        map.put(SDF_DATA_FIELD,new ColumnDescription("CLOB",false,true,null,null,false));
        performCreate(map,null);
      }
      else
      {
        // Upgrade code, if needed, goes here
      }

      // Handle indexes, if needed
      
      break;
    }
  }
  
  /** Uninstall the manager.
  */
  public void deinstall()
    throws ManifoldCFException
  {
    performDrop(null);
  }
  
  /** Perform a table creation operation.
  *@param columnMap is the map describing the columns and types.  NOTE that these are abstract
  * types, which will be mapped to the proper types for the actual database inside this
  * layer.
  *@param invalidateKeys are the cache keys that should be invalidated, if any.
  */
  protected void performCreate(Map columnMap, StringSet invalidateKeys)
    throws ManifoldCFException
  {
    dbInterface.performCreate(tableName,columnMap,invalidateKeys);
  }
  
  /**
   * Add document infomation on table.
   * @param uid documentuid
   * @param onDelete if the document delete from Amazon CloudSearch, set true.
   * @param sdfData document SDF data.
   * @throws ManifoldCFException
   */
  public void addDocument(String uid, boolean onDelete, String sdfData) 
      throws ManifoldCFException
  {
    ArrayList params = new ArrayList();
    params.add(uid);
    IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
        UID_FIELD+"=?",params,null,null);
    
    Map<String,String> parameterMap = new HashMap<String,String>();
    parameterMap.put(UID_FIELD, uid);
    if(onDelete)
    {
      parameterMap.put(ON_DELETE_FIELD, "1");
    }
    else 
    {
      parameterMap.put(ON_DELETE_FIELD, "0");
    }
    parameterMap.put(SDF_DATA_FIELD, sdfData);
    
    //if record exists on table, update record.
    if(set.getRowCount() > 0)
    {
      List<String> whereParameters = new ArrayList<String>();
      whereParameters.add(uid);
      performUpdate(parameterMap, " WHERE "+UID_FIELD+"=?", whereParameters, null);
    }
    else
    {
      performInsert(parameterMap, null);
    }
  }
  
  /**
   * get document chunk number.
   * @return number of document chunk.
   * @throws ManifoldCFException
   */
  public int getDocumentChunkNum() throws ManifoldCFException{
    String query = "select count(*) as COUNT from " + tableName;
    IResultSet resultset = performQuery(query, null, null, null);
    Object value = resultset.getRow(0).getValue("count");
    if(value == null)
    {
      return 0;
    }
    
    int count = (Integer)value;
    if(count <= 0)
    {
      return 0;
    }
    else
    {
      return (count/MAX_DOCNUM_IN_CHUNK)+1;
    }
  }
  
  /**
   * build document chunk (sdf formatted) from Table with chunkid which is given.
   * @param chunkid 
   * @return document chunk (sdf formatted)
   * @throws ManifoldCFException 
   */
  public String buildDocumentChunk(String chunkid, int chunkIdx) throws ManifoldCFException
  {
    int idxFrom = chunkIdx * MAX_DOCNUM_IN_CHUNK;
    int idxTo = idxFrom + MAX_DOCNUM_IN_CHUNK;
    
    String query = "SELECT * FROM ("
        + "SELECT ROW_NUMBER() OVER() as RN, " 
        + UID_FIELD +", " + ON_DELETE_FIELD + ", " + SDF_DATA_FIELD 
        + " FROM " + tableName
        + " ORDER BY " + UID_FIELD +" ) "
        + "as E WHERE E.RN BETWEEN " + idxFrom 
        + " AND "+ idxTo;
    
    IResultSet resultset = performQuery(query, null, null, null);
    
    int currentSize = 0;
    StringBuffer sdfChunk = new StringBuffer();
    List<String> uidListInChunk = new ArrayList<String>();
    for(int i=0;i<resultset.getRowCount();i++)
    {
      IResultRow result = resultset.getRow(i);
      String uidValue = (String)result.getValue(UID_FIELD);
      String onDeleteValue = (String)result.getValue(ON_DELETE_FIELD);
      String sdfValue = (String)result.getValue(SDF_DATA_FIELD);
      
      // if sdf size is over max size, break the loop. 
      currentSize = currentSize + sdfValue.getBytes().length;
      if(currentSize > MAX_DOCBYTES_IN_CHUNK)
      {
        break;
      }
      
      //append sdf value to chunk.
      sdfChunk.append(sdfValue);
      uidListInChunk.add(uidValue);
    }
    
    uidMap.put(chunkid, uidListInChunk);
    return sdfChunk.toString();
  }
  
  /**
   * remvoe documents in chunk.
   * @param chunkid
   * @throws ManifoldCFException
   */
  public void removeDocumentsInChunk(String chunkid)
    throws ManifoldCFException
  {
    List<String> uidList = uidMap.get(chunkid);
    for(String uid : uidList)
    {
      List<String> param = new ArrayList<String>();
      param.add(uid);
      performDelete("WHERE " + UID_FIELD + "=?", param, null);
    }
    uidMap.remove(chunkid);
  }
}
\ No newline at end of file