You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/20 00:24:21 UTC
svn commit: r1604042 -
/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/
Author: kwright
Date: Thu Jun 19 22:24:20 2014
New Revision: 1604042
URL: http://svn.apache.org/r1604042
Log:
Revamp DocumentChunkManager; this breaks the build
Added:
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (with props)
Modified:
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (contents, props changed)
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (contents, props changed)
manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java (props changed)
Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java?rev=1604042&r1=1604041&r2=1604042&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java Thu Jun 19 22:24:20 2014
@@ -17,16 +17,16 @@
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;
-
-import java.io.IOException;
-import java.io.InputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.StringReader;
-import java.io.BufferedReader;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.io.BufferedReader;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
+import java.util.List;
import java.util.Map;
import java.util.Locale;
import java.util.Set;
@@ -35,21 +35,30 @@ import java.util.HashSet;
import org.apache.commons.io.FilenameUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
-import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
import org.apache.manifoldcf.core.interfaces.Specification;
-import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
+import org.apache.manifoldcf.core.interfaces.ConfigParams;
+import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
+import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
+import org.apache.manifoldcf.core.interfaces.IDBInterface;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.IThreadContext;
+import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
@@ -97,9 +106,20 @@ public class AmazonCloudSearchConnector
private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
- /** Local connection */
protected HttpPost poster = null;
/** Document Chunk Manager */
private DocumentChunkManager documentChunkManager;
/** cloudsearch field name for file body text. */
private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
/** Constructor.
+ /** Local connection */
+ protected HttpPost poster = null;
+
+ /** Document Chunk Manager */
+ private DocumentChunkManager documentChunkManager;
+
+ /** cloudsearch field name for file body text. */
+ private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
+
+ /** Constructor.
*/
- public AmazonCloudSearchConnector(){
}
+ public AmazonCloudSearchConnector(){
+ }
+
/** Clear out any state information specific to a given thread.
* This method is called when this object is returned to the connection pool.
*/
@@ -109,7 +129,37 @@ public class AmazonCloudSearchConnector
super.clearThreadContext();
documentChunkManager = null;
}
-
/** Install this connector's persistent state.
* Opens a handle on the ManifoldCF master database and asks a
* DocumentChunkManager to create its table there.
*@param threadContext is the current thread context.
*/
@Override
public void install(IThreadContext threadContext)
throws ManifoldCFException
{
IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
// A short-lived manager instance is enough; it is only used to create the table.
DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
dcmanager.install();
}
/** Remove this connector's persistent state.
* Opens a handle on the ManifoldCF master database and asks a
* DocumentChunkManager to drop its table.
*@param threadContext is the current thread context.
*/
@Override
public void deinstall(IThreadContext threadContext)
throws ManifoldCFException
{
IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
// A short-lived manager instance is enough; it is only used to drop the table.
DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
dcmanager.deinstall();
}
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
@Override
+
+ @Override
+ public void install(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
+ ManifoldCF.getMasterDatabaseName(),
+ ManifoldCF.getMasterDatabaseUsername(),
+ ManifoldCF.getMasterDatabasePassword());
+
+ DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+ dcmanager.install();
+ }
+
+ @Override
+ public void deinstall(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
+ ManifoldCF.getMasterDatabaseName(),
+ ManifoldCF.getMasterDatabaseUsername(),
+ ManifoldCF.getMasterDatabasePassword());
+
+ DocumentChunkManager dcmanager = new DocumentChunkManager(threadContext,mainDatabase);
+ dcmanager.deinstall();
+ }
+
+ /** Return the list of activities that this connector supports (i.e. writes into the log).
+ *@return the list.
+ */
+ @Override
public String[] getActivitiesList()
{
return new String[]{INGEST_ACTIVITY,REMOVE_ACTIVITY};
@@ -150,7 +200,10 @@ public class AmazonCloudSearchConnector
}
/** Set up a session */
- protected void getSession()
throws ManifoldCFException
{
if (documentChunkManager == null)
+ protected void getSession()
+ throws ManifoldCFException
+ {
+ if (documentChunkManager == null)
{
IDBInterface databaseHandle = DBInterfaceFactory.make(currentContext,
ManifoldCF.getMasterDatabaseName(),
@@ -158,7 +211,11 @@ public class AmazonCloudSearchConnector
ManifoldCF.getMasterDatabasePassword());
documentChunkManager = new DocumentChunkManager(currentContext,databaseHandle);
}
-
String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
if (serverHost == null)
throw new ManifoldCFException("Server host parameter required");
String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
+
+ String serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
+ if (serverHost == null)
+ throw new ManifoldCFException("Server host parameter required");
+ String serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
if (serverPath == null)
throw new ManifoldCFException("Server path parameter required");
String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
@@ -184,7 +241,14 @@ public class AmazonCloudSearchConnector
throw new ManifoldCFException("Number format exception: "+e.getMessage(),e);
}
}
-
poster.addHeader("Content-Type", "application/json");
}
/** Test the connection. Returns a string describing the connection integrity.
*@return the connection's status as a displayable string.
*/
@Override
+
+ poster.addHeader("Content-Type", "application/json");
+ }
+
+ /** Test the connection. Returns a string describing the connection integrity.
+ *@return the connection's status as a displayable string.
+ */
+ @Override
public String check() throws ManifoldCFException {
try {
getSession();
@@ -227,36 +291,36 @@ public class AmazonCloudSearchConnector
Logging.connectors.debug(e);
return "Transient exception: "+e.getMessage();
}
- }
-
- private String getStatusFromJsonResponse(String responsbody) throws ManifoldCFException {
- try {
- JsonParser parser = new JsonFactory().createJsonParser(responsbody);
- while (parser.nextToken() != JsonToken.END_OBJECT)
- {
- String name = parser.getCurrentName();
- if("status".equalsIgnoreCase(name)){
- parser.nextToken();
- return parser.getText();
- }
- }
- } catch (JsonParseException e) {
- throw new ManifoldCFException(e);
- } catch (IOException e) {
- throw new ManifoldCFException(e);
- }
- return null;
- }
-
- private String parseMessage(JsonParser parser) throws JsonParseException, IOException {
- while(parser.nextToken() != JsonToken.END_ARRAY){
- String name = parser.getCurrentName();
- if("message".equalsIgnoreCase(name)){
- parser.nextToken();
- return parser.getText();
- }
- }
- return null;
+ }
+
+ private String getStatusFromJsonResponse(String responsbody) throws ManifoldCFException {
+ try {
+ JsonParser parser = new JsonFactory().createJsonParser(responsbody);
+ while (parser.nextToken() != JsonToken.END_OBJECT)
+ {
+ String name = parser.getCurrentName();
+ if("status".equalsIgnoreCase(name)){
+ parser.nextToken();
+ return parser.getText();
+ }
+ }
+ } catch (JsonParseException e) {
+ throw new ManifoldCFException(e);
+ } catch (IOException e) {
+ throw new ManifoldCFException(e);
+ }
+ return null;
+ }
+
+ private String parseMessage(JsonParser parser) throws JsonParseException, IOException {
+ while(parser.nextToken() != JsonToken.END_ARRAY){
+ String name = parser.getCurrentName();
+ if("message".equalsIgnoreCase(name)){
+ parser.nextToken();
+ return parser.getText();
+ }
+ }
+ return null;
}
/** Get an output version string, given an output specification. The output version string is used to uniquely describe the pertinent details of
@@ -331,13 +395,13 @@ public class AmazonCloudSearchConnector
@Override
public int addOrReplaceDocument(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
throws ManifoldCFException, ServiceInterruption
- {
- // Establish a session
+ {
+ // Establish a session
getSession();
SpecPacker sp = new SpecPacker(outputDescription);
- String jsondata = "";
+ String jsondata = "";
try {
//build json..
SDFModel model = new SDFModel();
@@ -385,19 +449,37 @@ public class AmazonCloudSearchConnector
}
}
doc.setFields(fields);
- model.addDocument(doc);
//generate json data.
jsondata = model.toJSON();
documentChunkManager.addDocument(doc.getId(), false, jsondata);
return DOCUMENTSTATUS_ACCEPTED;
}
catch (SAXException e) {
// if document data could not be converted to JSON by jackson.
Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
- } catch (JsonProcessingException e) {
+ model.addDocument(doc);
+
+ //generate json data.
+ jsondata = model.toJSON();
+
+ documentChunkManager.addDocument(doc.getId(), false, jsondata);
+ return DOCUMENTSTATUS_ACCEPTED;
+ }
+ catch (SAXException e) {
// if document data could not be converted to JSON by jackson.
Logging.connectors.debug(e);
- throw new ManifoldCFException(e);
- } catch (TikaException e) {
+ throw new ManifoldCFException(e);
+ } catch (JsonProcessingException e) {
+ // if document data could not be converted to JSON by jackson.
+ Logging.connectors.debug(e);
+ throw new ManifoldCFException(e);
+ } catch (TikaException e) {
// if document could not be parsed by tika.
- Logging.connectors.debug(e);
- return DOCUMENTSTATUS_REJECTED;
- } catch (IOException e) {
+ Logging.connectors.debug(e);
+ return DOCUMENTSTATUS_REJECTED;
+ } catch (IOException e) {
// if document data could not be read when the document parsing by tika.
- Logging.connectors.debug(e);
throw new ManifoldCFException(e);
}
}
private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
throws IOException, SAXException, TikaException {
//extract body text and metadata fields from binary file.
+ Logging.connectors.debug(e);
+ throw new ManifoldCFException(e);
+ }
+ }
+
+ private Metadata extractBinaryFile(RepositoryDocument document, HashMap fields)
+ throws IOException, SAXException, TikaException {
+
+ //extract body text and metadata fields from binary file.
InputStream is = document.getBinaryStream();
Parser parser = new AutoDetectParser();
ContentHandler handler = new BodyContentHandler();
@@ -409,9 +491,9 @@ public class AmazonCloudSearchConnector
fields.put(FILE_BODY_TEXT_FIELDNAME, bodyStr);
}
return metadata;
- }
-
- /** Remove a document using the connector.
+ }
+
+ /** Remove a document using the connector.
* Note that the last outputDescription is included, since it may be necessary for the connector to use such information to know how to properly remove the document.
*@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
* and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
@@ -427,7 +509,54 @@ public class AmazonCloudSearchConnector
String jsonData = "";
try {
- SDFModel model = new SDFModel();
SDFModel.Document doc = model.new Document();
doc.setType("delete");
doc.setId(ManifoldCF.hash(documentURI));
model.addDocument(doc);
jsonData = model.toJSON();
documentChunkManager.addDocument(doc.getId(), false, jsonData);
} catch (JsonProcessingException e) {
throw new ManifoldCFException(e);
}
}
/** Called when a job completes: post every pending document to CloudSearch in chunks.
* Each chunk is built from the pending-document table, posted, and, on a
* "success" status, its documents are deleted from the table.
*@param activities is the notification activity object (not used here).
*@throws ManifoldCFException if the service reports a non-success status, or if
* a non-empty chunk cannot be built while documents are still pending.
*/
@Override
public void noteJobComplete(IOutputNotifyActivity activities)
throws ManifoldCFException, ServiceInterruption {
  getSession();

  // Initial estimate, used only for progress logging. Chunks may be cut short
  // by the byte-size limit, so more chunks than this can be sent.
  final int chunkNum = documentChunkManager.getDocumentChunkNum();

  int sent = 0;
  while (documentChunkManager.getDocumentChunkNum() > 0)
  {
    String chunkId = String.valueOf(sent);
    // Posted documents are deleted from the table after each successful chunk,
    // so the documents still pending always start at the first row. Building
    // chunk index 0 every time avoids skipping rows via stale offsets.
    String chunk = documentChunkManager.buildDocumentChunk(chunkId, 0);
    if (chunk.length() == 0)
    {
      // No progress is possible (e.g. a single document exceeds the chunk size
      // limit); fail instead of looping forever.
      throw new ManifoldCFException("could not build a non-empty document chunk although documents are still pending");
    }

    //post data..
    String responsbody = postData(chunk);
    // check status
    String status = getStatusFromJsonResponse(responsbody);
    if (!"success".equals(status))
    {
      throw new ManifoldCFException("received error status from service after feeding document. response body : " + responsbody);
    }

    sent++;
    Logging.connectors.info("successfully sent document chunk " + sent + " of " + chunkNum);

    //remove documents from table..
    documentChunkManager.removeDocumentsInChunk(chunkId);
  }
}
/**
* Fill in a Server tab configuration parameter map for calling a Velocity
* template.
+ SDFModel model = new SDFModel();
+ SDFModel.Document doc = model.new Document();
+ doc.setType("delete");
+ doc.setId(ManifoldCF.hash(documentURI));
+ model.addDocument(doc);
+ jsonData = model.toJSON();
+
+ documentChunkManager.addDocument(doc.getId(), false, jsonData);
+
+ } catch (JsonProcessingException e) {
+ throw new ManifoldCFException(e);
+ }
+ }
+
+ @Override
+ public void noteJobComplete(IOutputNotifyActivity activities)
+ throws ManifoldCFException, ServiceInterruption {
+ getSession();
+
+ final int chunkNum = documentChunkManager.getDocumentChunkNum();
+
+ for(int i = 0;i<chunkNum;i++)
+ {
+ String chunk = documentChunkManager.buildDocumentChunk(String.valueOf(i), i);
+
+ //post data..
+ String responsbody = postData(chunk);
+ // check status
+ String status = getStatusFromJsonResponse(responsbody);
+ if("success".equals(status))
+ {
+ int n = i + 1;
+ Logging.connectors.info("successfully send document chunk " + n + " of " + chunkNum);
+
+ //remove documents from table..
+ documentChunkManager.removeDocumentsInChunk(String.valueOf(i));
+ }
+ else
+ {
+ throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
+ }
+
+ }
+ }
+
+ /**
+ * Fill in a Server tab configuration parameter map for calling a Velocity
+ * template.
*
* @param newMap is the map to fill in
* @param parameters is the current set of configuration parameters
@@ -569,42 +698,42 @@ public class AmazonCloudSearchConnector
return null;
}
-
- private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
- CloseableHttpClient httpclient = HttpClients.createDefault();
- try {
- poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
+
+ private String postData(String jsonData) throws ServiceInterruption, ManifoldCFException {
+ CloseableHttpClient httpclient = HttpClients.createDefault();
+ try {
+ poster.setEntity(new StringEntity(jsonData, Consts.UTF_8));
HttpResponse res = httpclient.execute(poster);
-
- HttpEntity resEntity = res.getEntity();
- return EntityUtils.toString(resEntity);
- } catch (ClientProtocolException e) {
- throw new ManifoldCFException(e);
- } catch (IOException e) {
- handleIOException(e);
- } finally {
- try {
- httpclient.close();
+
+ HttpEntity resEntity = res.getEntity();
+ return EntityUtils.toString(resEntity);
+ } catch (ClientProtocolException e) {
+ throw new ManifoldCFException(e);
+ } catch (IOException e) {
+ handleIOException(e);
+ } finally {
+ try {
+ httpclient.close();
} catch (IOException e) {
- //do nothing
- }
- }
- return null;
- }
-
- private static void handleIOException(IOException e)
- throws ManifoldCFException, ServiceInterruption {
- if (!(e instanceof java.net.SocketTimeoutException)
- && (e instanceof InterruptedIOException)) {
- throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
- ManifoldCFException.INTERRUPTED);
- }
- Logging.connectors.warn(
- "Amazon CloudSearch: IO exception: " + e.getMessage(), e);
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
- currentTime + 300000L, currentTime + 3 * 60 * 60000L, -1, false);
- }
+ //do nothing
+ }
+ }
+ return null;
+ }
+
+ private static void handleIOException(IOException e)
+ throws ManifoldCFException, ServiceInterruption {
+ if (!(e instanceof java.net.SocketTimeoutException)
+ && (e instanceof InterruptedIOException)) {
+ throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+ ManifoldCFException.INTERRUPTED);
+ }
+ Logging.connectors.warn(
+ "Amazon CloudSearch: IO exception: " + e.getMessage(), e);
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
+ currentTime + 300000L, currentTime + 3 * 60 * 60000L, -1, false);
+ }
protected static void fillInFieldMappingSpecificationMap(Map<String,Object> paramMap, Specification os)
{
@@ -1090,5 +1219,5 @@ public class AmazonCloudSearchConnector
return keepAllMetadata;
}
}
-
+
}
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java?rev=1604042&r1=1604041&r2=1604042&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java (original)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java Thu Jun 19 22:24:20 2014
@@ -1 +1,381 @@
-/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.manifoldcf.core.interfaces.ColumnDescription;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.IResultRow;
import org.apache.manifoldcf.core.interfaces.IResultSet;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.StringSet;
import org.exolab.castor.util.Iterator;
/** Database-backed store of pending Amazon CloudSearch documents.
* Each row of the "amazoncloudsearch_documentdata" table holds one document:
* its uid, an on-delete flag, and the SDF (JSON) text to send for it. Rows are
* read back in chunks bounded by document count and total byte size.
*/
public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
{
  // Database fields
  private final static String UID_FIELD = "uid";
  private final static String ON_DELETE_FIELD = "ondelete";
  private final static String SDF_DATA_FIELD = "sdfdata";

  /** Maximum number of documents placed into one chunk. */
  private final static int MAX_DOCNUM_IN_CHUNK = 5000;
  /** Maximum total SDF size of one chunk, in UTF-8 bytes (5MB). */
  private final static int MAX_DOCBYTES_IN_CHUNK = 5242880;

  /** Charset used to measure SDF data size independently of the platform default. */
  private final static java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

  /** uids placed into each chunk by buildDocumentChunk, keyed by chunk id, so that
  * removeDocumentsInChunk deletes exactly the rows that were sent. */
  final Map<String, List<String>> uidMap = new HashMap<String, List<String>>();

  /** Constructor.
  *@param threadContext is the current thread context (not used by this class).
  *@param database is the database handle the table lives in.
  */
  public DocumentChunkManager(IThreadContext threadContext,
    IDBInterface database)
  {
    super(database, "amazoncloudsearch_documentdata");
  }

  /** Install the manager: create the table if it does not exist yet.
  * @throws ManifoldCFException
  */
  public void install() throws ManifoldCFException
  {
    // Standard practice: outer loop on install methods, no transactions
    while (true)
    {
      Map existing = getTableSchema(null,null);
      if (existing == null)
      {
        // Install the table.
        Map<String,ColumnDescription> map = new HashMap<String,ColumnDescription>();
        map.put(UID_FIELD,new ColumnDescription("VARCHAR(255)",true,false,null,null,false));
        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
        map.put(SDF_DATA_FIELD,new ColumnDescription("CLOB",false,true,null,null,false));
        performCreate(map,null);
      }
      else
      {
        // Upgrade code, if needed, goes here
      }
      // Handle indexes, if needed
      break;
    }
  }

  /** Uninstall the manager: drop the table.
  */
  public void deinstall()
    throws ManifoldCFException
  {
    performDrop(null);
  }

  /** Perform a table creation operation.
  *@param columnMap is the map describing the columns and types. NOTE that these are abstract
  * types, which will be mapped to the proper types for the actual database inside this
  * layer.
  *@param invalidateKeys are the cache keys that should be invalidated, if any.
  */
  protected void performCreate(Map columnMap, StringSet invalidateKeys)
    throws ManifoldCFException
  {
    dbInterface.performCreate(tableName,columnMap,invalidateKeys);
  }

  /**
  * Add (or replace) a document's information in the table.
  * @param uid document uid; an existing row with this uid is updated in place.
  * @param onDelete true if the document is to be deleted from Amazon CloudSearch; stored as "1"/"0".
  * @param sdfData document SDF data.
  * @throws ManifoldCFException
  */
  public void addDocument(String uid, boolean onDelete, String sdfData)
    throws ManifoldCFException
  {
    List<String> params = new ArrayList<String>();
    params.add(uid);
    // Only existence matters here, so fetch just the key column.
    IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
      UID_FIELD+"=?",params,null,null);

    Map<String,String> parameterMap = new HashMap<String,String>();
    parameterMap.put(UID_FIELD, uid);
    parameterMap.put(ON_DELETE_FIELD, onDelete ? "1" : "0");
    parameterMap.put(SDF_DATA_FIELD, sdfData);

    //if record exists on table, update record.
    if (set.getRowCount() > 0)
    {
      List<String> whereParameters = new ArrayList<String>();
      whereParameters.add(uid);
      performUpdate(parameterMap, " WHERE "+UID_FIELD+"=?", whereParameters, null);
    }
    else
    {
      performInsert(parameterMap, null);
    }
  }

  /**
  * Get the number of chunks needed to hold every pending document, based on
  * MAX_DOCNUM_IN_CHUNK only (the byte-size limit may split chunks further).
  * @return number of document chunks; 0 when the table is empty.
  * @throws ManifoldCFException
  */
  public int getDocumentChunkNum() throws ManifoldCFException
  {
    String query = "select count(*) as COUNT from " + tableName;
    IResultSet resultset = performQuery(query, null, null, null);
    Object value = resultset.getRow(0).getValue("count");
    if (value == null)
    {
      return 0;
    }
    // count(*) may come back as different Number subtypes depending on the
    // database/driver, so do not assume Integer.
    long count = (value instanceof Number)
      ? ((Number)value).longValue()
      : Long.parseLong(value.toString().trim());
    if (count <= 0)
    {
      return 0;
    }
    // Ceiling division: an exact multiple of MAX_DOCNUM_IN_CHUNK must not yield an extra, empty chunk.
    return (int)((count + MAX_DOCNUM_IN_CHUNK - 1) / MAX_DOCNUM_IN_CHUNK);
  }

  /**
  * Build a document chunk (concatenated SDF data) from the table and remember
  * which uids it contains under the given chunk id.
  * Rows are appended until MAX_DOCBYTES_IN_CHUNK would be exceeded; if the first
  * row alone exceeds it, the returned chunk is empty.
  * @param chunkid id under which the chunk's uids are recorded for removeDocumentsInChunk.
  * @param chunkIdx zero-based chunk index; selects rows chunkIdx*MAX_DOCNUM_IN_CHUNK+1 .. (chunkIdx+1)*MAX_DOCNUM_IN_CHUNK.
  * @return document chunk (sdf formatted)
  * @throws ManifoldCFException
  */
  public String buildDocumentChunk(String chunkid, int chunkIdx) throws ManifoldCFException
  {
    // ROW_NUMBER() is 1-based and BETWEEN is inclusive, so this range gives exactly
    // MAX_DOCNUM_IN_CHUNK rows and adjacent chunks never share a row.
    int idxFrom = chunkIdx * MAX_DOCNUM_IN_CHUNK + 1;
    int idxTo = (chunkIdx + 1) * MAX_DOCNUM_IN_CHUNK;
    // NOTE(review): ROW_NUMBER() OVER() has no ordering inside the window, so row
    // numbers are not guaranteed to follow uid order; consider OVER(ORDER BY uid)
    // on databases that support it.
    String query = "SELECT * FROM ("
      + "SELECT ROW_NUMBER() OVER() as RN, "
      + UID_FIELD +", " + ON_DELETE_FIELD + ", " + SDF_DATA_FIELD
      + " FROM " + tableName
      + " ORDER BY " + UID_FIELD +" ) "
      + "as E WHERE E.RN BETWEEN " + idxFrom
      + " AND "+ idxTo;
    IResultSet resultset = performQuery(query, null, null, null);

    long currentSize = 0L;
    StringBuilder sdfChunk = new StringBuilder();
    List<String> uidListInChunk = new ArrayList<String>();
    for (int i = 0; i < resultset.getRowCount(); i++)
    {
      IResultRow result = resultset.getRow(i);
      String uidValue = (String)result.getValue(UID_FIELD);
      String sdfValue = (String)result.getValue(SDF_DATA_FIELD);
      // The sdfdata column is nullable; treat a missing value as empty.
      if (sdfValue == null)
      {
        sdfValue = "";
      }
      // Stop before the chunk would exceed the size limit (measured in UTF-8 bytes).
      currentSize += sdfValue.getBytes(UTF8).length;
      if (currentSize > MAX_DOCBYTES_IN_CHUNK)
      {
        break;
      }
      //append sdf value to chunk.
      sdfChunk.append(sdfValue);
      uidListInChunk.add(uidValue);
    }
    uidMap.put(chunkid, uidListInChunk);
    return sdfChunk.toString();
  }

  /**
  * Remove from the table the documents recorded for a chunk by buildDocumentChunk.
  * Does nothing if no chunk is recorded under this id.
  * @param chunkid id passed to buildDocumentChunk.
  * @throws ManifoldCFException
  */
  public void removeDocumentsInChunk(String chunkid)
    throws ManifoldCFException
  {
    List<String> uidList = uidMap.get(chunkid);
    if (uidList == null)
    {
      return;
    }
    for (String uid : uidList)
    {
      List<String> param = new ArrayList<String>();
      param.add(uid);
      performDelete("WHERE " + UID_FIELD + "=?", param, null);
    }
    // Forget the chunk only after all deletes succeeded, so a failed removal can be retried.
    uidMap.remove(chunkid);
  }
}
\ No newline at end of file
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.output.amazoncloudsearch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Iterator;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.manifoldcf.core.interfaces.ColumnDescription;
+import org.apache.manifoldcf.core.interfaces.IndexDescription;
+import org.apache.manifoldcf.core.interfaces.IDBInterface;
+import org.apache.manifoldcf.core.interfaces.IResultRow;
+import org.apache.manifoldcf.core.interfaces.IResultSet;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.BinaryInput;
+import org.apache.manifoldcf.core.interfaces.TempFileInput;
+import org.apache.manifoldcf.core.interfaces.ClauseDescription;
+import org.apache.manifoldcf.core.interfaces.UnitaryClause;
+
+/** Manages the "amazoncloudsearch_documentdata" table, which holds documents
+* (or delete markers) keyed by server host, server path and document uid,
+* to be read out and processed in chunks.
+*/
+public class DocumentChunkManager extends org.apache.manifoldcf.core.database.BaseTable
+{
+  // Database fields
+  private final static String UID_FIELD = "uid";                 // This is the document key, which is a dochash value
+  private final static String HOST_FIELD = "serverhost";         // The host and path are there to make sure we don't collide between connections
+  private final static String PATH_FIELD = "serverpath";
+  private final static String ON_DELETE_FIELD = "ondelete";      // "1" for a delete marker, "0" for an add/replace
+  private final static String SDF_DATA_FIELD = "sdfdata";        // SDF payload; set to null for delete markers
+
+  /** Constructor.
+  *@param database is the database handle; this manager works on the
+  * "amazoncloudsearch_documentdata" table.
+  */
+  public DocumentChunkManager(
+    IDBInterface database)
+  {
+    super(database, "amazoncloudsearch_documentdata");
+  }
+
+  /** Install the manager.
+  * Creates the table if it does not exist yet, makes sure the (host, path, uid)
+  * index exists, and drops any other index whose name does not contain "_pkey".
+  * @throws ManifoldCFException
+  */
+  public void install() throws ManifoldCFException
+  {
+    // Standard practice: outer loop on install methods, no transactions
+    while (true)
+    {
+      Map existing = getTableSchema(null,null);
+      if (existing == null)
+      {
+        // Install the table.
+        HashMap map = new HashMap();
+        map.put(UID_FIELD,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
+        map.put(HOST_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+        map.put(PATH_FIELD,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+        map.put(ON_DELETE_FIELD,new ColumnDescription("CHAR(1)",false,false,null,null,false));
+        map.put(SDF_DATA_FIELD,new ColumnDescription("BLOB",false,true,null,null,false));
+        performCreate(map,null);
+      }
+      else
+      {
+        // Upgrade code, if needed, goes here
+      }
+
+      // Handle indexes, if needed
+      IndexDescription keyIndex = new IndexDescription(true,new String[]{HOST_FIELD,PATH_FIELD,UID_FIELD});
+
+      Map indexes = getTableIndexes(null,null);
+      Iterator iter = indexes.keySet().iterator();
+      while (iter.hasNext())
+      {
+        String indexName = (String)iter.next();
+        IndexDescription id = (IndexDescription)indexes.get(indexName);
+
+        if (keyIndex != null && id.equals(keyIndex))
+          keyIndex = null;
+        else if (indexName.indexOf("_pkey") == -1)
+          // This index shouldn't be here; drop it
+          performRemoveIndex(indexName);
+      }
+
+      // Add the ones we didn't find
+      if (keyIndex != null)
+        performAddIndex(null,keyIndex);
+
+      break;
+    }
+  }
+
+  /** Uninstall the manager (drops the table).
+  * @throws ManifoldCFException
+  */
+  public void deinstall()
+    throws ManifoldCFException
+  {
+    performDrop(null);
+  }
+
+  /**
+  * Remove document information in table (and make a delete marker).
+  * If a row for (host, path, uid) exists, it is turned into a delete marker and
+  * its SDF data is cleared. Otherwise a new delete-marker row is inserted.
+  * Aborted (deadlocked) transactions are retried after a back-off sleep.
+  * @param uid document uid
+  * @param host server host the document belongs to
+  * @param path server path the document belongs to
+  * @throws ManifoldCFException
+  */
+  public void removeDocument(String uid, String host, String path)
+    throws ManifoldCFException
+  {
+    while (true)
+    {
+      long sleepAmt = 0L;
+      try
+      {
+        beginTransaction();
+        try
+        {
+          List<Object> params = new ArrayList<Object>();
+          String query = buildConjunctionClause(params,new ClauseDescription[]{
+            new UnitaryClause(HOST_FIELD,host),
+            new UnitaryClause(PATH_FIELD,path),
+            new UnitaryClause(UID_FIELD,uid)});
+
+          // Lock any existing row so the update-vs-insert decision below holds.
+          IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
+            query+" FOR UPDATE",params,null,null);
+
+          // Values are not all Strings (SDF data is null here), so the map holds Objects.
+          Map<String,Object> parameterMap = new HashMap<String,Object>();
+          parameterMap.put(ON_DELETE_FIELD, "1");
+          parameterMap.put(SDF_DATA_FIELD, null);
+
+          //if record exists on table, update record (same key clause and parameters as the select).
+          if(set.getRowCount() > 0)
+          {
+            performUpdate(parameterMap, " WHERE "+query, params, null);
+          }
+          else
+          {
+            parameterMap.put(UID_FIELD, uid);
+            parameterMap.put(HOST_FIELD, host);
+            parameterMap.put(PATH_FIELD, path);
+            performInsert(parameterMap, null);
+          }
+
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (RuntimeException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      catch (ManifoldCFException e)
+      {
+        // Look for deadlock and retry if so
+        if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+        {
+          sleepAmt = getSleepAmt();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        // Back off before retrying; sleepAmt is only non-zero after an aborted transaction.
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
+  /**
+  * Add/replace document information in table.
+  * The SDF data is first copied into a temporary file. The row for
+  * (host, path, uid) is then updated, or inserted if it does not exist, with
+  * ON_DELETE set to "0". The temporary file is discarded before returning.
+  * @param uid document uid
+  * @param host server host the document belongs to
+  * @param path server path the document belongs to
+  * @param sdfData document SDF data.
+  * @throws ManifoldCFException
+  * @throws IOException if the SDF data could not be read from the stream
+  */
+  public void addOrReplaceDocument(String uid, String host, String path, InputStream sdfData)
+    throws ManifoldCFException, IOException
+  {
+    TempFileInput tfi = null;
+    try
+    {
+      // This downloads all the data from upstream!
+      try
+      {
+        tfi = new TempFileInput(sdfData);
+      }
+      catch (ManifoldCFException e)
+      {
+        if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+          throw e;
+        throw new IOException("Fetch failed: "+e.getMessage());
+      }
+
+      while (true)
+      {
+        long sleepAmt = 0L;
+        try
+        {
+          beginTransaction();
+          try
+          {
+            List<Object> params = new ArrayList<Object>();
+            String query = buildConjunctionClause(params,new ClauseDescription[]{
+              new UnitaryClause(HOST_FIELD,host),
+              new UnitaryClause(PATH_FIELD,path),
+              new UnitaryClause(UID_FIELD,uid)});
+
+            // Lock any existing row so the update-vs-insert decision below holds.
+            IResultSet set = performQuery("SELECT "+UID_FIELD+" FROM "+getTableName()+" WHERE "+
+              query+" FOR UPDATE",params,null,null);
+
+            // The SDF data is a TempFileInput, not a String, so the map holds Objects.
+            Map<String,Object> parameterMap = new HashMap<String,Object>();
+            parameterMap.put(ON_DELETE_FIELD, "0");
+            parameterMap.put(SDF_DATA_FIELD, tfi);
+
+            //if record exists on table, update record (same key clause and parameters as the select).
+            if(set.getRowCount() > 0)
+            {
+              performUpdate(parameterMap, " WHERE "+query, params, null);
+            }
+            else
+            {
+              parameterMap.put(UID_FIELD, uid);
+              parameterMap.put(HOST_FIELD, host);
+              parameterMap.put(PATH_FIELD, path);
+              performInsert(parameterMap, null);
+            }
+
+            break;
+          }
+          catch (ManifoldCFException e)
+          {
+            signalRollback();
+            throw e;
+          }
+          catch (RuntimeException e)
+          {
+            signalRollback();
+            throw e;
+          }
+          catch (Error e)
+          {
+            signalRollback();
+            throw e;
+          }
+          finally
+          {
+            endTransaction();
+          }
+        }
+        catch (ManifoldCFException e)
+        {
+          // Look for deadlock and retry if so
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+          {
+            sleepAmt = getSleepAmt();
+            continue;
+          }
+          throw e;
+        }
+        finally
+        {
+          // Back off before retrying; sleepAmt is only non-zero after an aborted transaction.
+          sleepFor(sleepAmt);
+        }
+      }
+
+    }
+    finally
+    {
+      if (tfi != null)
+        tfi.discard();
+    }
+  }
+
+  /** Read a chunk of documents.
+  * Returns at most maximumNumber rows for the given host and path. Each record
+  * carries the stored SDF data (as a BinaryInput), which DocumentRecord.close()
+  * discards; presumably callers should close each record when done.
+  * @param host server host
+  * @param path server path
+  * @param maximumNumber maximum number of records to return
+  * @return the records read (possibly empty)
+  * @throws ManifoldCFException
+  */
+  public DocumentRecord[] readChunk(String host, String path, int maximumNumber)
+    throws ManifoldCFException
+  {
+    List<Object> params = new ArrayList<Object>();
+    String query = buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(HOST_FIELD,host),
+      new UnitaryClause(PATH_FIELD,path)});
+
+    IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+query+" "+constructOffsetLimitClause(0,maximumNumber),params,null,null);
+    DocumentRecord[] rval = new DocumentRecord[set.getRowCount()];
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      rval[i] = new DocumentRecord(host,path,
+        (String)row.getValue(UID_FIELD),
+        "1".equals(row.getValue(ON_DELETE_FIELD)),
+        (BinaryInput)row.getValue(SDF_DATA_FIELD));
+    }
+    return rval;
+  }
+
+  /** Delete the chunk of documents (presumably because we processed them successfully).
+  * All rows are deleted in one transaction, which is retried after a back-off
+  * sleep if it is aborted.
+  * @param records the records to delete, matched on host, path and uid
+  * @throws ManifoldCFException
+  */
+  public void deleteChunk(DocumentRecord[] records)
+    throws ManifoldCFException
+  {
+    // Do the whole thing in a transaction -- if we mess up, we'll have to try everything again
+    while (true)
+    {
+      long sleepAmt = 0L;
+      try
+      {
+        beginTransaction();
+        try
+        {
+          // Theoretically we could aggregate the records, but for now delete one at a time.
+          for (DocumentRecord dr : records)
+          {
+            String host = dr.getHost();
+            String path = dr.getPath();
+            String uid = dr.getUid();
+            List<Object> params = new ArrayList<Object>();
+            String query = buildConjunctionClause(params,new ClauseDescription[]{
+              new UnitaryClause(HOST_FIELD,host),
+              new UnitaryClause(PATH_FIELD,path),
+              new UnitaryClause(UID_FIELD,uid)});
+            performDelete("WHERE "+query,params,null);
+          }
+
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (RuntimeException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      catch (ManifoldCFException e)
+      {
+        // Look for deadlock and retry if so
+        if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+        {
+          sleepAmt = getSleepAmt();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        // Back off before retrying; sleepAmt is only non-zero after an aborted transaction.
+        sleepFor(sleepAmt);
+      }
+    }
+
+  }
+
+}
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java?rev=1604042&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java (added)
+++ manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java Thu Jun 19 22:24:20 2014
@@ -0,0 +1,85 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.output.amazoncloudsearch;
+
+import org.apache.manifoldcf.core.*;
+import org.apache.manifoldcf.core.interfaces.*;
+
+import java.io.*;
+
+/** Holds the stored information for a single document: its server host and
+* path, its uid, whether it is a delete marker, and its SDF data (which may be null).
+*/
+public class DocumentRecord {
+
+  protected final String host;
+  protected final String path;
+  protected final String uid;
+  protected final boolean delete;
+  protected final BinaryInput data;
+
+  /** Constructor.
+  *@param host server host the document belongs to
+  *@param path server path the document belongs to
+  *@param uid document uid
+  *@param delete true if this record is a delete marker
+  *@param data the document's SDF data, or null if there is none
+  */
+  public DocumentRecord(String host, String path, String uid, boolean delete, BinaryInput data)
+  {
+    this.data = data;
+    this.delete = delete;
+    this.uid = uid;
+    this.path = path;
+    this.host = host;
+  }
+
+  /** @return the server host */
+  public String getHost()
+  {
+    return this.host;
+  }
+
+  /** @return the server path */
+  public String getPath()
+  {
+    return this.path;
+  }
+
+  /** @return the document uid */
+  public String getUid()
+  {
+    return this.uid;
+  }
+
+  /** @return true when this record is a delete marker */
+  public boolean getDelete()
+  {
+    return this.delete;
+  }
+
+  /** Get the length of the SDF data.
+  *@return the data length, or 0 when there is no data
+  */
+  public long getStreamLength()
+    throws ManifoldCFException
+  {
+    return (data == null) ? 0L : data.getLength();
+  }
+
+  /** Get a stream over the SDF data.
+  *@return the data stream, or null when there is no data
+  */
+  public InputStream getDataStream()
+    throws ManifoldCFException
+  {
+    return (data == null) ? null : data.getStream();
+  }
+
+  /** Release the SDF data, if any, by discarding it.
+  */
+  public void close()
+    throws ManifoldCFException
+  {
+    if (data == null)
+      return;
+    data.discard();
+  }
+
+}
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentRecord.java
------------------------------------------------------------------------------
svn:keywords = Id
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-954/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/SDFModel.java
------------------------------------------------------------------------------
svn:keywords = Id