You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 00:24:21 UTC

svn commit: r1604042 - /manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/

Author: kwright
Date: Thu Jun 19 22:24:20 2014
New Revision: 1604042

URL: http://svn.apache.org/r1604042
Log:
Revamp DocumentChunkManager; this breaks the build

Added:
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java   (contents, props changed)
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java   (contents, props changed)
    manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java   (props changed)

Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1604042&r1=1604041&r2=1604042&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Thu Jun 19 22:24:20 2014
@@ -17,16 +17,16 @@
 * limitations under the License.
 */
 package org.apache.manifoldcf.agents.output.amazoncloudsearch;
-
-import java.io.IOException;
-import java.io.InputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.StringReader;
-import java.io.BufferedReader;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.BufferedReader;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
+import java.util.List;
 import java.util.Map;
 import java.util.Locale;
 import java.util.Set;
@@ -35,21 +35,30 @@ import java.util.HashSet;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.Consts;
 import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
 import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
 import org.apache.manifoldcf.agents.output.BaseOutputConnector;
 import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
 import org.apache.manifoldcf.core.interfaces.Specification;
-import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
+import org.apache.manifoldcf.core.interfaces.ConfigParams;
+import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
+import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
+import org.apache.manifoldcf.core.interfaces.IDBInterface;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.IThreadContext;
+import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
 import org.apache.manifoldcf.core.interfaces.IPostParameters;
 import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
 import org.apache.manifoldcf.core.interfaces.SpecificationNode;
@@ -97,9 +106,20 @@ public class AmazonCloudSearchConnector 
   
   private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
   
-  /** Local connection */
  protected HttpPost poster = null;
  
  /** Document Chunk Manager */
  private DocumentChunkManager documentChunkManager;
  
  /** cloudsearch field name for file body text. */
  private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
  
  /** Constructor.
+  /** Local connection */
+  protected HttpPost poster = null;
+  
+  /** Document Chunk Manager */
+  private DocumentChunkManager documentChunkManager;
+  
+  /** cloudsearch field name for file body text. */
+  private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
+  
+  /** Constructor.
    */
-  public AmazonCloudSearchConnector(){
  }
  
+  public AmazonCloudSearchConnector(){
+  }
+  
   /** Clear out any state information specific to a given thread.
   * This method is called when this object is returned to the connection pool.
   */
@@ -109,7 +129,37 @@ public class AmazonCloudSearchConnector 
     super.clearThreadContext();
     documentChunkManager = null;
   }
-
  /** Install the database table used by the DocumentChunkManager in the master database.
   *@param threadContext is the thread context used to obtain the database handle.
   *@throws ManifoldCFException if the database handle cannot be made or the install fails.
   */
  @Override
  public void install(IThreadContext threadContext)
      throws ManifoldCFException
  {
    final String dbName = ManifoldCF.getMasterDatabaseName();
    final String dbUser = ManifoldCF.getMasterDatabaseUsername();
    final String dbPassword = ManifoldCF.getMasterDatabasePassword();
    final IDBInterface database = DBInterfaceFactory.make(threadContext, dbName, dbUser, dbPassword);
    new DocumentChunkManager(threadContext, database).install();
  }

  /** Remove the database table used by the DocumentChunkManager from the master database.
   *@param threadContext is the thread context used to obtain the database handle.
   *@throws ManifoldCFException if the database handle cannot be made or the drop fails.
   */
  @Override
  public void deinstall(IThreadContext threadContext)
      throws ManifoldCFException
  {
    final String dbName = ManifoldCF.getMasterDatabaseName();
    final String dbUser = ManifoldCF.getMasterDatabaseUsername();
    final String dbPassword = ManifoldCF.getMasterDatabasePassword();
    final IDBInterface database = DBInterfaceFactory.make(threadContext, dbName, dbUser, dbPassword);
    new DocumentChunkManager(threadContext, database).deinstall();
  }

  /** Return the list of activities that this connector supports (i.e. writes into the log).
  *@return the list.
  */
  @Override
+
+  @Override
+  public void install(IThreadContext threadContext) 
+      throws ManifoldCFException
+  {
+    IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
+      ManifoldCF.getMasterDatabaseName(),
+      ManifoldCF.getMasterDatabaseUsername(),
+      ManifoldCF.getMasterDatabasePassword());
+    
+    DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+    dcmanager.install();
+  }
+
+  @Override
+  public void deinstall(IThreadContext threadContext)
+      throws ManifoldCFException
+  {
+    IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
+        ManifoldCF.getMasterDatabaseName(),
+        ManifoldCF.getMasterDatabaseUsername(),
+        ManifoldCF.getMasterDatabasePassword());
+      
+    DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+    dcmanager.deinstall();
+  }
+
+  /** Return the list of activities that this connector supports (i.e. writes into the log).
+  *@return the list.
+  */
+  @Override
   public String[] getActivitiesList()
   {
     return new String[]{INGEST_ACTIVITY,REMOVE_ACTIVITY};
@@ -150,7 +200,10 @@ public class AmazonCloudSearchConnector 
   }
 
   /** Set up a session */
