You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/05/27 15:44:13 UTC
svn commit: r1597785 - in
/manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch:
AmazonCloudSearchConnector.java DocumentChunkManager.java
Author: kwright
Date: Tue May 27 13:44:12 2014
New Revision: 1597785
URL: http://svn.apache.org/r1597785
Log:
Patch from Takumi
Added:
manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
Modified:
manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
Modified: manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1597785&r1=1597784&r2=1597785&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Tue May 27 13:44:12 2014
@@ -42,21 +42,9 @@ import org.apache.http.client.config.Req
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
-import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
-import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
-import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
-import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
+import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
-import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
-import org.apache.manifoldcf.core.interfaces.ConfigParams;
-import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
-import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
-import org.apache.manifoldcf.core.interfaces.IThreadContext;
-import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
-import org.apache.manifoldcf.core.interfaces.IPostParameters;
+import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
import org.apache.manifoldcf.core.system.ManifoldCF;
@@ -103,21 +91,9 @@ public class AmazonCloudSearchConnector
private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
- /** Local connection */
- protected HttpPost poster = null;
-
- /** cloudsearch field name for file body text. */
- private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
-
- /** Constructor.
+ /** Local connection */
protected HttpPost poster = null;
/** Document Chunk Manager */
private DocumentChunkManager documentChunkManager;
/** cloudsearch field name for file body text. */
private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
/** Constructor.
*/
- public AmazonCloudSearchConnector(){
- }
-
- /** Return the list of activities that this connector supports (i.e. writes into the log).
- *@return the list.
- */
- @Override
+ public AmazonCloudSearchConnector(){
}
/** Install the connector.
* Opens a handle on the master database and asks a DocumentChunkManager bound to it
* to install itself.
*@param threadContext is the current thread context.
*/
@Override
public void install(IThreadContext threadContext)
  throws ManifoldCFException
{
  // Resolve the master database coordinates first, then open the handle.
  final String masterName = ManifoldCF.getMasterDatabaseName();
  final String masterUser = ManifoldCF.getMasterDatabaseUsername();
  final String masterPassword = ManifoldCF.getMasterDatabasePassword();
  final IDBInterface masterHandle =
    DBInterfaceFactory.make(threadContext, masterName, masterUser, masterPassword);

  new DocumentChunkManager(threadContext, masterHandle).install();
}
/** Uninstall the connector.
* Opens a handle on the master database and asks a DocumentChunkManager bound to it
* to deinstall itself.
*@param threadContext is the current thread context.
*/
@Override
public void deinstall(IThreadContext threadContext)
  throws ManifoldCFException
{
  final IDBInterface masterHandle = DBInterfaceFactory.make(
    threadContext,
    ManifoldCF.getMasterDatabaseName(),
    ManifoldCF.getMasterDatabaseUsername(),
    ManifoldCF.getMasterDatabasePassword());

  final DocumentChunkManager chunkTable = new DocumentChunkManager(threadContext, masterHandle);
  chunkTable.deinstall();
}
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
@Override
public String[] getActivitiesList()
{
return new String[]{INGEST_ACTIVITY,REMOVE_ACTIVITY};
@@ -158,13 +134,7 @@ public class AmazonCloudSearchConnector
}
/** Set up a session */
- protected void getSession()
- throws ManifoldCFException
- {
- String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
- if (serverHost == null)
- throw new ManifoldCFException("Server host parameter required");
- String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+ protected void getSession()
throws ManifoldCFException
{
if (documentChunkManager == null)
{
IDBInterface databaseHandle = DBInterfaceFactory.make(currentContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
}
String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
if (serverHost == null)
throw new ManifoldCFException("Server host parameter required");
String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
if (serverPath == null)
throw new ManifoldCFException("Server path parameter required");
String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -190,14 +160,7 @@ public class AmazonCloudSearchConnector
throw new ManifoldCFException("Number format exception: "+e.getMessage(),e);
}
}
-
- poster.addHeader("Content-Type", "application/json");
- }
-
- /** Test the connection. Returns a string describing the connection integrity.
- *@return the connection's status as a displayable string.
- */
- @Override
+
poster.addHeader("Content-Type", "application/json");
}
/** Test the connection. Returns a string describing the connection integrity.
*@return the connection's status as a displayable string.
*/
@Override
public String check() throws ManifoldCFException {
try {
getSession();
@@ -398,14 +361,7 @@ public class AmazonCloudSearchConnector
}
}
doc.setFields(fields);
- model.addDocument(doc);
-
- //generate json data.
- jsondata = model.toJSON();
- }
- catch (SAXException e) {
- // if document data could not be converted to JSON by jackson.
- Logging.connectors.debug(e);
+ model.addDocument(doc);
//generate json data.
jsondata = model.toJSON();
documentChunkManager.addDocument(doc.getId(), false, jsondata);
return DOCUMENTSTATUS_ACCEPTED;
}
catch (SAXException e) {
// if the document content could not be parsed (SAX error raised while extracting the binary file).
Logging.connectors.debug(e);
throw new ManifoldCFException(e);
} catch (JsonProcessingException e) {
// if document data could not be converted to JSON by jackson.
@@ -417,29 +373,7 @@ public class AmazonCloudSearchConnector
return DOCUMENTSTATUS_REJECTED;
} catch (IOException e) {
// if the document data could not be read while the document was being parsed by tika.
- Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
- }
-
- //post data..
- String responsbody = postData(jsondata);
-
- // check status
- String status = getStatusFromJsonResponse(responsbody);
- if("success".equals(status))
- {
- activities.recordActivity(null,INGEST_ACTIVITY,new Long(document.getBinaryLength()),documentURI,"OK",null);
- return DOCUMENTSTATUS_ACCEPTED;
- }
- else {
- throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
- }
- }
-
- private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
- throws IOException, SAXException, TikaException {
-
- //extract body text and metadata fields from binary file.
+ Logging.connectors.debug(e);
throw new ManifoldCFException(e);
}
}
private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
throws IOException, SAXException, TikaException {
//extract body text and metadata fields from binary file.
InputStream is = document.getBinaryStream();
Parser parser = new AutoDetectParser();
ContentHandler handler = new BodyContentHandler();
@@ -469,31 +403,7 @@ public class AmazonCloudSearchConnector
String jsonData = "";
try {
- SDFModel model = new SDFModel();
- SDFModel.Document doc = model.new Document();
- doc.setType("delete");
- doc.setId(documentURI);
- model.addDocument(doc);
- jsonData = model.toJSON();
- } catch (JsonProcessingException e) {
- throw new ManifoldCFException(e);
- }
- String responsbody = postData(jsonData);
-
- // check status
- String status = getStatusFromJsonResponse(responsbody);
- if("success".equals(status))
- {
- activities.recordActivity(null,REMOVE_ACTIVITY,null,documentURI,"OK",null);
- }
- else {
- throw new ManifoldCFException("recieved error status from service after feeding document.");
- }
- }
-
- /**
- * Fill in a Server tab configuration parameter map for calling a Velocity
- * template.
+ SDFModel model = new SDFModel();
SDFModel.Document doc = model.new Document();
doc.setType("delete");
doc.setId(ManifoldCF.hash(documentURI));
model.addDocument(doc);
jsonData = model.toJSON();
documentChunkManager.addDocument(doc.getId(), false, jsonData);
} catch (JsonProcessingException e) {
throw new ManifoldCFException(e);
}
}
/** Flush the queued documents to the service when a job completes.
* Chunks are built from the document chunk table, posted one at a time, and the rows
* belonging to a chunk are deleted once the service has reported "success" for it.
*
* Because accepted rows are deleted before the next chunk is built, the rows still waiting
* to be sent always start at the front of the table. Every chunk is therefore read from
* chunk index 0; advancing the index would step over rows that were never sent.
* The loop runs until an empty chunk comes back, so rows left behind by a size-limited
* chunk are picked up by the following iteration.
*
*@param activities is the notification activity handle (not used by this implementation).
*@throws ManifoldCFException if the service answers with a status other than "success".
*/
@Override
public void noteJobComplete(IOutputNotifyActivity activities)
  throws ManifoldCFException, ServiceInterruption {
  getSession();

  // Count-based estimate, used for progress logging only.
  final int estimatedChunks = documentChunkManager.getDocumentChunkNum();

  int sentChunks = 0;
  while (true)
  {
    final String chunkId = String.valueOf(sentChunks);
    final String chunk = documentChunkManager.buildDocumentChunk(chunkId, 0);

    if (chunk.length() == 0)
    {
      // Nothing to post. Release the bookkeeping kept for this chunk id and stop.
      documentChunkManager.removeDocumentsInChunk(chunkId);
      if (documentChunkManager.getDocumentChunkNum() > 0)
      {
        // Rows exist but none fit into a chunk (or they were added concurrently).
        Logging.connectors.warn("document chunk table still holds documents that could not be placed in a chunk");
      }
      break;
    }

    //post data..
    String responsbody = postData(chunk);

    // check status
    String status = getStatusFromJsonResponse(responsbody);
    if (!"success".equals(status))
    {
      throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
    }

    sentChunks++;
    Logging.connectors.info("successfully send document chunk " + sentChunks + " of " + Math.max(sentChunks, estimatedChunks));

    //remove documents from table..
    documentChunkManager.removeDocumentsInChunk(chunkId);
  }
}
/**
* Fill in a Server tab configuration parameter map for calling a Velocity
* template.
*
* @param newMap is the map to fill in
* @param parameters is the current set of configuration parameters
Added: manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1597785&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (added)
+++ manifoldcf/branches/CONNECTORS-916/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Tue May 27 13:44:12 2014
@@ -0,0 +1 @@
+/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.manifoldcf.core.interfaces.ColumnDescription;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.IResultRow;
import org.apache.manifoldcf.core.interfaces.IResultSet;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.StringSet;
import org.exolab.castor.util.Iterator;
/**
 * Table manager for the "amazoncloudsearch_documentdata" table, which stages SDF document
 * payloads until they are sent to Amazon CloudSearch in chunks.
 *
 * Each row holds a document uid, a delete flag ("1"/"0") and the SDF payload.
 * {@link #buildDocumentChunk(String, int)} remembers, per chunk id, which uids went into the
 * chunk; {@link #removeDocumentsInChunk(String)} deletes exactly those rows. That bookkeeping
 * lives in memory, so both calls must be made on the same instance.
 */
