Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/02/18 15:31:31 UTC
svn commit: r911418 [3/8] - in /incubator/lcf/trunk/modules:
connectors/webcrawler/connector/org/apache/lcf/crawler/connectors/webcrawler/
framework/agents/org/apache/lcf/agents/agentmanager/
framework/agents/org/apache/lcf/agents/incrementalingest/ fr...
Modified: incubator/lcf/trunk/modules/framework/agents/org/apache/lcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/agents/org/apache/lcf/agents/incrementalingest/IncrementalIngester.java?rev=911418&r1=911417&r2=911418&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/agents/org/apache/lcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ incubator/lcf/trunk/modules/framework/agents/org/apache/lcf/agents/incrementalingest/IncrementalIngester.java Thu Feb 18 14:31:31 2010
@@ -35,1653 +35,1429 @@
*/
public class IncrementalIngester extends org.apache.lcf.core.database.BaseTable implements IIncrementalIngester
{
- public static final String _rcsid = "@(#)$Id$";
+ public static final String _rcsid = "@(#)$Id$";
- // Fields
- protected final static String idField = "id";
- protected final static String outputConnNameField = "connectionname";
- protected final static String docKeyField = "dockey";
- protected final static String docURIField = "docuri";
- protected final static String uriHashField = "urihash";
- protected final static String lastVersionField = "lastversion";
- protected final static String lastOutputVersionField = "lastoutputversion";
- protected final static String changeCountField = "changecount";
- protected final static String firstIngestField = "firstingest";
- protected final static String lastIngestField = "lastingest";
- protected final static String authorityNameField = "authorityname";
-
- // Thread context.
- protected IThreadContext threadContext;
- // Lock manager.
- protected ILockManager lockManager;
- // Output connection manager
- protected IOutputConnectionManager connectionManager;
-
- // Analyze tracker
- protected static AnalyzeTracker tracker = new AnalyzeTracker();
-
- /** Constructor.
- */
- public IncrementalIngester(IThreadContext threadContext, IDBInterface database)
- throws LCFException
- {
- super(database,"ingeststatus");
- this.threadContext = threadContext;
- lockManager = LockManagerFactory.make(threadContext);
- connectionManager = OutputConnectionManagerFactory.make(threadContext);
- }
-
- /** Install the incremental ingestion manager.
- */
- public void install()
- throws LCFException
- {
- String outputConnectionTableName = connectionManager.getTableName();
- String outputConnectionNameField = connectionManager.getConnectionNameColumn();
-
- // We need an outer loop, in case we run into trouble during unique index creation. Should that happen, we would need to fix up the table
- // and try again.
- while (true)
- {
- // Schema first
- boolean clearTable = false;
-
- beginTransaction();
- try
- {
-
- // As of postgresql 8.x, it is possible to create indices on LONGTEXT columns that do not cause large value inserts (>1666 characters) to fail.
- // As a result, a significant schema revision is possible, eliminating the hash fields, and installing appropriate indexes with the proper constraints.
-
- Map existing = getTableSchema(null,null);
- if (existing == null)
- {
- HashMap map = new HashMap();
- map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
- map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
- map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
- // The document URI field, if null, indicates that the document was not actually ingested!
- // This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
- map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
- map.put(lastVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- map.put(changeCountField,new ColumnDescription("BIGINT",false,false,null,null,false));
- map.put(firstIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
- map.put(lastIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
- map.put(authorityNameField,new ColumnDescription("VARCHAR(32)",false,true,null,null,false));
- // map.put(documentTemplateField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- performCreate(map,null);
- }
- else
- {
- // Need to revamp to convert to new schema!! (Get rid of the "docid" field, for instance.)
- ColumnDescription cd;
-
- // Add the output connection name field
- cd = (ColumnDescription)existing.get(outputConnNameField);
- if (cd == null)
- {
- // This is temporary; inserting a column will only work if there are no existing rows.
- // MHL
- HashMap map = new HashMap();
- map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
- performAlter(map,null,null,null);
- }
- // Add the last output version field
- cd = (ColumnDescription)existing.get(lastOutputVersionField);
- if (cd == null)
- {
- HashMap map = new HashMap();
- map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- performAlter(map,null,null,null);
- }
- // Make sure docuri field is nullable
- cd = (ColumnDescription)existing.get(docURIField);
- if (!cd.getIsNull())
- {
- HashMap map = new HashMap();
- map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- performAlter(null,map,null,null);
- }
-
- cd = (ColumnDescription)existing.get(docKeyField);
- if (cd == null)
- {
- // Create a doc key field from the existing doc id field.
- HashMap map = new HashMap();
- map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,true,null,null,false));
- performAlter(map,null,null,null);
-
- // Now, fill in values, before we alter the column to make it be non-nullable.
- while (true)
- {
- IResultSet set = performQuery("SELECT "+idField+",docid FROM "+
- getTableName()+" WHERE "+docKeyField+" IS NULL OR character_length("+docKeyField+") < 40 LIMIT 1000",null,null,null);
- if (set.getRowCount() == 0)
- break;
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- Long id = (Long)row.getValue(idField);
- String docID = (String)row.getValue("docid");
- // Split the docID into class and doc id
- int index = docID.indexOf(":");
- if (index == -1)
- throw new LCFException("Unexpected docID form during upgrade of ingeststatus: "+docID);
- String docClass = docID.substring(0,index);
- String docIdentifier = docID.substring(index+1);
-
- HashMap newMap = new HashMap();
- newMap.put(docKeyField,makeKey(docClass,LCF.hash(docIdentifier)));
- ArrayList newList = new ArrayList();
- newList.add(id);
- performUpdate(newMap,"WHERE "+idField+"=?",newList,null);
- }
- }
-
- // Make the column non-nullable
- // Hmm. It turns out that postgresql doesn't like this, because the data inserted above has not yet been committed!!
- // Instead, we'll have to notice that the field is nullable OUTSIDE the transaction, and alter it there...
-
- //map = new HashMap();
- //map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
- //performAlter(null,map,null,null);
- }
-
- cd = (ColumnDescription)existing.get("dochash");
- if (cd != null)
- {
- // Delete this column
- ArrayList list = new ArrayList();
- list.add("dochash");
- performAlter(null,null,list,null);
- }
-
- cd = (ColumnDescription)existing.get("docid");
- if (cd != null)
- {
- // Delete this column
- ArrayList list = new ArrayList();
- list.add("docid");
- performAlter(null,null,list,null);
- }
-
- // Build uri hash field, if it doesn't exist properly
- cd = (ColumnDescription)existing.get(uriHashField);
- if (cd == null)
- {
- HashMap map = new HashMap();
- map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
- performAlter(map,null,null,null);
- while (true)
- {
- IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
- getTableName()+" WHERE character_length("+uriHashField+") IS NULL AND "+
- docURIField+" IS NOT NULL LIMIT 1000",null,null,null);
- if (set.getRowCount() == 0)
- break;
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- Long id = (Long)row.getValue(idField);
- String docURI = (String)row.getValue(docURIField);
- HashMap newMap = new HashMap();
- newMap.put(uriHashField,LCF.hash(docURI));
- ArrayList newList = new ArrayList();
- newList.add(id);
- performUpdate(newMap,"WHERE "+idField+"=?",newList,null);
- }
- }
- }
-
- cd = (ColumnDescription)existing.get(authorityNameField);
- if (cd == null)
- {
- HashMap map = new HashMap();
- map.put(authorityNameField,new ColumnDescription("VARCHAR(32)",false,true,null,null,false));
- performAlter(map,null,null,null);
- clearTable = true;
- }
-
- cd = (ColumnDescription)existing.get("documenttemplate");
- if (cd != null)
- {
- ArrayList list = new ArrayList();
- list.add("documenttemplate");
- performAlter(null,null,list,null);
- }
- }
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
-
- // Get the schema again, and make sure that the docKeyField column is not nullable. This *must* be outside of the above transaction, because otherwise Postgresql doesn't
- // necessarily see the transaction's data.
- Map currentSchema = getTableSchema(null,null);
- ColumnDescription currentCD = (ColumnDescription)currentSchema.get(docKeyField);
- if (currentCD.getIsNull())
- {
- // Make it non-nullable.
- HashMap map = new HashMap();
- map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
- performAlter(null,map,null,null);
- }
-
- // Drop the "ingeststatuscollections" table, if it exists.
- // MHL
-
- if (clearTable)
- {
- // We upgraded the ingeststatus table family in a way that requires us to forget all information about previous ingestions,
- // so get rid of that information. This will force all documents to be reingested, but that's what we want in this case.
- performDelete("",null,null);
- }
-
- // Now, do indexes
- IndexDescription keyIndex = new IndexDescription(true,new String[]{outputConnNameField,docKeyField});
- IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField});
-
- // Get rid of indexes that shouldn't be there
- Map indexes = getTableIndexes(null,null);
- Iterator iter = indexes.keySet().iterator();
- while (iter.hasNext())
- {
- String indexName = (String)iter.next();
- IndexDescription id = (IndexDescription)indexes.get(indexName);
-
- if (keyIndex != null && id.equals(keyIndex))
- keyIndex = null;
- else if (uriHashIndex != null && id.equals(uriHashIndex))
- uriHashIndex = null;
- else if (indexName.indexOf("_pkey") == -1)
- // This index shouldn't be here; drop it
- performRemoveIndex(indexName);
- }
-
- // Add the ones we didn't find
- if (uriHashIndex != null)
- performAddIndex(null,uriHashIndex);
-
- if (keyIndex != null)
- {
- // This index create may fail
- try
- {
- performAddIndex(null,keyIndex);
- }
- catch (LCFException e)
- {
- if (e.getMessage().indexOf("could not create unique index") == -1)
- throw e;
- removeDuplicates();
- continue;
- }
- }
-
- // All done
- break;
- }
-
- }
-
- /** Remove duplicates (as part of upgrade) */
- protected void removeDuplicates()
- throws LCFException
- {
- // If we get here, it means we have to clean duplicates out of the table and try again.
- // If there *are* duplicates for an ID, we basically don't know anything about it really, so we blindly get rid of all rows corresponding to
- // duplicate ids.
-
-
- // The query I want to do to discover duplicate keys is:
- // DELETE FROM ingeststatus WHERE docid IN (SELECT t0.docid FROM (SELECT COUNT(*) AS countval,docid FROM ingeststatus GROUP BY docid) t0 WHERE t0.countval>1)
- Logging.ingest.warn("Found duplicate rows in ingeststatus table; correcting!");
-
- // First, create a temporary non-unique index that we intend to remove at the end of this process. We need this index in order to be able to
- // do the group-by efficiently.
- performAddIndex("temp_index_ingeststatus",new IndexDescription(false,new String[]{docKeyField}));
-
- performDelete("WHERE "+docKeyField+" IN (SELECT t0."+docKeyField+" FROM (SELECT COUNT(*) AS countval,"+docKeyField+" FROM "+
- getTableName()+" GROUP BY "+docKeyField+") t0 WHERE t0.countval>1)",null,null);
-
- // Drop the temporary index
- performRemoveIndex("temp_index_ingeststatus");
-
- Logging.ingest.warn("Duplicate rows in ingeststatus table corrected.");
- }
-
- /** Come up with a maximum time (in minutes) for re-analyzing tables.
- *@return the time, in minutes.
- */
- public int getAnalyzeTime()
- throws LCFException
- {
- // For this table, we base the wait time on the number of rows in it.
- IResultSet set = performQuery("SELECT COUNT("+idField+") FROM "+getTableName(),null,null,null);
- if (set.getRowCount() < 1)
- throw new LCFException("Expected result with one row");
- IResultRow row = set.getRow(0);
- Iterator columnNames = row.getColumns();
- if (!columnNames.hasNext())
- throw new LCFException("Expected result with one column");
- String columnName = (String)columnNames.next();
- long value = new Long(row.getValue(columnName).toString()).longValue();
- if (value < 10000L)
- return 5;
- else if (value < 100000L)
- return 2*60;
- else
- return 24*60;
-
- }
-
- /** Analyze database tables.
- */
- public void analyzeTables()
- throws LCFException
- {
- analyzeTable();
- }
-
- /** Uninstall the incremental ingestion manager.
- */
- public void deinstall()
- throws LCFException
- {
- performDrop(null);
- }
-
- /** Flush all knowledge of what was ingested before.
- */
- public void clearAll()
- throws LCFException
- {
- performDelete("",null,null);
- }
-
- /** Record a document version, but don't ingest it.
- * The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
- * ServiceInterruption is thrown if this action must be rescheduled.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hashed document identifier.
- *@param documentVersion is the document version.
- *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
- *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
- */
- public void documentRecord(String outputConnectionName,
+ // Fields
+ protected final static String idField = "id";
+ protected final static String outputConnNameField = "connectionname";
+ protected final static String docKeyField = "dockey";
+ protected final static String docURIField = "docuri";
+ protected final static String uriHashField = "urihash";
+ protected final static String lastVersionField = "lastversion";
+ protected final static String lastOutputVersionField = "lastoutputversion";
+ protected final static String changeCountField = "changecount";
+ protected final static String firstIngestField = "firstingest";
+ protected final static String lastIngestField = "lastingest";
+ protected final static String authorityNameField = "authorityname";
+
+ // Thread context.
+ protected IThreadContext threadContext;
+ // Lock manager.
+ protected ILockManager lockManager;
+ // Output connection manager
+ protected IOutputConnectionManager connectionManager;
+
+ // Analyze tracker
+ protected static AnalyzeTracker tracker = new AnalyzeTracker();
+
+ /** Constructor.
+ */
+ public IncrementalIngester(IThreadContext threadContext, IDBInterface database)
+ throws LCFException
+ {
+ super(database,"ingeststatus");
+ this.threadContext = threadContext;
+ lockManager = LockManagerFactory.make(threadContext);
+ connectionManager = OutputConnectionManagerFactory.make(threadContext);
+ }
+
+ /** Install the incremental ingestion manager.
+ */
+ public void install()
+ throws LCFException
+ {
+ String outputConnectionTableName = connectionManager.getTableName();
+ String outputConnectionNameField = connectionManager.getConnectionNameColumn();
+
+ // We always include an outer loop, because some upgrade conditions require retries.
+ while (true)
+ {
+ // Postgresql has a limitation on the number of characters that can be indexed in a column. So we use hashes instead.
+ Map existing = getTableSchema(null,null);
+ if (existing == null)
+ {
+ HashMap map = new HashMap();
+ map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
+ map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
+ map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
+ // The document URI field, if null, indicates that the document was not actually ingested!
+ // This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
+ map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
+ map.put(lastVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(changeCountField,new ColumnDescription("BIGINT",false,false,null,null,false));
+ map.put(firstIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
+ map.put(lastIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
+ map.put(authorityNameField,new ColumnDescription("VARCHAR(32)",false,true,null,null,false));
+ performCreate(map,null);
+ }
+ else
+ {
+ // This is where any schema upgrade code must go, should it be needed.
+ }
+
+ // Now, do indexes
+ IndexDescription keyIndex = new IndexDescription(true,new String[]{outputConnNameField,docKeyField});
+ IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField});
+
+ // Get rid of indexes that shouldn't be there
+ Map indexes = getTableIndexes(null,null);
+ Iterator iter = indexes.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String indexName = (String)iter.next();
+ IndexDescription id = (IndexDescription)indexes.get(indexName);
+
+ if (keyIndex != null && id.equals(keyIndex))
+ keyIndex = null;
+ else if (uriHashIndex != null && id.equals(uriHashIndex))
+ uriHashIndex = null;
+ else if (indexName.indexOf("_pkey") == -1)
+ // This index shouldn't be here; drop it
+ performRemoveIndex(indexName);
+ }
+
+ // Add the ones we didn't find
+ if (uriHashIndex != null)
+ performAddIndex(null,uriHashIndex);
+
+ if (keyIndex != null)
+ performAddIndex(null,keyIndex);
+
+ // All done; break out of loop
+ break;
+ }
+
+ }
+
+ /** Come up with a maximum time (in minutes) for re-analyzing tables.
+ *@return the time, in minutes.
+ */
+ public int getAnalyzeTime()
+ throws LCFException
+ {
+ // For this table, we base the wait time on the number of rows in it.
+ IResultSet set = performQuery("SELECT COUNT("+idField+") FROM "+getTableName(),null,null,null);
+ if (set.getRowCount() < 1)
+ throw new LCFException("Expected result with one row");
+ IResultRow row = set.getRow(0);
+ Iterator columnNames = row.getColumns();
+ if (!columnNames.hasNext())
+ throw new LCFException("Expected result with one column");
+ String columnName = (String)columnNames.next();
+ long value = new Long(row.getValue(columnName).toString()).longValue();
+ if (value < 10000L)
+ return 5;
+ else if (value < 100000L)
+ return 2*60;
+ else
+ return 24*60;
+
+ }
+
+ /** Analyze database tables.
+ */
+ public void analyzeTables()
+ throws LCFException
+ {
+ analyzeTable();
+ }
+
+ /** Uninstall the incremental ingestion manager.
+ */
+ public void deinstall()
+ throws LCFException
+ {
+ performDrop(null);
+ }
+
+ /** Flush all knowledge of what was ingested before.
+ */
+ public void clearAll()
+ throws LCFException
+ {
+ performDelete("",null,null);
+ }
+
+ /** Record a document version, but don't ingest it.
+ * The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
+ * ServiceInterruption is thrown if this action must be rescheduled.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param documentVersion is the document version.
+ *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+ *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
+ */
+ public void documentRecord(String outputConnectionName,
String identifierClass, String identifierHash,
- String documentVersion,
- long recordTime, IOutputActivity activities)
- throws LCFException, ServiceInterruption
- {
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
- String docKey = makeKey(identifierClass,identifierHash);
-
- if (Logging.ingest.isDebugEnabled())
- {
- Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
- }
-
- performIngestion(connection,docKey,documentVersion,null,null,null,recordTime,null,activities);
- }
-
- /** Ingest a document.
- * This ingests the document, and notes it. If this is a repeat ingestion of the document, this
- * method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
- * described by the RepositoryDocument object passed to this method.
- * ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hashed document identifier.
- *@param documentVersion is the document version.
- *@param outputVersion is the output version string constructed from the output specification by the output connector.
- *@param authorityName is the name of the authority associated with the document, if any.
- *@param data is the document data. The data is closed after ingestion is complete.
- *@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
- *@param documentURI is the URI of the document, which will be used as the key of the document in the index.
- *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
- *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
- */
- public boolean documentIngest(String outputConnectionName,
- String identifierClass, String identifierHash,
- String documentVersion,
- String outputVersion,
- String authorityName,
- RepositoryDocument data,
- long ingestTime, String documentURI,
- IOutputActivity activities)
- throws LCFException, ServiceInterruption
- {
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
- String docKey = makeKey(identifierClass,identifierHash);
-
- if (Logging.ingest.isDebugEnabled())
- {
- Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
- }
-
- return performIngestion(connection,docKey,documentVersion,outputVersion,authorityName,
- data,ingestTime,documentURI,activities);
- }
-
- /** Do the actual ingestion, or just record it if there's nothing to ingest. */
- protected boolean performIngestion(IOutputConnection connection,
- String docKey, String documentVersion, String outputVersion,
- String authorityNameString,
- RepositoryDocument data,
- long ingestTime, String documentURI,
- IOutputActivity activities)
- throws LCFException, ServiceInterruption
- {
- // No transactions; not safe because post may take too much time
-
- // First, calculate a document uri hash value
- String documentURIHash = null;
- if (documentURI != null)
- documentURIHash = LCF.hash(documentURI);
-
- // See what uri was used before for this doc, if any
- ArrayList list = new ArrayList();
- list.add(docKey);
- list.add(connection.getName());
- IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
- " WHERE "+docKeyField+"=? AND "+outputConnNameField+"=?",list,null,null);
-
- String oldURI = null;
- String oldURIHash = null;
- String oldOutputVersion = null;
- if (set.getRowCount() > 0)
- {
- IResultRow row = set.getRow(0);
- oldURI = (String)row.getValue(docURIField);
- oldURIHash = (String)row.getValue(uriHashField);
- oldOutputVersion = (String)row.getValue(lastOutputVersionField);
- }
-
- // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
- // dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well.
-
- // But, since we need to ensure that any given URI is only worked on by one thread at a time, use critical sections
- // to block the rare case that multiple threads try to work on the same URI.
- int uriCount = 0;
- if (documentURI != null)
- uriCount++;
- if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
- uriCount++;
- String[] lockArray = new String[uriCount];
- uriCount = 0;
- if (documentURI != null)
- lockArray[uriCount++] = connection.getName()+":"+documentURI;
- if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
- lockArray[uriCount++] = connection.getName()+":"+oldURI;
-
- lockManager.enterCriticalSections(null,null,lockArray);
- try
- {
-
- if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
- {
- // Delete all records from the database that match the old URI, except for THIS record.
- list.clear();
- list.add(oldURIHash);
- list.add(oldURI);
- list.add(docKey);
- list.add(connection.getName());
- performDelete("WHERE "+uriHashField+"=? AND "+docURIField+"=? AND "+docKeyField+"!=? AND "+outputConnNameField+"=?",list,null);
- removeDocument(connection,oldURI,oldOutputVersion,activities);
- }
-
- if (documentURI != null)
- {
- // Get rid of all records that match the NEW uri, except for this record.
- list.clear();
- list.add(documentURIHash);
- list.add(documentURI);
- list.add(docKey);
- list.add(connection.getName());
- performDelete("WHERE "+uriHashField+"=? AND "+docURIField+"=? AND "+docKeyField+"!=? AND "+outputConnNameField+"=?",list,null);
- }
-
- // Now, we know we are ready for the ingest.
- if (documentURI != null)
- {
- // Here are the cases:
- // 1) There was a service interruption before the upload started.
- // (In that case, we don't need to log anything, just reschedule).
- // 2) There was a service interruption after the document was transmitted.
- // (In that case, we should presume that the document was ingested, but
- // reschedule another import anyway.)
- // 3) Everything went OK
- // (need to log the ingestion.)
- // 4) Everything went OK, but we were told we have an illegal document.
- // (We note the ingestion because if we don't we will be forced to repeat ourselves.
- // In theory, document doesn't need to be deleted, but there is no way to signal
- // that at the moment.)
-
- // Note an ingestion before we actually try it.
- // This is a marker that says "something is there"; it has an empty version, which indicates
- // that we don't know anything about it. That means it will be reingested when the
- // next version comes along, and will be deleted if called for also.
- noteDocumentIngest(connection.getName(),docKey,null,null,null,ingestTime,documentURI,documentURIHash);
- int result = addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
- noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
- return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
- }
-
- // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
- // to noteDocumentIngest by having the null documentURI.
- noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,null,null);
- return true;
- }
- finally
- {
- lockManager.leaveCriticalSections(null,null,lockArray);
- }
- }
-
- /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
- * versions agreed).
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes are the set of document identifier hashes.
- *@param checkTime is the time at which the check took place, in milliseconds since epoch.
- */
- public void documentCheckMultiple(String outputConnectionName,
- String[] identifierClasses, String[] identifierHashes,
- long checkTime)
- throws LCFException
- {
- beginTransaction();
- try
- {
- int maxInClause = getMaxInClause();
-
- HashMap docIDValues = new HashMap();
- int j = 0;
- while (j < identifierHashes.length)
- {
- String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
- docIDValues.put(docDBString,docDBString);
- j++;
- }
-
- // Now, perform n queries, each of them no larger than maxInClause in length.
- // Create a list of row id's from this.
- HashMap rowIDSet = new HashMap();
- Iterator iter = docIDValues.keySet().iterator();
- j = 0;
- ArrayList list = new ArrayList();
- StringBuffer sb = new StringBuffer();
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
- list.clear();
- sb.setLength(0);
- j = 0;
- }
- String idValue = (String)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(idValue);
- j++;
- }
-
- if (j > 0)
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
-
- // Now, break row id's into chunks too; submit one chunk at a time
- j = 0;
- list.clear();
- sb.setLength(0);
- iter = rowIDSet.keySet().iterator();
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- updateRowIds(list,sb.toString(),checkTime);
- list.clear();
- sb.setLength(0);
- j = 0;
- }
- Long idValue = (Long)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(idValue);
- j++;
- }
-
- if (j > 0)
- updateRowIds(list,sb.toString(),checkTime);
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
-
- /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
- * versions agreed).
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hashed document identifier.
- *@param checkTime is the time at which the check took place, in milliseconds since epoch.
- */
- public void documentCheck(String outputConnectionName,
- String identifierClass, String identifierHash,
- long checkTime)
- throws LCFException
- {
- documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
- }
-
- /** Update a chunk of row ids.
- */
- protected void updateRowIds(ArrayList list, String queryPart, long checkTime)
- throws LCFException
- {
- HashMap map = new HashMap();
- map.put(lastIngestField,new Long(checkTime));
- performUpdate(map,"WHERE "+idField+" IN ("+queryPart+")",list,null);
- }
-
- /** Delete multiple documents from the search engine index.
- *@param outputConnectionNames are the names of the output connections associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes is the array of document identifier hashes of the documents.
- *@param activities is the object to use to log the details of the ingestion attempt. May be null.
- */
- public void documentDeleteMultiple(String[] outputConnectionNames,
- String[] identifierClasses, String[] identifierHashes,
- IOutputRemoveActivity activities)
- throws LCFException, ServiceInterruption
- {
- // Segregate request by connection names
- HashMap keyMap = new HashMap();
- int i = 0;
- while (i < outputConnectionNames.length)
- {
- String outputConnectionName = outputConnectionNames[i];
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
- if (list == null)
- {
- list = new ArrayList();
- keyMap.put(outputConnectionName,list);
- }
- list.add(new Integer(i));
- i++;
- }
-
- // Create the return array.
- Iterator iter = keyMap.keySet().iterator();
- while (iter.hasNext())
- {
- String outputConnectionName = (String)iter.next();
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
- String[] localIdentifierClasses = new String[list.size()];
- String[] localIdentifierHashes = new String[list.size()];
- i = 0;
- while (i < localIdentifierClasses.length)
- {
- int index = ((Integer)list.get(i)).intValue();
- localIdentifierClasses[i] = identifierClasses[index];
- localIdentifierHashes[i] = identifierHashes[index];
- i++;
- }
- documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
- }
- }
-
- /** Delete multiple documents from the search engine index.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes is the array of document identifier hashes of the documents.
- *@param activities is the object to use to log the details of the ingestion attempt. May be null.
- */
- public void documentDeleteMultiple(String outputConnectionName,
- String[] identifierClasses, String[] identifierHashes,
- IOutputRemoveActivity activities)
- throws LCFException, ServiceInterruption
- {
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
- if (Logging.ingest.isDebugEnabled())
- {
- int i = 0;
- while (i < identifierHashes.length)
- {
- Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
- i++;
- }
- }
-
- // No transactions. Time for the operation may exceed transaction timeout.
-
- // Obtain the current URIs of all of these.
- DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
-
- // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
- // (This guarantees that when this operation is complete the database reflects reality.)
- int validURIcount = 0;
- int i = 0;
- while (i < uris.length)
- {
- if (uris[i] != null && uris[i].getURI() != null)
- validURIcount++;
- i++;
- }
- String[] lockArray = new String[validURIcount];
- String[] validURIArray = new String[validURIcount];
- validURIcount = 0;
- i = 0;
- while (i < uris.length)
- {
- if (uris[i] != null && uris[i].getURI() != null)
- {
- validURIArray[validURIcount] = uris[i].getURI();
- lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
- validURIcount++;
- }
- i++;
- }
-
- lockManager.enterCriticalSections(null,null,lockArray);
- try
- {
- // Fetch the document URIs for the listed documents
- int j = 0;
- while (j < uris.length)
- {
- if (uris[j] != null && uris[j].getURI() != null)
- removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
- j++;
- }
-
- // Now, get rid of all rows that match the given uris.
- // Do the queries together, then the deletes
- beginTransaction();
- try
- {
- // The basic process is this:
- // 1) Come up with a set of urihash values
- // 2) Find the matching, corresponding id values
- // 3) Delete the rows corresponding to the id values, in sequence
-
- // Process (1 & 2) has to be broken down into chunks that contain the maximum
- // number of doc hash values each. We need to avoid repeating doc hash values,
- // so the first step is to come up with ALL the doc hash values before looping
- // over them.
-
- int maxInClause = getMaxInClause();
-
- // Find all the documents that match this set of URIs
- HashMap docURIHashValues = new HashMap();
- HashMap docURIValues = new HashMap();
- j = 0;
- while (j < validURIArray.length)
- {
- String docDBString = validURIArray[j++];
- String docDBHashString = LCF.hash(docDBString);
- docURIValues.put(docDBString,docDBString);
- docURIHashValues.put(docDBHashString,docDBHashString);
- }
-
- // Now, perform n queries, each of them no larger than maxInClause in length.
- // Create a list of row id's from this.
- HashMap rowIDSet = new HashMap();
- Iterator iter = docURIHashValues.keySet().iterator();
- j = 0;
- ArrayList hashList = new ArrayList();
- StringBuffer sb = new StringBuffer();
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList,sb.toString());
- hashList.clear();
- sb.setLength(0);
- j = 0;
- }
- String hashValue = (String)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- hashList.add(hashValue);
- j++;
- }
-
- if (j > 0)
- findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList,sb.toString());
-
- // Next, go through the list of row IDs, and delete them in chunks
- j = 0;
- ArrayList list = new ArrayList();
- sb.setLength(0);
- iter = rowIDSet.keySet().iterator();
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- deleteRowIds(list,sb.toString());
- list.clear();
- sb.setLength(0);
- j = 0;
- }
- Long idValue = (Long)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(idValue);
- j++;
- }
-
- if (j > 0)
- deleteRowIds(list,sb.toString());
-
- // Now, find the set of documents that remain that match the document identifiers.
- HashMap docIdValues = new HashMap();
- j = 0;
- while (j < identifierHashes.length)
- {
- String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
- docIdValues.put(docDBString,docDBString);
- j++;
- }
-
- // Now, perform n queries, each of them no larger than maxInClause in length.
- // Create a list of row id's from this.
- rowIDSet.clear();
- iter = docIdValues.keySet().iterator();
- j = 0;
- list.clear();
- sb.setLength(0);
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
- list.clear();
- sb.setLength(0);
- j = 0;
- }
- String hashValue = (String)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(hashValue);
- j++;
- }
-
- if (j > 0)
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
-
- // Next, go through the list of row IDs, and delete them in chunks
- j = 0;
- list.clear();
- sb.setLength(0);
- iter = rowIDSet.keySet().iterator();
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- deleteRowIds(list,sb.toString());
- list.clear();
- sb.setLength(0);
- j = 0;
- }
- Long idValue = (Long)iter.next();
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(idValue);
- j++;
- }
-
- if (j > 0)
- deleteRowIds(list,sb.toString());
-
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
- finally
- {
- lockManager.leaveCriticalSections(null,null,lockArray);
- }
- }
-
- /** Given values and parameters corresponding to a set of hash values, add corresponding
- * table row id's to the output map.
- */
- protected void findRowIdsForURIs(String outputConnectionName, HashMap rowIDSet, HashMap uris, ArrayList hashParamValues,
- String paramList)
- throws LCFException
- {
- hashParamValues.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
- getTableName()+" WHERE "+uriHashField+" IN ("+paramList+") AND "+outputConnNameField+"=?",hashParamValues,null,null);
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- String docURI = (String)row.getValue(docURIField);
- if (docURI != null && docURI.length() > 0)
- {
- if (uris.get(docURI) != null)
- {
- Long rowID = (Long)row.getValue(idField);
- rowIDSet.put(rowID,rowID);
- }
- }
- }
- }
-
- /** Given values and parameters corresponding to a set of hash values, add corresponding
- * table row id's to the output map.
- */
- protected void findRowIdsForDocIds(String outputConnectionName, HashMap rowIDSet, ArrayList paramValues,
- String paramList)
- throws LCFException
- {
- paramValues.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+idField+" FROM "+
- getTableName()+" WHERE "+docKeyField+" IN ("+paramList+") AND "+outputConnNameField+"=?",paramValues,null,null);
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- Long rowID = (Long)row.getValue(idField);
- rowIDSet.put(rowID,rowID);
- }
- }
-
- /** Delete a chunk of row ids.
- */
- protected void deleteRowIds(ArrayList list, String queryPart)
- throws LCFException
- {
- performDelete("WHERE "+idField+" IN ("+queryPart+")",list,null);
- }
-
- /** Delete a document from the search engine index.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hash of the id of the document.
- *@param activities is the object to use to log the details of the ingestion attempt. May be null.
- */
- public void documentDelete(String outputConnectionName,
- String identifierClass, String identifierHash,
- IOutputRemoveActivity activities)
- throws LCFException, ServiceInterruption
- {
- documentDeleteMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},activities);
- }
-
- /** Find out what URIs a SET of document URIs are currently ingested.
- *@param identifierHashes is the array of document id's to check.
- *@return the array of current document uri's. Null returned for identifiers
- * that don't exist in the index.
- */
- protected DeleteInfo[] getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes)
- throws LCFException
- {
- DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
- HashMap map = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
- {
- map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
- rval[i] = null;
- i++;
- }
-
- beginTransaction();
- try
- {
- StringBuffer sb = new StringBuffer();
- ArrayList list = new ArrayList();
- int maxCount = getMaxInClause();
- int j = 0;
- Iterator iter = map.keySet().iterator();
- while (iter.hasNext())
- {
- if (j == maxCount)
- {
- getDocumentURIChunk(rval,map,outputConnectionName,sb.toString(),list);
- j = 0;
- sb.setLength(0);
- list.clear();
- }
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(iter.next());
- j++;
- }
- if (j > 0)
- getDocumentURIChunk(rval,map,outputConnectionName,sb.toString(),list);
- return rval;
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
-
- /** Look up ingestion data for a SET of documents.
- *@param outputConnectionNames are the names of the output connections associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes is the array of document identifier hashes to look up.
- *@return the array of document data. Null will come back for any identifier that doesn't
- * exist in the index.
- */
- public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
- String[] identifierClasses, String[] identifierHashes)
- throws LCFException
- {
- // Segregate request by connection names
- HashMap keyMap = new HashMap();
- int i = 0;
- while (i < outputConnectionNames.length)
- {
- String outputConnectionName = outputConnectionNames[i];
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
- if (list == null)
- {
- list = new ArrayList();
- keyMap.put(outputConnectionName,list);
- }
- list.add(new Integer(i));
- i++;
- }
-
- // Create the return array.
- DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
- Iterator iter = keyMap.keySet().iterator();
- while (iter.hasNext())
- {
- String outputConnectionName = (String)iter.next();
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
- String[] localIdentifierClasses = new String[list.size()];
- String[] localIdentifierHashes = new String[list.size()];
- i = 0;
- while (i < localIdentifierClasses.length)
- {
- int index = ((Integer)list.get(i)).intValue();
- localIdentifierClasses[i] = identifierClasses[index];
- localIdentifierHashes[i] = identifierHashes[index];
- i++;
- }
- DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
- i = 0;
- while (i < localRval.length)
- {
- int index = ((Integer)list.get(i)).intValue();
- rval[index] = localRval[i];
- i++;
- }
- }
- return rval;
- }
-
- /** Look up ingestion data for a SET of documents.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes is the array of document identifier hashes to look up.
- *@return the array of document data. Null will come back for any identifier that doesn't
- * exist in the index.
- */
- public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
- String[] identifierClasses, String[] identifierHashes)
- throws LCFException
- {
- // Build the return array
- DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
-
- // Build a map, so we can convert an identifier into an array index.
- HashMap indexMap = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
- {
- indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
- rval[i] = null;
- i++;
- }
-
- beginTransaction();
- try
- {
- StringBuffer sb = new StringBuffer();
- ArrayList list = new ArrayList();
- int maxCount = getMaxInClause();
- int j = 0;
- Iterator iter = indexMap.keySet().iterator();
- while (iter.hasNext())
- {
- if (j == maxCount)
- {
- getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,sb.toString(),list);
- j = 0;
- sb.setLength(0);
- list.clear();
- }
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(iter.next());
- j++;
- }
- if (j > 0)
- getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,sb.toString(),list);
- return rval;
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
-
- /** Look up ingestion data for a document.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hash of the id of the document.
- *@return the current document's ingestion data, or null if the document is not currently ingested.
- */
- public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
- String identifierClass, String identifierHash)
- throws LCFException
- {
- return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
- }
-
- /** Calculate the average time interval between changes for a document.
- * This is based on the data gathered for the document.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hash of the id of the document.
- *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
- */
- public long getDocumentUpdateInterval(String outputConnectionName,
- String identifierClass, String identifierHash)
- throws LCFException
- {
- return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
- }
-
- /** Calculate the average time interval between changes for a document.
- * This is based on the data gathered for the document.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
- *@param identifierHashes are the hashes of the ids of the documents.
- *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
- */
- public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
- String[] identifierClasses, String[] identifierHashes)
- throws LCFException
- {
- // Do these all at once!!
- // First, create a return array
- long[] rval = new long[identifierHashes.length];
- // Also create a map from identifier to return index.
- HashMap returnMap = new HashMap();
- // Finally, need the set of hash codes
- HashMap idCodes = new HashMap();
- int j = 0;
- while (j < identifierHashes.length)
- {
- String key = makeKey(identifierClasses[j],identifierHashes[j]);
- rval[j] = 0L;
- returnMap.put(key,new Integer(j));
- idCodes.put(key,key);
- j++;
- }
-
- // Get the chunk size
- int maxInClause = getMaxInClause();
-
-
- // Loop through the hash codes
- Iterator iter = idCodes.keySet().iterator();
- ArrayList list = new ArrayList();
- StringBuffer sb = new StringBuffer();
- j = 0;
- while (iter.hasNext())
- {
- if (j == maxInClause)
- {
- getIntervals(rval,outputConnectionName,list,sb.toString(),returnMap);
- list.clear();
- sb.setLength(0);
- j = 0;
- }
-
- if (j > 0)
- sb.append(',');
- sb.append('?');
- list.add(iter.next());
- j++;
- }
-
- if (j > 0)
- getIntervals(rval,outputConnectionName,list,sb.toString(),returnMap);
-
- return rval;
- }
-
-
- /** Query for and calculate the interval for a bunch of hashcodes.
- *@param rval is the array to stuff calculated return values into.
- *@param list is the list of parameters.
- *@param queryPart is the part of the query pertaining to the list of hashcodes
- *@param returnMap is a mapping from document id to rval index.
- */
- protected void getIntervals(long[] rval, String outputConnectionName, ArrayList list, String queryPart, HashMap returnMap)
- throws LCFException
- {
- list.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+docKeyField+","+changeCountField+","+firstIngestField+","+lastIngestField+
- " FROM "+getTableName()+" WHERE "+
- docKeyField+" IN("+queryPart+") AND "+outputConnNameField+"=?",list,null,null);
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- String docHash = (String)row.getValue(docKeyField);
- Integer index = (Integer)returnMap.get(docHash);
- if (index != null)
- {
- // Calculate the return value
- long changeCount = ((Long)row.getValue(changeCountField)).longValue();
- long firstIngest = ((Long)row.getValue(firstIngestField)).longValue();
- long lastIngest = ((Long)row.getValue(lastIngestField)).longValue();
- rval[index.intValue()] = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
- }
- }
- }
-
-
- /** Note the ingestion of a document, or the "update" of a document.
- *@param identifierHash is the hash of the id of the document.
- *@param documentVersion is a string describing the new version of the document.
- *@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
- *@param documentURI is the uri the document can be accessed at, or null (which signals that we are to record the version, but no
- * ingestion took place).
- *@param documentURIHash is the hash of the document uri.
- */
- protected void noteDocumentIngest(String outputConnectionName,
- String docKey, String documentVersion,
- String outputVersion, String authorityNameString,
- long ingestTime, String documentURI, String documentURIHash)
- throws LCFException
- {
- while (true)
- {
- // The table can have at most one row per URI, for non-null URIs. It can also have at most one row per document identifier.
- // However, for null URI's, multiple rows are allowed. Null URIs have a special meaning, which is that
- // the document was not actually ingested.
-
- // To make sure the constraints are enforced, we cannot simply look for the row and insert one if not found. This is because
- // postgresql does not cause a lock to be created on rows that don't yet exist, so multiple transactions of the kind described
- // can lead to multiple rows with the same key. Instead, we *could* lock the whole table down, but that would interfere with
- // parallelism. The lowest-impact approach is to make sure an index constraint is in place, and first attempt to do an INSERT.
- // That attempt will fail if a record already exists. Then, an update can be attempted.
- //
- // In the situation where the INSERT fails, the current transaction is aborted and a new transaction must be performed.
- // This means that it is impossible to structure things so that the UPDATE is guaranteed to succeed. So, on the event of an
- // INSERT failure, the UPDATE is tried, but if that fails too, then the INSERT is tried again. This should also handle the
- // case where a DELETE in another transaction removes the database row before it can be UPDATEd.
- //
- // If the UPDATE does not appear to modify any rows, this is also a signal that the INSERT must be retried.
- //
-
- // Set up for insert
- HashMap map = new HashMap();
- map.put(lastVersionField,documentVersion);
- map.put(lastOutputVersionField,outputVersion);
- map.put(lastIngestField,new Long(ingestTime));
- if (documentURI != null)
- {
- map.put(docURIField,documentURI);
- map.put(uriHashField,documentURIHash);
- }
- if (authorityNameString != null)
- map.put(authorityNameField,authorityNameString);
- else
- map.put(authorityNameField,"");
-
- Long id = new Long(IDFactory.make());
- map.put(idField,id);
- map.put(outputConnNameField,outputConnectionName);
- map.put(docKeyField,docKey);
- map.put(changeCountField,new Long(1));
- map.put(firstIngestField,map.get(lastIngestField));
- beginTransaction();
- try
- {
- performInsert(map,null);
- conditionallyAnalyzeInsert();
- return;
- }
- catch (LCFException e)
- {
- signalRollback();
- // If this is simply a constraint violation, we just want to fall through and try the update!
- if (e.getErrorCode() != LCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- // Otherwise, exit transaction and fall through to 'update' attempt
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
-
- // Insert must have failed. Attempt an update.
- map.clear();
- map.put(lastVersionField,documentVersion);
- map.put(lastOutputVersionField,outputVersion);
- map.put(lastIngestField,new Long(ingestTime));
- if (documentURI != null)
- {
- map.put(docURIField,documentURI);
- map.put(uriHashField,documentURIHash);
- }
- if (authorityNameString != null)
- map.put(authorityNameField,authorityNameString);
- else
- map.put(authorityNameField,"");
-
- beginTransaction();
- try
- {
- // Look for existing row.
- ArrayList list = new ArrayList();
- list.add(docKey);
- list.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+idField+","+changeCountField+" FROM "+getTableName()+" WHERE "+
- docKeyField+"=? AND "+outputConnNameField+"=? FOR UPDATE",list,null,null);
- IResultRow row = null;
- if (set.getRowCount() > 0)
- row = set.getRow(0);
-
- if (row != null)
- {
- // Update the record
- list.clear();
- list.add(row.getValue(idField));
- long changeCount = ((Long)row.getValue(changeCountField)).longValue();
- changeCount++;
- map.put(changeCountField,new Long(changeCount));
- performUpdate(map,"WHERE "+idField+"=?",list,null);
- // Update successful!
- return;
- }
-
- // Update failed to find a matching record, so cycle back to retry the insert
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
- }
-
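The insert-first/update-fallback discipline described in the comments at the top of noteDocumentIngest() above is independent of the LCF table classes. A minimal standalone sketch of the same retry loop in plain JDBC follows; the table and column names are illustrative only, and it assumes autoCommit is disabled and that a unique index exists on the key column:

  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;

  public class UpsertRetrySketch
  {
    /** Loop until either the INSERT or the UPDATE succeeds, relying on a unique index on the key column. */
    public static void upsert(Connection conn, String docKey, String version)
      throws SQLException
    {
      while (true)
      {
        // First, try the INSERT in its own transaction; a unique-constraint violation means a row already exists.
        try
        {
          PreparedStatement ps = conn.prepareStatement("INSERT INTO ingeststatus (dockey,lastversion) VALUES (?,?)");
          try
          {
            ps.setString(1,docKey);
            ps.setString(2,version);
            ps.executeUpdate();
            conn.commit();
            return;
          }
          finally
          {
            ps.close();
          }
        }
        catch (SQLException e)
        {
          // Treat the failure as a probable constraint violation: abort and fall through to the UPDATE attempt.
          conn.rollback();
        }
        // Next, try the UPDATE; if no row matched, a concurrent DELETE removed it, so retry the INSERT.
        PreparedStatement ps = conn.prepareStatement("UPDATE ingeststatus SET lastversion=? WHERE dockey=?");
        try
        {
          ps.setString(1,version);
          ps.setString(2,docKey);
          int count = ps.executeUpdate();
          conn.commit();
          if (count > 0)
            return;
        }
        finally
        {
          ps.close();
        }
      }
    }
  }
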
- /** Get a chunk of document uris.
- *@param rval is the string array where the uris should be put.
- *@param map is the map from id to index.
- *@param clause is the in clause for the query.
- *@param list is the parameter list for the query.
- */
- protected void getDocumentURIChunk(DeleteInfo[] rval, Map map, String outputConnectionName, String clause, ArrayList list)
- throws LCFException
- {
- list.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+
- docKeyField+" IN ("+clause+") AND "+outputConnNameField+"=?",list,null,null);
- // Go through list and put into buckets.
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- String docHash = row.getValue(docKeyField).toString();
- Integer position = (Integer)map.get(docHash);
- if (position != null)
- {
- String lastURI = (String)row.getValue(docURIField);
- if (lastURI != null && lastURI.length() == 0)
- lastURI = null;
- String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
- rval[position.intValue()] = new DeleteInfo(lastURI,lastOutputVersion);
- }
- }
- }
-
- /** Get a chunk of document ingest data records.
- *@param rval is the document ingest status array where the data should be put.
- *@param map is the map from id to index.
- *@param clause is the in clause for the query.
- *@param list is the parameter list for the query.
- */
- protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map map, String outputConnectionName, String clause, ArrayList list)
- throws LCFException
- {
- // Get the primary records associated with this hash value
- list.add(outputConnectionName);
- IResultSet set = performQuery("SELECT "+idField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+" FROM "+getTableName()+" WHERE "+
- docKeyField+" IN ("+clause+") AND "+outputConnNameField+"=?",list,null,null);
-
- // Now, go through the original request once more, this time building the result
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- String docHash = row.getValue(docKeyField).toString();
- Integer position = (Integer)map.get(docHash);
- if (position != null)
- {
- Long id = (Long)row.getValue(idField);
- String lastVersion = (String)row.getValue(lastVersionField);
- String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
- String authorityName = (String)row.getValue(authorityNameField);
- rval[position.intValue()] = new DocumentIngestStatus(lastVersion,lastOutputVersion,authorityName);
- }
- }
- }
-
- // Protected methods
-
- /** Add or replace document, using the specified output connection, via the standard pool.
- */
- protected int addOrReplaceDocument(IOutputConnection connection, String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
+ String documentVersion,
+ long recordTime, IOutputActivity activities)
throws LCFException, ServiceInterruption
- {
- IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",300000L);
- try
- {
- return connector.addOrReplaceDocument(documentURI,outputDescription,document,authorityNameString,activities);
- }
- finally
- {
- OutputConnectorFactory.release(connector);
- }
- }
-
- /** Remove document, using the specified output connection, via the standard pool.
- */
- protected void removeDocument(IOutputConnection connection, String documentURI, String outputDescription, IOutputRemoveActivity activities)
+ {
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+
+ String docKey = makeKey(identifierClass,identifierHash);
+
+ if (Logging.ingest.isDebugEnabled())
+ {
+ Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
+ }
+
+ performIngestion(connection,docKey,documentVersion,null,null,null,recordTime,null,activities);
+ }
+
+ /** Ingest a document.
+ * This ingests the document, and notes it. If this is a repeat ingestion of the document, this
+ * method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
+ * described by the RepositoryDocument object passed to this method.
+ * ServiceInterruption is thrown if the document ingestion must be rescheduled.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param documentVersion is the document version.
+ *@param outputVersion is the output version string constructed from the output specification by the output connector.
+ *@param authorityName is the name of the authority associated with the document, if any.
+ *@param data is the document data. The data is closed after ingestion is complete.
+ *@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
+ *@param documentURI is the URI of the document, which will be used as the key of the document in the index.
+ *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
+ *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
+ */
+ public boolean documentIngest(String outputConnectionName,
+ String identifierClass, String identifierHash,
+ String documentVersion,
+ String outputVersion,
+ String authorityName,
+ RepositoryDocument data,
+ long ingestTime, String documentURI,
+ IOutputActivity activities)
+ throws LCFException, ServiceInterruption
+ {
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+
+ String docKey = makeKey(identifierClass,identifierHash);
+
+ if (Logging.ingest.isDebugEnabled())
+ {
+ Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
+ }
+
+ return performIngestion(connection,docKey,documentVersion,outputVersion,authorityName,
+ data,ingestTime,documentURI,activities);
+ }
+
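For orientation, a repository-side caller of documentIngest() would look roughly like the sketch below. The framework interface types (IIncrementalIngester, RepositoryDocument, IOutputActivity) are assumed to be imported from the agents framework; the connection name and blank identifier class are illustrative values only:

  /** Hedged usage sketch; imports of the framework interfaces are omitted. */
  public boolean ingestOne(IIncrementalIngester ingester, RepositoryDocument repositoryDocument,
    String documentIdentifierHash, String documentVersion, String outputVersion,
    String authorityName, String documentURI, IOutputActivity activities)
    throws LCFException, ServiceInterruption
  {
    return ingester.documentIngest(
      "myOutputConnection",          // output connection name (illustrative)
      "",                            // identifier class / namespace for the hash
      documentIdentifierHash,        // hashed document identifier
      documentVersion,               // version string computed by the repository connector
      outputVersion,                 // output version string from the output connector
      authorityName,                 // may be null when no authority applies
      repositoryDocument,            // already-populated document content and metadata
      System.currentTimeMillis(),    // ingestion time, in milliseconds since epoch
      documentURI,                   // URI used as the key of the document in the index
      activities);                   // returns false if the document was judged illegal
  }
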
+ /** Do the actual ingestion, or just record it if there's nothing to ingest. */
+ protected boolean performIngestion(IOutputConnection connection,
+ String docKey, String documentVersion, String outputVersion,
+ String authorityNameString,
+ RepositoryDocument data,
+ long ingestTime, String documentURI,
+ IOutputActivity activities)
+ throws LCFException, ServiceInterruption
+ {
+ // No transactions; not safe because post may take too much time
+
+ // First, calculate a document uri hash value
+ String documentURIHash = null;
+ if (documentURI != null)
+ documentURIHash = LCF.hash(documentURI);
+
+ // See what uri was used before for this doc, if any
+ ArrayList list = new ArrayList();
+ list.add(docKey);
+ list.add(connection.getName());
+ IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
+ " WHERE "+docKeyField+"=? AND "+outputConnNameField+"=?",list,null,null);
+
+ String oldURI = null;
+ String oldURIHash = null;
+ String oldOutputVersion = null;
+ if (set.getRowCount() > 0)
+ {
+ IResultRow row = set.getRow(0);
+ oldURI = (String)row.getValue(docURIField);
+ oldURIHash = (String)row.getValue(uriHashField);
+ oldOutputVersion = (String)row.getValue(lastOutputVersionField);
+ }
+
+ // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
+ // dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well.
+
+ // But, since we need to ensure that any given URI is only worked on by one thread at a time, use critical sections
+ // to block the rare case where multiple threads try to work on the same URI.

+ int uriCount = 0;
+ if (documentURI != null)
+ uriCount++;
+ if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
+ uriCount++;
+ String[] lockArray = new String[uriCount];
+ uriCount = 0;
+ if (documentURI != null)
+ lockArray[uriCount++] = connection.getName()+":"+documentURI;
+ if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
+ lockArray[uriCount++] = connection.getName()+":"+oldURI;
+
+ lockManager.enterCriticalSections(null,null,lockArray);
+ try
+ {
+
+ if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
+ {
+ // Delete all records from the database that match the old URI, except for THIS record.
+ list.clear();
+ list.add(oldURIHash);
+ list.add(oldURI);
+ list.add(docKey);
+ list.add(connection.getName());
+ performDelete("WHERE "+uriHashField+"=? AND "+docURIField+"=? AND "+docKeyField+"!=? AND "+outputConnNameField+"=?",list,null);
+ removeDocument(connection,oldURI,oldOutputVersion,activities);
+ }
+
+ if (documentURI != null)
+ {
+ // Get rid of all records that match the NEW uri, except for this record.
+ list.clear();
+ list.add(documentURIHash);
+ list.add(documentURI);
+ list.add(docKey);
+ list.add(connection.getName());
+ performDelete("WHERE "+uriHashField+"=? AND "+docURIField+"=? AND "+docKeyField+"!=? AND "+outputConnNameField+"=?",list,null);
+ }
+
+ // Now, we know we are ready for the ingest.
+ if (documentURI != null)
+ {
+ // Here are the cases:
+ // 1) There was a service interruption before the upload started.
+ // (In that case, we don't need to log anything, just reschedule).
+ // 2) There was a service interruption after the document was transmitted.
+ // (In that case, we should presume that the document was ingested, but
+ // reschedule another import anyway.)
+ // 3) Everything went OK
+ // (need to log the ingestion.)
+ // 4) Everything went OK, but we were told we have an illegal document.
+ // (We note the ingestion because if we don't, we will be forced to repeat ourselves.
+ // In theory, the document doesn't need to be deleted, but there is no way to signal
+ // that at the moment.)
+
+ // Note an ingestion before we actually try it.
+ // This is a marker that says "something is there"; it has an empty version, which indicates
+ // that we don't know anything about it. That means it will be reingested when the
+ // next version comes along, and will also be deleted if a deletion is called for.
+ noteDocumentIngest(connection.getName(),docKey,null,null,null,ingestTime,documentURI,documentURIHash);
+ int result = addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
+ noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+ return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
+ }
+
+ // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
+ // to noteDocumentIngest by having the null documentURI.
+ noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,authorityNameString,ingestTime,null,null);
+ return true;
+ }
+ finally
+ {
+ lockManager.leaveCriticalSections(null,null,lockArray);
+ }
+ }
+
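The ordering of the two noteDocumentIngest() calls around the actual push is what makes cases 1 through 4 above recoverable. A generic sketch of that ordering is shown below; IngestLedger and Indexer are hypothetical stand-ins, not LCF interfaces:

  /** Hypothetical ledger standing in for the ingeststatus table; a null version means "state unknown". */
  interface IngestLedger
  {
    void record(String docKey, String version, String uri);
  }

  /** Hypothetical stand-in for the output connector. */
  interface Indexer
  {
    boolean send(String uri, byte[] content);
  }

  class NoteBeforeIngestSketch
  {
    /** Record an unknown-version marker, push the document, then record the real version. */
    static boolean ingest(IngestLedger ledger, Indexer indexer,
      String docKey, String version, String uri, byte[] content)
    {
      // Step 1: write the marker first. If the process dies after the push but before step 3,
      // the empty version forces a re-ingest on the next crawl, and the URI is still on record
      // so that a later delete can find the document.
      ledger.record(docKey,null,uri);
      // Step 2: transmit the document to the index.
      boolean accepted = indexer.send(uri,content);
      // Step 3: record the real version only after the push has returned.
      ledger.record(docKey,version,uri);
      return accepted;
    }
  }
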
+ /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+ * versions agreed).
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes are the set of document identifier hashes.
+ *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+ */
+ public void documentCheckMultiple(String outputConnectionName,
+ String[] identifierClasses, String[] identifierHashes,
+ long checkTime)
+ throws LCFException
+ {
+ beginTransaction();
+ try
+ {
+ int maxInClause = getMaxInClause();
+
+ HashMap docIDValues = new HashMap();
+ int j = 0;
+ while (j < identifierHashes.length)
+ {
+ String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
+ docIDValues.put(docDBString,docDBString);
+ j++;
+ }
+
+ // Now, perform n queries, each of them no larger than maxInClause in length.
+ // Create a list of row ids from this.
+ HashMap rowIDSet = new HashMap();
+ Iterator iter = docIDValues.keySet().iterator();
+ j = 0;
+ ArrayList list = new ArrayList();
+ StringBuffer sb = new StringBuffer();
+ while (iter.hasNext())
+ {
+ if (j == maxInClause)
+ {
+ findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
+ list.clear();
+ sb.setLength(0);
+ j = 0;
+ }
+ String idValue = (String)iter.next();
+ if (j > 0)
+ sb.append(',');
+ sb.append('?');
+ list.add(idValue);
+ j++;
+ }
+
+ if (j > 0)
+ findRowIdsForDocIds(outputConnectionName,rowIDSet,list,sb.toString());
+
+ // Now, break row id's into chunks too; submit one chunk at a time
+ j = 0;
+ list.clear();
+ sb.setLength(0);
+ iter = rowIDSet.keySet().iterator();
+ while (iter.hasNext())
+ {
+ if (j == maxInClause)
+ {
+ updateRowIds(list,sb.toString(),checkTime);
+ list.clear();
+ sb.setLength(0);
+ j = 0;
+ }
+ Long idValue = (Long)iter.next();
+ if (j > 0)
+ sb.append(',');
+ sb.append('?');
+ list.add(idValue);
+ j++;
+ }
+
+ if (j > 0)
+ updateRowIds(list,sb.toString(),checkTime);
+ }
+ catch (LCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+ }
+
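The chunking idiom used twice above (accumulate up to maxInClause '?' markers, flush, reset) can be factored out. The helper below is purely illustrative and not part of the LCF framework; the ChunkHandler callback is a hypothetical hook for the per-chunk query:

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;

  public class InClauseChunker
  {
    /** Callback that runs one query for one chunk; hypothetical, supplied by the caller. */
    public interface ChunkHandler
    {
      void handle(String inClause, List<String> params);
    }

    /** Split a set of keys into IN-clause batches of at most maxInClause parameters each. */
    public static void forEachChunk(Collection<String> keys, int maxInClause, ChunkHandler handler)
    {
      List<String> params = new ArrayList<String>();
      StringBuffer clause = new StringBuffer();
      for (String key : keys)
      {
        if (params.size() == maxInClause)
        {
          // Flush a full chunk, e.g. clause "?,?,?" plus the three matching values.
          handler.handle(clause.toString(),params);
          params = new ArrayList<String>();
          clause.setLength(0);
        }
        if (params.size() > 0)
          clause.append(',');
        clause.append('?');
        params.add(key);
      }
      if (params.size() > 0)
        handler.handle(clause.toString(),params);
    }
  }
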
+ /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+ * versions agreed).
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+ */
+ public void documentCheck(String outputConnectionName,
+ String identifierClass, String identifierHash,
+ long checkTime)
+ throws LCFException
+ {
+ documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
+ }
+
+ /** Update a chunk of row ids.
+ */
+ protected void updateRowIds(ArrayList list, String queryPart, long checkTime)
+ throws LCFException
+ {
+ HashMap map = new HashMap();
+ map.put(lastIngestField,new Long(checkTime));
+ performUpdate(map,"WHERE "+idField+" IN ("+queryPart+")",list,null);
+ }
+
+ /** Delete multiple documents from the search engine index.
+ *@param outputConnectionNames are the names of the output connections associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes for the documents.
+ *@param activities is the object to use to log the details of the deletion attempt. May be null.
+ */
+ public void documentDeleteMultiple(String[] outputConnectionNames,
+ String[] identifierClasses, String[] identifierHashes,
+ IOutputRemoveActivity activities)
+ throws LCFException, ServiceInterruption
+ {
+ // Segregate request by connection names
+ HashMap keyMap = new HashMap();
+ int i = 0;
+ while (i < outputConnectionNames.length)
+ {
+ String outputConnectionName = outputConnectionNames[i];
+ ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ if (list == null)
+ {
+ list = new ArrayList();
+ keyMap.put(outputConnectionName,list);
+ }
+ list.add(new Integer(i));
+ i++;
+ }
+
+ // Create the return array.
+ Iterator iter = keyMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String outputConnectionName = (String)iter.next();
+ ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ String[] localIdentifierClasses = new String[list.size()];
+ String[] localIdentifierHashes = new String[list.size()];
+ i = 0;
+ while (i < localIdentifierClasses.length)
+ {
+ int index = ((Integer)list.get(i)).intValue();
+ localIdentifierClasses[i] = identifierClasses[index];
+ localIdentifierHashes[i] = identifierHashes[index];
+ i++;
+ }
+ documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
+ }
+ }
+
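The segregation step above (record, for each output connection name, the positions it owns in the parallel arrays) is a plain group-by; the small illustrative form below shows just that step and is not part of the LCF framework:

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  class GroupByConnectionSketch
  {
    /** Map each output connection name to the positions it occupies in the parallel request arrays. */
    static Map<String,List<Integer>> groupPositions(String[] outputConnectionNames)
    {
      Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
      for (int i = 0; i < outputConnectionNames.length; i++)
      {
        List<Integer> positions = keyMap.get(outputConnectionNames[i]);
        if (positions == null)
        {
          positions = new ArrayList<Integer>();
          keyMap.put(outputConnectionNames[i],positions);
        }
        positions.add(Integer.valueOf(i));
      }
      return keyMap;
    }
  }
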
+ /** Delete multiple documents from the search engine index.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes for the documents.
+ *@param activities is the object to use to log the details of the deletion attempt. May be null.
+ */
+ public void documentDeleteMultiple(String outputConnectionName,
+ String[] identifierClasses, String[] identifierHashes,
+ IOutputRemoveActivity activities)
+ throws LCFException, ServiceInterruption
+ {
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+
+ if (Logging.ingest.isDebugEnabled())
+ {
+ int i = 0;
+ while (i < identifierHashes.length)
+ {
+ Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
+ i++;
+ }
+ }
+
+ // No transactions. Time for the operation may exceed the transaction timeout.
+
+ // Obtain the current URIs of all of these.
+ DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
+
+ // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+ // (This guarantees that when this operation is complete the database reflects reality.)
+ int validURIcount = 0;
+ int i = 0;
+ while (i < uris.length)
+ {
+ if (uris[i] != null && uris[i].getURI() != null)
+ validURIcount++;
+ i++;
+ }
+ String[] lockArray = new String[validURIcount];
+ String[] validURIArray = new String[validURIcount];
+ validURIcount = 0;
+ i = 0;
+ while (i < uris.length)
+ {
+ if (uris[i] != null && uris[i].getURI() != null)
+ {
+ validURIArray[validURIcount] = uris[i].getURI();
+ lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+ validURIcount++;
+ }
+ i++;
+ }
+
+ lockManager.enterCriticalSections(null,null,lockArray);
+ try
+ {
+ // Fetch the document URIs for the listed documents
+ int j = 0;
+ while (j < uris.length)
+ {
+ if (uris[j] != null && uris[j].getURI() != null)
+ removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
+ j++;
+ }
+
+ // Now, get rid of all rows that match the given uris.
+ // Do the queries together, then the deletes
+ beginTransaction();
+ try
+ {
+ // The basic process is this:
+ // 1) Come up with a set of urihash values
+ // 2) Find the matching, corresponding id values
+ // 3) Delete the rows corresponding to the id values, in sequence
+
+ // Process (1 & 2) has to be broken down into chunks that contain the maximum
+ // number of doc hash values each. We need to avoid repeating doc hash values,
+ // so the first step is to come up with ALL the doc hash values before looping
+ // over them.
+
+ int maxInClause = getMaxInClause();
+
+ // Find all the documents that match this set of URIs
+ HashMap docURIHashValues = new HashMap();
+ HashMap docURIValues = new HashMap();
+ j = 0;
+ while (j < validURIArray.length)
+ {
+ String docDBString = validURIArray[j++];
+ String docDBHashString = LCF.hash(docDBString);
[... 927 lines stripped ...]