-  protected void getSession()
    throws ManifoldCFException
  {
    if (documentChunkManager == null)
+  protected void getSession()
+    throws ManifoldCFException
+  {
+    if (documentChunkManager == null)
     {
       IDBInterface databaseHandle = DBInterfaceFactory.make(currentContext,
         ManifoldCF.getMasterDatabaseName(),
@@ -158,7 +211,11 @@ public class AmazonCloudSearchConnector 
         ManifoldCF.getMasterDatabasePassword());
       documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
     }
-
    String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
    if (serverHost == null)
      throw new ManifoldCFException("Server host parameter required");
    String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+
+    String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
+    if (serverHost == null)
+      throw new ManifoldCFException("Server host parameter required");
+    String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
     if (serverPath == null)
       throw new ManifoldCFException("Server path parameter required");
     String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -184,7 +241,14 @@ public class AmazonCloudSearchConnector 
         throw new ManifoldCFException("Number format exception: "+e.getMessage(),e);
       }
     }
-    
    poster.addHeader("Content-Type", "application/json");
  }
  
  /** Test the connection.  Returns a string describing the connection integrity.
  *@return the connection's status as a displayable string.
  */
  @Override
+    
+    poster.addHeader("Content-Type", "application/json");
+  }
+  
+  /** Test the connection.  Returns a string describing the connection integrity.
+  *@return the connection's status as a displayable string.
+  */
+  @Override
   public String check() throws ManifoldCFException {
     try {
       getSession();
@@ -227,36 +291,36 @@ public class AmazonCloudSearchConnector 
       Logging.connectors.debug(e);
       return "Transient exception: "+e.getMessage();
     }
-  }
-  
-  private String getStatusFromJsonResponse(String responsbody) throws ManifoldCFException {
-    try {
-      JsonParser parser = new JsonFactory().createJsonParser(responsbody);
-      while (parser.nextToken() != JsonToken.END_OBJECT)
-      {
-        String name = parser.getCurrentName();
-        if("status".equalsIgnoreCase(name)){
-          parser.nextToken();
-          return parser.getText();
-        }
-      }
-    } catch (JsonParseException e) {
-      throw new ManifoldCFException(e);
-    } catch (IOException e) {
-      throw new ManifoldCFException(e);
-    }
-    return null;
-  }
-  
-  private String parseMessage(JsonParser parser) throws JsonParseException, IOException {
-    while(parser.nextToken() != JsonToken.END_ARRAY){
-      String name = parser.getCurrentName();
-      if("message".equalsIgnoreCase(name)){
-        parser.nextToken();
-        return parser.getText();
-      }
-    }
-    return null;
+  }
+  
+  private String getStatusFromJsonResponse(String responsbody) throws ManifoldCFException {
+    try {
+      JsonParser parser = new JsonFactory().createJsonParser(responsbody);
+      while (parser.nextToken() != JsonToken.END_OBJECT)
+      {
+        String name = parser.getCurrentName();
+        if("status".equalsIgnoreCase(name)){
+          parser.nextToken();
+          return parser.getText();
+        }
+      }
+    } catch (JsonParseException e) {
+      throw new ManifoldCFException(e);
+    } catch (IOException e) {
+      throw new ManifoldCFException(e);
+    }
+    return null;
+  }
+  
+  private String parseMessage(JsonParser parser) throws JsonParseException, IOException {
+    while(parser.nextToken() != JsonToken.END_ARRAY){
+      String name = parser.getCurrentName();
+      if("message".equalsIgnoreCase(name)){
+        parser.nextToken();
+        return parser.getText();
+      }
+    }
+    return null;
   }
 
   /** Get an output version string, given an output specification.  The output version string is used to uniquely describe the pertinent details of
@@ -331,13 +395,13 @@ public class AmazonCloudSearchConnector 
   @Override
   public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
     throws ManifoldCFException, ServiceInterruption
-  {
-    // Establish a session
+  {
+    // Establish a session
     getSession();
     
     SpecPacker sp = new SpecPacker(outputDescription);
     
-    String jsondata = "";
+    String jsondata = "";
     try {
       //build json..
       SDFModel model = new SDFModel();
@@ -385,19 +449,37 @@ public class AmazonCloudSearchConnector 
         }
       }
       doc.setFields(fields);
-      model.addDocument(doc);
      
      //generate json data.
      jsondata = model.toJSON();
      
      documentChunkManager.addDocument(doc.getId(), false, jsondata);
      return DOCUMENTSTATUS_ACCEPTED;
    } 
    catch (SAXException e) {
      // if document data could not be converted to JSON by jackson.
      Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
-    } catch (JsonProcessingException e) {
+      model.addDocument(doc);
+      
+      //generate json data.
+      jsondata = model.toJSON();
+      
+      documentChunkManager.addDocument(doc.getId(), false, jsondata);
+      return DOCUMENTSTATUS_ACCEPTED;
+    } 
+    catch (SAXException e) {
       // if document data could not be converted to JSON by jackson.
       Logging.connectors.debug(e);
-      throw new ManifoldCFException(e);
-    } catch (TikaException e) {
+      throw new ManifoldCFException(e);
+    } catch (JsonProcessingException e) {
+      // if document data could not be converted to JSON by jackson.
+      Logging.connectors.debug(e);
+      throw new ManifoldCFException(e);
+    } catch (TikaException e) {
       // if document could not be parsed by tika.
-      Logging.connectors.debug(e);
-      return DOCUMENTSTATUS_REJECTED;
-    } catch (IOException e) {
+      Logging.connectors.debug(e);
+      return DOCUMENTSTATUS_REJECTED;
+    } catch (IOException e) {
       // if document data could not be read when the document parsing by tika.
-      Logging.connectors.debug(e);
      throw new ManifoldCFException(e);
    }
  }
  
  private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
      throws IOException, SAXException, TikaException {
    
    //extract body text and metadata fields from binary file.
+      Logging.connectors.debug(e);
+      throw new ManifoldCFException(e);
+    }
+  }
+  
+  private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
+      throws IOException, SAXException, TikaException {
+    
+    //extract body text and metadata fields from binary file.
     InputStream is = document.getBinaryStream();
     Parser parser = new AutoDetectParser();
     ContentHandler handler = new BodyContentHandler();
@@ -409,9 +491,9 @@ public class AmazonCloudSearchConnector 
       fields.put(FILE_BODY_TEXT_FIELDNAME, bodyStr);
     }
     return metadata;
-  }
-
-  /** Remove a document using the connector.
+  }
+
+  /** Remove a document using the connector.
   * Note that the last outputDescription is included, since it may be necessary for the connector to use such information to know how to properly remove the document.
   *@param documentURI is the URI of the document.  The URI is presumed to be the unique identifier which the output data store will use to process
   * and serve the document.  This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
@@ -427,7 +509,54 @@ public class AmazonCloudSearchConnector 
     
     String jsonData = "";
     try {
-      SDFModel model = new SDFModel();
      SDFModel.Document doc = model.new Document();
      doc.setType("delete");
      doc.setId(ManifoldCF.hash(documentURI));
      model.addDocument(doc);
      jsonData = model.toJSON();
      
      documentChunkManager.addDocument(doc.getId(), false, jsonData);
      
    } catch (JsonProcessingException e) {
      throw new ManifoldCFException(e);
    }
  }
  
  /** Post the pending documents to Amazon CloudSearch, one chunk at a time.
   * A chunk whose response status is "success" has its documents removed from the pending
   * table; any other status aborts with an exception, leaving unsent documents in the table.
   *@param activities is the activity object (not used by this method).
   *@throws ManifoldCFException if the service reports a non-success status.
   *@throws ServiceInterruption if posting the chunk fails with a transient I/O problem.
   */
  @Override
  public void noteJobComplete(IOutputNotifyActivity activities)
      throws ManifoldCFException, ServiceInterruption {
    getSession();
    
    final int chunkNum = documentChunkManager.getDocumentChunkNum();
    
    for (int i = 0; i < chunkNum; i++)
    {
      final String chunkId = String.valueOf(i);
      // Every accepted chunk is deleted from the table before the next one is built, and chunks
      // are selected by row number over the rows still present. The unsent rows are therefore
      // always at the front: request the first chunk of what remains. Requesting chunk i here
      // would skip over rows that were never sent.
      final String chunk = documentChunkManager.buildDocumentChunk(chunkId, 0);
      
      // post data and check the status reported by the service
      final String responsbody = postData(chunk);
      final String status = getStatusFromJsonResponse(responsbody);
      if (!"success".equals(status))
      {
        throw new ManifoldCFException("received error status from service after feeding document. response body : " + responsbody);
      }
      Logging.connectors.info("successfully sent document chunk " + (i + 1) + " of " + chunkNum);
      
      // remove the accepted documents from the table
      documentChunkManager.removeDocumentsInChunk(chunkId);
    }
  }

  /**
   * Fill in a Server tab configuration parameter map for calling a Velocity
   * template.
+      SDFModel model = new SDFModel();
+      SDFModel.Document doc = model.new Document();
+      doc.setType("delete");
+      doc.setId(ManifoldCF.hash(documentURI));
+      model.addDocument(doc);
+      jsonData = model.toJSON();
+      
+      documentChunkManager.addDocument(doc.getId(), false, jsonData);
+      
+    } catch (JsonProcessingException e) {
+      throw new ManifoldCFException(e);
+    }
+  }
+  
+  @Override
+  public void noteJobComplete(IOutputNotifyActivity activities)
+      throws ManifoldCFException, ServiceInterruption {
+    getSession();
+    
+    final int chunkNum = documentChunkManager.getDocumentChunkNum();
+    
+    for(int i = 0;i<chunkNum;i++)
+    {
+      String chunk = documentChunkManager.buildDocumentChunk(String.valueOf(i), i);
+      
+      //post data..
+      String responsbody = postData(chunk);
+      // check status
+      String status = getStatusFromJsonResponse(responsbody);
+      if("success".equals(status))
+      {
+        int n = i + 1;
+        Logging.connectors.info("successfully send document chunk " + n + " of " + chunkNum);
+        
+        //remove documents from table..
+        documentChunkManager.removeDocumentsInChunk(String.valueOf(i));
+      }
+      else
+      {
+        throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+      }
+      
+    }
+  }
+
+  /**
+   * Fill in a Server tab configuration parameter map for calling a Velocity
+   * template.
    *
    * @param newMap is the map to fill in
    * @param parameters is the current set of configuration parameters
@@ -569,42 +698,42 @@ public class AmazonCloudSearchConnector 
 
     return null;
   }
-
-  private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
-    CloseableHttpClient httpclient = HttpClients.createDefault();
-    try {
-      poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
+
+  private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
+    CloseableHttpClient httpclient = HttpClients.createDefault();
+    try {
+      poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
       HttpResponse res = httpclient.execute(poster);
-      
-      HttpEntity resEntity = res.getEntity();
-      return EntityUtils.toString(resEntity);
-    } catch (ClientProtocolException e) {
-      throw new ManifoldCFException(e);
-    } catch (IOException e) {
-      handleIOException(e);
-    } finally {
-      try {
-        httpclient.close();
+      
+      HttpEntity resEntity = res.getEntity();
+      return EntityUtils.toString(resEntity);
+    } catch (ClientProtocolException e) {
+      throw new ManifoldCFException(e);
+    } catch (IOException e) {
+      handleIOException(e);
+    } finally {
+      try {
+        httpclient.close();
       } catch (IOException e) {
-        //do nothing
-      }
-    }
-    return null;
-  }
-  
-  private static void handleIOException(IOException e)
-      throws ManifoldCFException, ServiceInterruption {
-    if (!(e instanceof java.net.SocketTimeoutException)
-        && (e instanceof InterruptedIOException)) {
-      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-          ManifoldCFException.INTERRUPTED);
-    }
-    Logging.connectors.warn(
-        "Amazon CloudSearch: IO exception: " + e.getMessage(), e);
-    long currentTime = System.currentTimeMillis();
-    throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
-        currentTime + 300000L, currentTime + 3 * 60 * 60000L, -1, false);
-  }
+        //do nothing
+      }
+    }
+    return null;
+  }
+  
+  private static void handleIOException(IOException e)
+      throws ManifoldCFException, ServiceInterruption {
+    if (!(e instanceof java.net.SocketTimeoutException)
+        && (e instanceof InterruptedIOException)) {
+      throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+          ManifoldCFException.INTERRUPTED);
+    }
+    Logging.connectors.warn(
+        "Amazon CloudSearch: IO exception: " + e.getMessage(), e);
+    long currentTime = System.currentTimeMillis();
+    throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
+        currentTime + 300000L, currentTime + 3 * 60 * 60000L, -1, false);
+  }
   
   protected static void fillInFieldMappingSpecificationMap(Map<String,Object> paramMap, Specification os)
   {
@@ -1090,5 +1219,5 @@ public class AmazonCloudSearchConnector 
       return keepAllMetadata;
     }
   }
-  
+  
 }

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1604042&r1=1604041&r2=1604042&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Thu Jun 19 22:24:20 2014
@@ -1 +1,381 @@
-/* $Id$ */

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.manifoldcf.core.interfaces.ColumnDescription;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.IResultRow;
import org.apache.manifoldcf.core.interfaces.IResultSet;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.StringSet;
import org.exolab.castor.util.Iterator;