public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
{
  // Database fields
  private final static String UID_FIELD = "uid";
  private final static String ON_DELETE_FIELD = "ondelete";
  private final static String SDF_DATA_FIELD = "sdfdata";

  /** Maximum number of documents placed in one chunk. */
  private static final int MAX_DOCNUM_IN_CHUNK = 5000;
  /** Maximum accumulated payload size of one chunk, in bytes (5MB). */
  private static final int MAX_DOCBYTES_IN_CHUNK = 5242880;
  /** Charset used to measure payload size, independent of the platform default. */
  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

  /** Chunk id -> uids of the rows that were placed in that chunk by buildDocumentChunk. */
  Map<String, List<String>> uidMap = new HashMap<String, List<String>>();

  /** Constructor.
   *@param threadContext is the current thread context (not used here).
   *@param database is the database handle the table lives in.
   */
  public DocumentChunkManager(IThreadContext threadContext,
    IDBInterface database)
  {
    super(database, "amazoncloudsearch_documentdata");
  }

  /** Install the manager: create the table if no schema exists for it yet.
   * @throws ManifoldCFException
   */
  public void install() throws ManifoldCFException
  {
    // Standard practice: outer loop on install methods, no transactions
    while (true)
    {
      Map existing = getTableSchema(null,null);
      if (existing == null)
      {
        // Install the table.
        Map<String,ColumnDescription> columns = new HashMap<String,ColumnDescription>();
        columns.put(UID_FIELD,new ColumnDescription("VARCHAR(255)",true,false,null,null,false));
        columns.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
        columns.put(SDF_DATA_FIELD,new ColumnDescription("CLOB",false,true,null,null,false));
        performCreate(columns,null);
      }
      else
      {
        // Upgrade code, if needed, goes here
      }

      // Handle indexes, if needed

      break;
    }
  }

  /** Uninstall the manager: drop the table.
   * @throws ManifoldCFException
   */
  public void deinstall()
    throws ManifoldCFException
  {
    performDrop(null);
  }

  /** Perform a table creation operation.
   *@param columnMap is the map describing the columns and types.  NOTE that these are abstract
   * types, which will be mapped to the proper types for the actual database inside this
   * layer.
   *@param invalidateKeys are the cache keys that should be invalidated, if any.
   */
  protected void performCreate(Map columnMap, StringSet invalidateKeys)
    throws ManifoldCFException
  {
    dbInterface.performCreate(tableName,columnMap,invalidateKeys);
  }

  /**
   * Add document information to the table, replacing the row for the same uid if one exists.
   * NOTE(review): the existence check and the following insert/update are separate
   * statements; concurrent writers for the same uid are not guarded against here.
   * @param uid document uid
   * @param onDelete true if the payload deletes the document from Amazon CloudSearch.
   * @param sdfData document SDF data.
   * @throws ManifoldCFException
   */
  public void addDocument(String uid, boolean onDelete, String sdfData)
    throws ManifoldCFException
  {
    ArrayList params = new ArrayList();
    params.add(uid);
    // Only the key column is needed to test for existence; avoid fetching the CLOB.
    IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
      UID_FIELD+"=?",params,null,null);

    Map<String,String> parameterMap = new HashMap<String,String>();
    parameterMap.put(UID_FIELD, uid);
    parameterMap.put(ON_DELETE_FIELD, onDelete ? "1" : "0");
    parameterMap.put(SDF_DATA_FIELD, sdfData);

    //if record exists on table, update record.
    if(set.getRowCount() > 0)
    {
      List<String> whereParameters = new ArrayList<String>();
      whereParameters.add(uid);
      performUpdate(parameterMap, " WHERE "+UID_FIELD+"=?", whereParameters, null);
    }
    else
    {
      performInsert(parameterMap, null);
    }
  }

  /**
   * Get the number of chunks needed for the rows currently in the table, counting
   * MAX_DOCNUM_IN_CHUNK documents per chunk (the byte limit is not considered here).
   * @return number of document chunks; 0 when the table is empty.
   * @throws ManifoldCFException
   */
  public int getDocumentChunkNum() throws ManifoldCFException
  {
    String query = "select count(*) as COUNT from " + tableName;
    IResultSet resultset = performQuery(query, null, null, null);
    if(resultset.getRowCount() == 0)
    {
      return 0;
    }
    Object value = resultset.getRow(0).getValue("count");
    if(value == null)
    {
      return 0;
    }

    // The concrete numeric type of count(*) depends on the database driver, so do not
    // assume Integer.
    long count;
    if(value instanceof Number)
    {
      count = ((Number)value).longValue();
    }
    else
    {
      count = Long.parseLong(value.toString());
    }
    if(count <= 0)
    {
      return 0;
    }
    // Ceiling division: an exact multiple must not produce an extra, empty chunk.
    return (int)((count + MAX_DOCNUM_IN_CHUNK - 1) / MAX_DOCNUM_IN_CHUNK);
  }

  /**
   * Build a document chunk (sdf formatted) from the table and remember which rows it used.
   * Rows are taken from the window of MAX_DOCNUM_IN_CHUNK rows selected by chunkIdx, and
   * appended until adding the next payload would exceed MAX_DOCBYTES_IN_CHUNK.
   * NOTE(review): payloads are concatenated verbatim. If each stored payload is a complete
   * JSON array, the concatenation is not a single JSON document - confirm against the
   * producer of the sdf data.
   * @param chunkid key under which the uids of this chunk are remembered for
   *   removeDocumentsInChunk.
   * @param chunkIdx zero-based index of the row window to read.
   * @return document chunk (sdf formatted); empty if no row was added.
   * @throws ManifoldCFException
   */
  public String buildDocumentChunk(String chunkid, int chunkIdx) throws ManifoldCFException
  {
    // ROW_NUMBER() starts at 1 and BETWEEN is inclusive, so window k covers
    // rows k*MAX+1 .. (k+1)*MAX and does not overlap window k-1.
    int idxFrom = chunkIdx * MAX_DOCNUM_IN_CHUNK + 1;
    int idxTo = idxFrom + MAX_DOCNUM_IN_CHUNK - 1;

    String query = "SELECT * FROM ("
      + "SELECT ROW_NUMBER() OVER() as RN, "
      + UID_FIELD +", " + ON_DELETE_FIELD + ", " + SDF_DATA_FIELD
      + " FROM " + tableName
      + " ORDER BY " + UID_FIELD +" ) "
      + "as E WHERE E.RN BETWEEN " + idxFrom
      + " AND "+ idxTo;
    IResultSet resultset = performQuery(query, null, null, null);

    int currentSize = 0;
    StringBuilder sdfChunk = new StringBuilder();
    List<String> uidListInChunk = new ArrayList<String>();
    final int rowCount = resultset.getRowCount();
    for(int i=0;i<rowCount;i++)
    {
      IResultRow result = resultset.getRow(i);
      String uidValue = (String)result.getValue(UID_FIELD);
      String sdfValue = (String)result.getValue(SDF_DATA_FIELD);

      if(sdfValue == null)
      {
        // The column is nullable. A row without payload contributes nothing to the
        // chunk, but is recorded so it is cleaned up together with the chunk.
        uidListInChunk.add(uidValue);
        continue;
      }

      // if sdf size is over max size, break the loop.
      currentSize = currentSize + sdfValue.getBytes(UTF8).length;
      if(currentSize > MAX_DOCBYTES_IN_CHUNK)
      {
        break;
      }

      //append sdf value to chunk.
      sdfChunk.append(sdfValue);
      uidListInChunk.add(uidValue);
    }

    uidMap.put(chunkid, uidListInChunk);
    return sdfChunk.toString();
  }

  /**
   * Remove the rows that were placed in the given chunk by buildDocumentChunk.
   * Does nothing if no chunk was built under that id on this instance.
   * @param chunkid the id the chunk was built with.
   * @throws ManifoldCFException
   */
  public void removeDocumentsInChunk(String chunkid)
    throws ManifoldCFException
  {
    List<String> uidList = uidMap.get(chunkid);
    if(uidList == null)
    {
      return;
    }
    for(String uid : uidList)
    {
      List<String> param = new ArrayList<String>();
      param.add(uid);
      performDelete("WHERE " + UID_FIELD + "=?", param, null);
    }
    // Forget the chunk only after every row has been deleted.
    uidMap.remove(chunkid);
  }
}
\ No newline at end of file