/**
 * Database table holding SDF data for documents waiting to be sent to Amazon CloudSearch.
 * Rows are keyed by document uid, read back in chunks bounded by document count and byte size,
 * and deleted once the chunk containing them has been accepted.
 */
public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
{
  // Database fields
  private final static String UID_FIELD = "uid";
  private final static String ON_DELETE_FIELD = "ondelete";
  private final static String SDF_DATA_FIELD = "sdfdata";
  
  /** Maximum number of documents read into one chunk. */
  private final static int MAX_DOCNUM_IN_CHUNK = 5000;
  /** Maximum size of one chunk's SDF data, in UTF-8 bytes (5MB). */
  private final static int MAX_DOCBYTES_IN_CHUNK = 5242880;
  
  /** uids included in each chunk built by buildDocumentChunk, keyed by chunk id;
   * consumed (and cleared) by removeDocumentsInChunk. */
  final Map<String, List<String>> uidMap = new HashMap<String, List<String>>();
  
  /** Constructor.
   *@param threadContext is the thread context (not used by this class).
   *@param database is the database handle the table lives in.
   */
  public DocumentChunkManager(IThreadContext threadContext,
      IDBInterface database)
  {
    super(database, "amazoncloudsearch_documentdata");
  }

  /** Install the manager's table if it does not exist yet.
   * @throws ManifoldCFException if a database operation fails.
   */
  public void install() throws ManifoldCFException
  {
    // Standard practice: outer loop on install methods, no transactions
    while (true)
    {
      Map existing = getTableSchema(null,null);
      if (existing == null)
      {
        // Install the table.
        Map<String,ColumnDescription> map = new HashMap<String,ColumnDescription>();
        map.put(UID_FIELD,new ColumnDescription("VARCHAR(255)",true,false,null,null,false));
        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
        map.put(SDF_DATA_FIELD,new ColumnDescription("CLOB",false,true,null,null,false));
        performCreate(map,null);
      }
      else
      {
        // Upgrade code, if needed, goes here
      }

      // Handle indexes, if needed
      
      break;
    }
  }
  
  /** Uninstall the manager by dropping its table.
  */
  public void deinstall()
    throws ManifoldCFException
  {
    performDrop(null);
  }
  
  /** Perform a table creation operation.
  *@param columnMap is the map describing the columns and types.  NOTE that these are abstract
  * types, which will be mapped to the proper types for the actual database inside this
  * layer.
  *@param invalidateKeys are the cache keys that should be invalidated, if any.
  */
  protected void performCreate(Map columnMap, StringSet invalidateKeys)
    throws ManifoldCFException
  {
    dbInterface.performCreate(tableName,columnMap,invalidateKeys);
  }
  
  /**
   * Add document information to the table, replacing any existing row for the same uid.
   * @param uid document uid.
   * @param onDelete true if the document is to be deleted from Amazon CloudSearch.
   * @param sdfData document SDF data.
   * @throws ManifoldCFException
   */
  public void addDocument(String uid, boolean onDelete, String sdfData) 
      throws ManifoldCFException
  {
    ArrayList<String> params = new ArrayList<String>();
    params.add(uid);
    IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
        UID_FIELD+"=?",params,null,null);
    
    Map<String,String> parameterMap = new HashMap<String,String>();
    parameterMap.put(UID_FIELD, uid);
    parameterMap.put(ON_DELETE_FIELD, onDelete ? "1" : "0");
    parameterMap.put(SDF_DATA_FIELD, sdfData);
    
    // If a record already exists for this uid, update it; otherwise insert a new one.
    if (set.getRowCount() > 0)
    {
      List<String> whereParameters = new ArrayList<String>();
      whereParameters.add(uid);
      performUpdate(parameterMap, " WHERE "+UID_FIELD+"=?", whereParameters, null);
    }
    else
    {
      performInsert(parameterMap, null);
    }
  }
  
  /**
   * Get the number of chunks needed to hold every row currently in the table, assuming at most
   * MAX_DOCNUM_IN_CHUNK documents per chunk. The byte-size limit is not taken into account.
   * @return number of document chunks; 0 when the table is empty.
   * @throws ManifoldCFException
   */
  public int getDocumentChunkNum() throws ManifoldCFException{
    String query = "select count(*) as COUNT from " + tableName;
    IResultSet resultset = performQuery(query, null, null, null);
    Object value = resultset.getRow(0).getValue("count");
    if (value == null)
    {
      return 0;
    }
    
    // The boxed type of COUNT(*) may vary by database/driver (e.g. Integer or Long), so
    // go through Number instead of casting to one specific type.
    long count = ((Number)value).longValue();
    if (count <= 0)
    {
      return 0;
    }
    // Ceiling division: an exact multiple of MAX_DOCNUM_IN_CHUNK must not yield an extra,
    // empty chunk.
    return (int)((count + MAX_DOCNUM_IN_CHUNK - 1) / MAX_DOCNUM_IN_CHUNK);
  }
  
  /**
   * Build a document chunk (SDF data) from the table.
   * Rows are numbered from 1 in uid order, and chunk chunkIdx covers rows
   * chunkIdx*MAX_DOCNUM_IN_CHUNK+1 through (chunkIdx+1)*MAX_DOCNUM_IN_CHUNK. Rows are appended
   * until adding the next one would exceed MAX_DOCBYTES_IN_CHUNK. Rows past that point are
   * left out of this chunk and are not recorded for removal.
   * The uids included are remembered under chunkid for removeDocumentsInChunk.
   * NOTE(review): the stored values are concatenated as-is; confirm against SDFModel.toJSON()
   * that the concatenation is a valid batch for the service.
   * @param chunkid id under which the included uids are remembered.
   * @param chunkIdx zero-based index of the chunk to build.
   * @return document chunk (SDF formatted); empty if no rows fit.
   * @throws ManifoldCFException 
   */
  public String buildDocumentChunk(String chunkid, int chunkIdx) throws ManifoldCFException
  {
    // ROW_NUMBER() is 1-based. The lower bound is shifted by one so that consecutive chunks
    // do not overlap and each covers exactly MAX_DOCNUM_IN_CHUNK rows.
    int idxFrom = chunkIdx * MAX_DOCNUM_IN_CHUNK + 1;
    int idxTo = (chunkIdx + 1) * MAX_DOCNUM_IN_CHUNK;
    
    String query = "SELECT * FROM ("
        + "SELECT ROW_NUMBER() OVER() as RN, " 
        + UID_FIELD +", " + ON_DELETE_FIELD + ", " + SDF_DATA_FIELD 
        + " FROM " + tableName
        + " ORDER BY " + UID_FIELD +" ) "
        + "as E WHERE E.RN BETWEEN " + idxFrom 
        + " AND "+ idxTo;
    
    IResultSet resultset = performQuery(query, null, null, null);
    
    int currentSize = 0;
    StringBuilder sdfChunk = new StringBuilder();
    List<String> uidListInChunk = new ArrayList<String>();
    for (int i = 0; i < resultset.getRowCount(); i++)
    {
      IResultRow result = resultset.getRow(i);
      String uidValue = (String)result.getValue(UID_FIELD);
      String sdfValue = (String)result.getValue(SDF_DATA_FIELD);
      // The column is declared nullable; treat a missing value as empty instead of failing.
      if (sdfValue == null)
      {
        sdfValue = "";
      }
      
      // If the chunk would exceed the maximum size, stop here. Size is counted in UTF-8 bytes
      // (the connector posts data as UTF-8) instead of the platform default charset.
      currentSize += sdfValue.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
      if (currentSize > MAX_DOCBYTES_IN_CHUNK)
      {
        break;
      }
      
      //append sdf value to chunk.
      sdfChunk.append(sdfValue);
      uidListInChunk.add(uidValue);
    }
    
    uidMap.put(chunkid, uidListInChunk);
    return sdfChunk.toString();
  }
  
  /**
   * Remove from the table the documents included in the given chunk by buildDocumentChunk.
   * Does nothing if no chunk was built under that id.
   * @param chunkid id passed to buildDocumentChunk.
   * @throws ManifoldCFException
   */
  public void removeDocumentsInChunk(String chunkid)
    throws ManifoldCFException
  {
    List<String> uidList = uidMap.get(chunkid);
    if (uidList == null)
    {
      return;
    }
    for (String uid : uidList)
    {
      List<String> param = new ArrayList<String>();
      param.add(uid);
      performDelete("WHERE " + UID_FIELD + "=?", param, null);
    }
    uidMap.remove(chunkid);
  }
}
\ No newline at end of file
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.output.amazoncloudsearch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Iterator;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.manifoldcf.core.interfaces.ColumnDescription;
+import org.apache.manifoldcf.core.interfaces.IndexDescription;
+import org.apache.manifoldcf.core.interfaces.IDBInterface;
+import org.apache.manifoldcf.core.interfaces.IResultRow;
+import org.apache.manifoldcf.core.interfaces.IResultSet;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.BinaryInput;
+import org.apache.manifoldcf.core.interfaces.TempFileInput;
+import org.apache.manifoldcf.core.interfaces.ClauseDescription;
+import org.apache.manifoldcf.core.interfaces.UnitaryClause;
+
+/** Database table that queues per-document SDF data (or delete markers) for the
+* Amazon CloudSearch output connector, keyed by server host, server path, and document uid.
+*/
+public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
+{
+  // Database fields
+  private final static String UID_FIELD = "uid";                        // This is the document key, which is a dochash value
+  private final static String HOST_FIELD = "serverhost";            // The host and path are there to make sure we don't collide between connections
+  private final static String PATH_FIELD = "serverpath";
+  private final static String ON_DELETE_FIELD = "ondelete";         // "1" = pending delete marker, "0" = pending add/replace
+  private final static String SDF_DATA_FIELD = "sdfdata";           // SDF payload; null for delete markers
+  
+  /** Construct the manager on the "amazoncloudsearch_documentdata" table.
+  *@param database the database interface to use
+  */
+  public DocumentChunkManager(
+      IDBInterface database)
+  {
+    super(database, "amazoncloudsearch_documentdata");
+  }
+
+  /** Install the manager: create the table if it does not exist, ensure the
+  * (host, path, uid) index is present, and drop any other index whose name does not contain "_pkey".
+  *@throws ManifoldCFException on a database error
+  */
+  public void install() throws ManifoldCFException
+  {
+    // Standard practice: outer loop on install methods, no transactions
+    while (true)
+    {
+      Map existing = getTableSchema(null,null);
+      if (existing == null)
+      {
+        // Install the table.
+        HashMap map = new HashMap();
+        map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
+        map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+        map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
+        // SDF data must accept null: removeDocument() stores null here for delete markers.
+        map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
+        performCreate(map,null);
+      }
+      else
+      {
+        // Upgrade code, if needed, goes here
+      }
+
+      // Handle indexes, if needed
+      IndexDescription keyIndex = new IndexDescription(true,new String[]{HOST_FIELD,PATH_FIELD,UID_FIELD});
+
+      Map indexes = getTableIndexes(null,null);
+      Iterator iter = indexes.keySet().iterator();
+      while (iter.hasNext())
+      {
+        String indexName = (String)iter.next();
+        IndexDescription id = (IndexDescription)indexes.get(indexName);
+
+        if (keyIndex != null && id.equals(keyIndex))
+          keyIndex = null;
+        else if (indexName.indexOf("_pkey") == -1)
+          // This index shouldn't be here; drop it
+          performRemoveIndex(indexName);
+      }
+
+      // Add the ones we didn't find
+      if (keyIndex != null)
+        performAddIndex(null,keyIndex);
+
+      break;
+    }
+  }
+  
+  /** Uninstall the manager by dropping its table.
+  *@throws ManifoldCFException on a database error
+  */
+  public void deinstall()
+    throws ManifoldCFException
+  {
+    performDrop(null);
+  }
+  
+  /**
+  * Remove document information in table (and make a delete marker).
+  * An existing row for the document is updated in place to ondelete="1" with null SDF data;
+  * otherwise a new marker row is inserted. Retried when the database aborts the transaction.
+  *@param uid document uid
+  *@param host server host the document belongs to
+  *@param path server path the document belongs to
+  *@throws ManifoldCFException on a non-retryable database error
+  */
+  public void removeDocument(String uid, String host, String path)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      try
+      {
+        beginTransaction();
+        try
+        {
+          ArrayList params = new ArrayList();
+          String query = buildDocumentClause(params,host,path,uid);
+
+          // Lock the existing row (if any) so the update-vs-insert decision below stays valid.
+          IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
+            query+" FOR UPDATE",params,null,null);
+
+          Map<String,Object> parameterMap = new HashMap<String,Object>();
+          parameterMap.put(ON_DELETE_FIELD, "1");
+          parameterMap.put(SDF_DATA_FIELD, null);
+
+          //if record exists on table, update record.
+          if (set.getRowCount() > 0)
+          {
+            performUpdate(parameterMap, " WHERE "+query, params, null);
+          }
+          else
+          {
+            parameterMap.put(UID_FIELD, uid);
+            parameterMap.put(HOST_FIELD, host);
+            parameterMap.put(PATH_FIELD, path);
+            performInsert(parameterMap, null);
+          }
+
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (RuntimeException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      catch (ManifoldCFException e)
+      {
+        // Look for deadlock and retry if so, backing off first instead of spinning
+        if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+        {
+          sleepFor(getSleepAmt());
+          continue;
+        }
+        throw e;
+      }
+    }
+  }
+  
+  /**
+   * Add/replace document information in table.
+   * The SDF stream is first copied into a temporary file (discarded before returning), then stored
+   * with ondelete="0", updating the existing row or inserting a new one. Retried when the
+   * database aborts the transaction.
+   * @param uid documentuid
+   * @param host server host the document belongs to
+   * @param path server path the document belongs to
+   * @param sdfData document SDF data.
+   * @throws ManifoldCFException on a non-retryable database error, or if interrupted while reading the data
+   * @throws IOException if reading the SDF data fails
+   */
+  public void addOrReplaceDocument(String uid, String host, String path, InputStream sdfData) 
+      throws ManifoldCFException, IOException
+  {
+    TempFileInput tfi = null;
+    try
+    {
+      // This downloads all the data from upstream!
+      try
+      {
+        tfi = new TempFileInput(sdfData);
+      }
+      catch (ManifoldCFException e)
+      {
+        if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+          throw e;
+        // Keep the original exception as the cause so the failure can be diagnosed.
+        throw new IOException("Fetch failed: "+e.getMessage(), e);
+      }
+      
+      while (true)
+      {
+        try
+        {
+          beginTransaction();
+          try
+          {
+            ArrayList params = new ArrayList();
+            String query = buildDocumentClause(params,host,path,uid);
+
+            // Lock the existing row (if any) so the update-vs-insert decision below stays valid.
+            IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
+              query+" FOR UPDATE",params,null,null);
+            
+            // Object-valued map: the SDF column is bound to a TempFileInput, not a String.
+            Map<String,Object> parameterMap = new HashMap<String,Object>();
+            parameterMap.put(ON_DELETE_FIELD, "0");
+            parameterMap.put(SDF_DATA_FIELD, tfi);
+            
+            //if record exists on table, update record.
+            if (set.getRowCount() > 0)
+            {
+              performUpdate(parameterMap, " WHERE "+query, params, null);
+            }
+            else
+            {
+              parameterMap.put(UID_FIELD, uid);
+              parameterMap.put(HOST_FIELD, host);
+              parameterMap.put(PATH_FIELD, path);
+              performInsert(parameterMap, null);
+            }
+      
+            break;
+          }
+          catch (ManifoldCFException e)
+          {
+            signalRollback();
+            throw e;
+          }
+          catch (RuntimeException e)
+          {
+            signalRollback();
+            throw e;
+          }
+          catch (Error e)
+          {
+            signalRollback();
+            throw e;
+          }
+          finally
+          {
+            endTransaction();
+          }
+        }
+        catch (ManifoldCFException e)
+        {
+          // Look for deadlock and retry if so, backing off first instead of spinning
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+          {
+            sleepFor(getSleepAmt());
+            continue;
+          }
+          throw e;
+        }
+      }
+
+    }
+    finally
+    {
+      if (tfi != null)
+        tfi.discard();
+    }
+  }
+  
+  /** Read a chunk of documents.
+  * Returns at most maximumNumber rows for the given host/path. Each record carries the row's
+  * SDF data as read from the result set (null for delete markers); DocumentRecord.close()
+  * discards that data.
+  *@param host server host
+  *@param path server path
+  *@param maximumNumber maximum number of records to return
+  *@return the records read; empty if none are queued
+  *@throws ManifoldCFException on a database error
+  */
+  public DocumentRecord[] readChunk(String host, String path, int maximumNumber)
+    throws ManifoldCFException
+  {
+    ArrayList params = new ArrayList();
+    String query = buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(HOST_FIELD,host),
+      new UnitaryClause(PATH_FIELD,path)});
+
+    IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+query+" "+constructOffsetLimitClause(0,maximumNumber),params,null,null);
+    int rowCount = set.getRowCount();
+    DocumentRecord[] rval = new DocumentRecord[rowCount];
+    for (int i = 0; i < rowCount; i++)
+    {
+      IResultRow row = set.getRow(i);
+      rval[i] = new DocumentRecord(host,path,
+        (String)row.getValue(UID_FIELD),
+        // Null-safe comparison: "1" marks a delete marker (see removeDocument).
+        "1".equals(row.getValue(ON_DELETE_FIELD)),
+        (BinaryInput)row.getValue(SDF_DATA_FIELD));
+    }
+    return rval;
+  }
+  
+  /** Delete the chunk of documents (presumably because we processed them successfully).
+  * All rows are deleted in one transaction, which is retried when the database aborts it.
+  *@param records the records to delete; an empty array is a no-op
+  *@throws ManifoldCFException on a non-retryable database error
+  */
+  public void deleteChunk(DocumentRecord[] records)
+    throws ManifoldCFException
+  {
+    if (records.length == 0)
+      return;
+
+    // Do the whole thing in a transaction -- if we mess up, we'll have to try everything again
+    while (true)
+    {
+      try
+      {
+        beginTransaction();
+        try
+        {
+          // Theoretically we could aggregate the records, but for now delete one at a time.
+          for (DocumentRecord dr : records)
+          {
+            ArrayList params = new ArrayList();
+            String query = buildDocumentClause(params,dr.getHost(),dr.getPath(),dr.getUid());
+            performDelete("WHERE "+query,params,null);
+          }
+          
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (RuntimeException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      catch (ManifoldCFException e)
+      {
+        // Look for deadlock and retry if so, backing off first instead of spinning
+        if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+        {
+          sleepFor(getSleepAmt());
+          continue;
+        }
+        throw e;
+      }
+    }
+
+  }
+
+  /** Build the WHERE conjunction that identifies one document row by host, path, and uid,
+  * filling params with the corresponding query parameters.
+  */
+  private String buildDocumentClause(List params, String host, String path, String uid)
+    throws ManifoldCFException
+  {
+    return buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(HOST_FIELD,host),
+      new UnitaryClause(PATH_FIELD,path),
+      new UnitaryClause(UID_FIELD,uid)});
+  }
+  
+}

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1604042&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (added)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Thu Jun 19 22:24:20 2014
@@ -0,0 +1,85 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.output.amazoncloudsearch;
+
+import java.io.*;
+
+import org.apache.manifoldcf.core.*;
+import org.apache.manifoldcf.core.interfaces.*;
+
+public class DocumentRecord {
+  
+  protected final String host;
+  protected final String path;
+  protected final String uid;
+  protected final boolean delete;
+  protected final BinaryInput data;
+  
+  public DocumentRecord(String host, String path, String uid, boolean delete, BinaryInput data)
+  {
+    this.host = host;
+    this.path = path;
+    this.uid = uid;
+    this.delete = delete;
+    this.data = data;
+  }
+
+  public String getHost()
+  {
+    return host;
+  }
+  
+  public String getPath()
+  {
+    return path;
+  }
+  
+  public String getUid()
+  {
+    return uid;
+  }
+  
+  public boolean getDelete()
+  {
+    return delete;
+  }
+  
+  public long getStreamLength()
+    throws ManifoldCFException
+  {
+    if (data != null)
+      return data.getLength();
+    return 0L;
+  }
+  
+  public InputStream getDataStream()
+    throws ManifoldCFException
+  {
+    if (data != null)
+      return data.getStream();
+    return null;
+  }
+  
+  public void close()
+    throws ManifoldCFException
+  {
+    if (data != null)
+      data.discard();
+  }
+  
+}

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
------------------------------------------------------------------------------
    svn:keywords = Id

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java
------------------------------------------------------------------------------
    svn:keywords = Id