Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/02/18 15:31:31 UTC
svn commit: r911418 [5/8] - in /incubator/lcf/trunk/modules:
connectors/webcrawler/connector/org/apache/lcf/crawler/connectors/webcrawler/
framework/agents/org/apache/lcf/agents/agentmanager/
framework/agents/org/apache/lcf/agents/incrementalingest/ fr...
Modified: incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/Carrydown.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/Carrydown.java?rev=911418&r1=911417&r2=911418&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/Carrydown.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/Carrydown.java Thu Feb 18 14:31:31 2010
@@ -29,1101 +29,878 @@
*/
public class Carrydown extends org.apache.lcf.core.database.BaseTable
{
- public static final String _rcsid = "@(#)$Id$";
+ public static final String _rcsid = "@(#)$Id$";
- // Field names
- public static final String jobIDField = "jobid";
- public static final String parentIDHashField = "parentidhash";
- public static final String childIDHashField = "childidhash";
- public static final String dataNameField = "dataname";
- public static final String dataValueHashField = "datavaluehash";
- public static final String dataValueField = "datavalue";
- public static final String newField = "isnew";
-
- /** The standard value for the "isnew" field. Means that the link existed prior to this scan, and no new link
- * has yet been found. */
- protected static final int ISNEW_BASE = 0;
- /** This value means that the link is brand-new; it did not exist before this pass. */
- protected static final int ISNEW_NEW = 1;
- /** This value means that the link existed before, and has been found during this scan. */
- protected static final int ISNEW_EXISTING = 2;
-
- /** Counter for kicking off analyze */
- protected static AnalyzeTracker tracker = new AnalyzeTracker();
-
- // Map from string character to link status
- protected static Map isNewMap;
- static
- {
- isNewMap = new HashMap();
- isNewMap.put("B",new Integer(ISNEW_BASE));
- isNewMap.put("N",new Integer(ISNEW_NEW));
- isNewMap.put("E",new Integer(ISNEW_EXISTING));
- }
-
- /** Constructor.
- *@param database is the database handle.
- */
- public Carrydown(IDBInterface database)
- throws LCFException
- {
- super(database,"carrydown");
- }
-
- /** Install or upgrade.
- */
- public void install(String jobsTable, String jobsColumn)
- throws LCFException
- {
- // Since adding a unique constraint may not work, we need the possibility of retrying.
- while (true)
- {
- // First, set up the schema.
- beginTransaction();
- try
- {
- Map existing = getTableSchema(null,null);
- if (existing == null)
- {
- // I'm going to allow the parent to be null, which will represent carry-down from the seeding
- // process to the seed, in case this ever arises.
- //
- // I am also going to allow null data values.
- HashMap map = new HashMap();
- map.put(jobIDField,new ColumnDescription("BIGINT",false,false,jobsTable,jobsColumn,false));
- map.put(parentIDHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
- map.put(childIDHashField,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
- map.put(dataNameField,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
- map.put(dataValueHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
- map.put(dataValueField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
- map.put(newField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
-
- performCreate(map,null);
-
- }
- else
- {
- ColumnDescription cd;
- int i;
-
- // Get rid of unused columns
- cd = (ColumnDescription)existing.get("childid");
- if (cd != null)
- {
- ArrayList list = new ArrayList();
- list.add("childid");
- list.add("parentid");
- performAlter(null,null,list,null);
- }
-
-
- // Make sure there's a datavaluehash field
- cd = (ColumnDescription)existing.get(dataValueHashField);
- if (cd == null)
- {
- // Create the hash field, nullable
- HashMap map = new HashMap();
- map.put(dataValueHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
- performAlter(map,null,null,null);
-
- // Walk through the existing data values, populating this new field
- while (true)
- {
- IResultSet set = performQuery("SELECT "+jobIDField+","+childIDHashField+","+parentIDHashField+","+
- dataNameField+","+dataValueField+" FROM "+
- getTableName()+" WHERE character_length("+dataValueHashField+") IS NULL AND "+
- dataValueField+" IS NOT NULL LIMIT 1000",null,null,null);
- if (set.getRowCount() == 0)
- break;
- i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i++);
- Long jobID = (Long)row.getValue(jobIDField);
- String childIDHash = (String)row.getValue(childIDHashField);
- String parentIDHash = (String)row.getValue(parentIDHashField);
- String dataName = (String)row.getValue(dataNameField);
- String dataValue = (String)row.getValue(dataValueField);
-
- HashMap newMap = new HashMap();
- newMap.put(dataValueHashField,LCF.hash(dataValue));
- ArrayList newList = new ArrayList();
- newList.add(jobID);
- if (parentIDHash != null && parentIDHash.length() > 0)
- {
- newList.add(parentIDHash);
- newList.add(childIDHash);
- newList.add(dataName);
- newList.add(dataValue);
- performUpdate(newMap,"WHERE "+jobIDField+"=? AND "+parentIDHashField+"=? AND "+childIDHashField+"=? AND "+dataNameField+"=? AND "+dataValueField+"=?",newList,null);
- }
- else
- {
- newList.add(childIDHash);
- newList.add(dataName);
- newList.add(dataValue);
- performUpdate(newMap,"WHERE "+jobIDField+"=? AND "+parentIDHashField+" IS NULL AND "+childIDHashField+"=? AND "+dataNameField+"=? AND "+dataValueField+"=?",newList,null);
- }
- }
- }
-
- }
- }
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
-
- // Now do index management
-
- IndexDescription uniqueIndex = new IndexDescription(true,new String[]{jobIDField,parentIDHashField,childIDHashField,dataNameField,dataValueHashField});
- IndexDescription jobParentIndex = new IndexDescription(false,new String[]{jobIDField,parentIDHashField});
- IndexDescription jobChildDataIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,dataNameField});
- IndexDescription jobChildNewIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,newField});
-
- Map indexes = getTableIndexes(null,null);
- Iterator iter = indexes.keySet().iterator();
- while (iter.hasNext())
- {
- String indexName = (String)iter.next();
- IndexDescription id = (IndexDescription)indexes.get(indexName);
-
- if (uniqueIndex != null && id.equals(uniqueIndex))
- uniqueIndex = null;
- else if (jobParentIndex != null && id.equals(jobParentIndex))
- jobParentIndex = null;
- else if (jobChildDataIndex != null && id.equals(jobChildDataIndex))
- jobChildDataIndex = null;
- else if (jobChildNewIndex != null && id.equals(jobChildNewIndex))
- jobChildNewIndex = null;
- else if (indexName.indexOf("_pkey") == -1)
- // This index shouldn't be here; drop it
- performRemoveIndex(indexName);
- }
-
- // Create the indexes we are missing
-
- if (jobParentIndex != null)
- performAddIndex(null,jobParentIndex);
-
- if (jobChildDataIndex != null)
- performAddIndex(null,jobChildDataIndex);
-
- if (jobChildNewIndex != null)
- performAddIndex(null,jobChildNewIndex);
-
- // This index is the constraint. Only one row per job, dataname, datavalue, parent, and child.
- if (uniqueIndex != null)
- {
- // This creation might fail...
- try
- {
- performAddIndex(null,uniqueIndex);
- }
- catch (LCFException e)
- {
- if (e.getMessage().indexOf("could not create unique index") == -1)
- throw e;
-
- // Remove duplicates, as part of upgrade
- removeDuplicates();
-
- // Try everything again
- continue;
- }
- }
-
- // Install/upgrade complete
- break;
- }
-
- }
-
- /** Remove duplicates (as part of upgrade). */
- protected void removeDuplicates()
- throws LCFException
- {
- // If we got here, it means adding the unique constraint failed. Fix things up!
- Logging.jobs.warn("Carrydown has duplicate jobid,parent,child,dataname,datavalue tuples! Cleaning up...");
-
- // First, create a temporary non-unique index that we intend to remove at the end of this process. We need this index
- // to order retrieval of rows by the proposed key order.
- performAddIndex("temp_index_carrydown",new IndexDescription(false,new String[]{jobIDField,parentIDHashField,childIDHashField,dataNameField}));
-
- // The fastest way to eliminate duplicates is to read rows in sorted order, and delete those that are duplicates. The index created above
- // will be used and will guarantee that we don't use excessive PostgreSQL server memory. A client-side filter will be used to eliminate results
- // that are not duplicates, which should prevent unbounded client memory usage as well.
-
- // Count the rows first
- IResultSet countSet = performQuery("SELECT COUNT(*) AS countvar FROM "+getTableName(),null,null,null);
- IResultRow countRow = countSet.getRow(0);
- int count;
- try
- {
- count = Integer.parseInt(countRow.getValue("countvar").toString());
- }
- catch (NumberFormatException e)
- {
- throw new LCFException(e.getMessage(),e);
- }
-
- // Now, amass a list of duplicates
- ArrayList duplicateList = new ArrayList();
- DuplicateFinder duplicateFinder = new DuplicateFinder();
- int j = 0;
- while (j < count)
- {
-
- IResultSet resultSet = getDBInterface().performQuery("SELECT "+jobIDField+","+
- parentIDHashField+","+childIDHashField+","+dataNameField+","+dataValueHashField+","+dataValueField+","+newField+
- " FROM "+getTableName()+
- " ORDER BY "+jobIDField+" ASC,"+parentIDHashField+" ASC,"+childIDHashField+" ASC,"+dataNameField+" ASC,"+dataValueHashField+" ASC OFFSET "+Integer.toString(j)+" LIMIT 10000",null,null,null,-1,duplicateFinder);
-
- int i = 0;
- while (i < resultSet.getRowCount())
- {
- IResultRow row = resultSet.getRow(i++);
- Long jobID = (Long)row.getValue(jobIDField);
- String parentIDHash = (String)row.getValue(parentIDHashField);
- String childIDHash = (String)row.getValue(childIDHashField);
- String dataName = (String)row.getValue(dataNameField);
- String dataValueHash = (String)row.getValue(dataValueHashField);
- String dataValue = (String)row.getValue(dataValueField);
- String newValue = (String)row.getValue(newField);
-
- Logging.jobs.warn("Duplicate carrydown row detected: job "+jobID.toString()+", parentIDHash = "+((parentIDHash==null)?"None":parentIDHash)+", childIDHash = "+((childIDHash==null)?"None":childIDHash)+", dataName = "+dataName+", dataValueHash = "+((dataValueHash==null)?"None":dataValueHash));
-
- HashMap map = new HashMap();
- map.put(jobIDField,jobID);
- if (parentIDHash != null && parentIDHash.length() > 0)
- map.put(parentIDHashField,parentIDHash);
- if (childIDHash != null && childIDHash.length() > 0)
- map.put(childIDHashField,childIDHash);
- map.put(dataNameField,dataName);
- if (dataValueHash != null && dataValueHash.length() > 0)
- map.put(dataValueHashField,dataValueHash);
- if (dataValue != null && dataValue.length() > 0)
- map.put(dataValueField,dataValue);
- if (newValue != null && newValue.length() > 0)
- map.put(newField,newValue);
- duplicateList.add(map);
- }
-
- j += 10000;
- }
-
- // Go through the duplicatelist, and remove the duplicates
- j = 0;
- while (j < duplicateList.size())
- {
- HashMap map = (HashMap)duplicateList.get(j++);
-
- beginTransaction();
- try
- {
- // Since there is no row ID, delete *all* the rows that match the keys, and recreate the key row
- ArrayList list = new ArrayList();
- list.add(map.get(jobIDField));
- String parentIDHash = (String)map.get(parentIDHashField);
- String childIDHash = (String)map.get(childIDHashField);
- if (parentIDHash != null)
- list.add(parentIDHash);
- if (childIDHash != null)
- list.add(childIDHash);
- list.add(map.get(dataNameField));
- String dataValueHash = (String)map.get(dataValueHashField);
- if (dataValueHash != null)
- list.add(dataValueHash);
- performDelete("WHERE "+jobIDField+"=? AND "+parentIDHashField+((parentIDHash!=null)?"=?":" IS NULL")+
- " AND "+childIDHashField+((childIDHash!=null)?"=?":" IS NULL")+
- " AND "+dataNameField+"=? AND "+dataValueHashField+((dataValueHash!=null)?"=?":" IS NULL"),list,null);
-
- // Now, insert the proper row
- performInsert(map,null);
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
-
- }
-
- // Clean up temporary index
- performRemoveIndex("temp_index_carrydown");
- }
-
- /** Uninstall.
- */
- public void deinstall()
- throws LCFException
- {
- performDrop(null);
- }
-
- /** Analyze job tables that need analysis.
- */
- public void analyzeTables()
- throws LCFException
- {
- long startTime = System.currentTimeMillis();
- Logging.perf.debug("Beginning to analyze carrydown table");
- analyzeTable();
- Logging.perf.debug("Done analyzing carrydown table in "+new Long(System.currentTimeMillis()-startTime)+" ms");
- }
-
- /** Delete an owning job (and clean up the corresponding carrydown rows).
- */
- public void deleteOwner(Long jobID)
- throws LCFException
- {
- ArrayList list = new ArrayList();
- list.add(jobID);
- performDelete("WHERE "+jobIDField+"=?",list,null);
- }
-
- // The strategy here is to leave all rows that have a given document as a parent labelled as "BASE" at the start of the
- // processing of that parent. As data are encountered, the values get written as "NEW" or flipped to "EXISTING".
- // When the document's processing has been completed, another method is called
- // that will remove all rows that belong to the parent which are still labelled "BASE", and will map the other rows that
- // belong to the parent back to the "BASE" state.
- //
- // If the daemon is aborted and restarted, the "new" rows should be deleted, and the EXISTING rows should be reset to
- // BASE, in order to restore the system to a good base state.
- //
-
- /** Reset, at startup time.
- */
- public void reset()
- throws LCFException
- {
- // Delete "new" rows
- HashMap map = new HashMap();
- ArrayList list = new ArrayList();
- list.add(statusToString(ISNEW_NEW));
- performDelete("WHERE "+newField+"=?",list,null);
-
- // Convert "existing" rows to base
- map.put(newField,statusToString(ISNEW_BASE));
- list.clear();
- list.add(statusToString(ISNEW_EXISTING));
- performUpdate(map,"WHERE "+newField+"=?",list,null);
- }
-
- /** Add carrydown data for a given parent/child pair.
- *
- *@return true if new carrydown data was recorded; false otherwise.
- */
- public boolean recordCarrydownData(Long jobID, String parentDocumentIDHash, String childDocumentIDHash,
- String[] documentDataNames, String[][] documentDataValueHashes, Object[][] documentDataValues)
- throws LCFException
- {
- return recordCarrydownDataMultiple(jobID,parentDocumentIDHash,new String[]{childDocumentIDHash},
- new String[][]{documentDataNames},new String[][][]{documentDataValueHashes},new Object[][][]{documentDataValues})[0];
- }
-
- /** Add carrydown data to the table.
- */
- public boolean[] recordCarrydownDataMultiple(Long jobID, String parentDocumentIDHash, String[] childDocumentIDHashes,
- String[][] dataNames, String[][][] dataValueHashes, Object[][][] dataValues)
- throws LCFException
- {
-
- // Need to go into a transaction because we need to distinguish between update and insert.
- HashMap duplicateRemoval = new HashMap();
- HashMap presentMap = new HashMap();
-
- int maxClause = 25;
- StringBuffer sb = new StringBuffer();
- ArrayList list = new ArrayList();
- int i = 0;
- int k = 0;
- // Keep track of the data items that have been seen vs. those that were unseen.
- while (k < childDocumentIDHashes.length)
- {
- String childDocumentIDHash = childDocumentIDHashes[k];
-
- // Loop through data names and values for this document
- String[] documentDataNames = dataNames[k];
- String[][] documentDataValueHashes = dataValueHashes[k];
- Object[][] documentDataValues = dataValues[k];
- k++;
-
- int q = 0;
- while (q < documentDataNames.length)
- {
- String documentDataName = documentDataNames[q];
- String[] documentDataValueHashSet = documentDataValueHashes[q];
- Object[] documentDataValueSet = documentDataValues[q];
- q++;
-
- if (documentDataValueHashSet != null)
- {
- int p = 0;
- while (p < documentDataValueHashSet.length)
- {
- String documentDataValueHash = documentDataValueHashSet[p];
- Object documentDataValue = documentDataValueSet[p];
- // blank values equivalent to null
- if (documentDataValueHash != null && documentDataValueHash.length() == 0)
- documentDataValueHash = null;
- // Build a hash record
- ValueRecord vr = new ValueRecord(childDocumentIDHash,
- documentDataName,documentDataValueHash,documentDataValue);
- if (duplicateRemoval.get(vr) != null)
- continue;
- duplicateRemoval.put(vr,vr);
- if (i == maxClause)
- {
- // Do the query and record the results
- performExistsCheck(presentMap,sb.toString(),list);
- i = 0;
- sb.setLength(0);
- list.clear();
- }
- if (i > 0)
- sb.append(" OR");
- sb.append("(").append(jobIDField).append("=? AND ")
- .append(dataNameField).append("=? AND ")
- .append(parentIDHashField).append("=? AND ").append(childIDHashField).append("=? AND ");
-
- list.add(jobID);
- list.add(documentDataName);
- list.add(parentDocumentIDHash);
- list.add(childDocumentIDHash);
-
- if (documentDataValueHash == null)
- {
- sb.append(dataValueHashField).append(" IS NULL");
- }
- else
- {
- sb.append(dataValueHashField).append("=?");
- list.add(documentDataValueHash);
- }
- sb.append(")");
-
- i++;
- p++;
- }
- }
- }
- }
- if (i > 0)
- performExistsCheck(presentMap,sb.toString(),list);
-
- // Go through the list again, and based on the results above, decide to do either an insert or
- // an update. Keep track of this information also, so we can build the return array when done.
-
- HashMap insertHappened = new HashMap();
-
- int j = 0;
- Iterator iter = duplicateRemoval.keySet().iterator();
- while (iter.hasNext())
- {
- ValueRecord childDocumentRecord = (ValueRecord)iter.next();
-
- String childDocumentIDHash = childDocumentRecord.getDocumentIDHash();
-
- HashMap map = new HashMap();
- String dataName = childDocumentRecord.getDataName();
- String dataValueHash = childDocumentRecord.getDataValueHash();
- Object dataValue = childDocumentRecord.getDataValue();
-
- if (presentMap.get(childDocumentRecord) == null)
- {
- map.put(jobIDField,jobID);
- map.put(parentIDHashField,parentDocumentIDHash);
- map.put(childIDHashField,childDocumentIDHash);
- map.put(dataNameField,dataName);
- if (dataValueHash != null)
- {
- map.put(dataValueHashField,dataValueHash);
- map.put(dataValueField,dataValue);
- }
-
- map.put(newField,statusToString(ISNEW_NEW));
- performInsert(map,null);
- tracker.noteInsert();
- insertHappened.put(childDocumentIDHash,new Boolean(true));
- }
- else
- {
- sb = new StringBuffer();
- sb.append("WHERE ").append(jobIDField).append("=? AND ")
- .append(dataNameField).append("=? AND ")
- .append(parentIDHashField).append("=? AND ").append(childIDHashField).append("=? AND ");
-
- ArrayList updateList = new ArrayList();
- updateList.add(jobID);
- updateList.add(dataName);
- updateList.add(parentDocumentIDHash);
- updateList.add(childDocumentIDHash);
- if (dataValueHash != null)
- {
- sb.append(dataValueHashField).append("=?");
- updateList.add(dataValueHash);
- }
- else
- {
- sb.append(dataValueHashField).append(" IS NULL");
- }
-
- map.put(newField,statusToString(ISNEW_EXISTING));
- performUpdate(map,sb.toString(),updateList,null);
- }
- }
-
- boolean[] rval = new boolean[childDocumentIDHashes.length];
- i = 0;
- while (i < rval.length)
- {
- String childDocumentIDHash = childDocumentIDHashes[i];
- rval[i++] = (insertHappened.get(childDocumentIDHash) != null);
- }
-
- return rval;
- }
-
- /** Do the exists check, in batch. */
- protected void performExistsCheck(Map presentMap, String query, ArrayList list)
- throws LCFException
- {
- // Note well: presentMap is only checked for the *existence* of a record, so we do not need to populate the datavalue field!
- // This is crucial, because otherwise we'd either be using an undetermined amount of memory, or we'd need to read into a temporary file.
- IResultSet result = performQuery("SELECT "+childIDHashField+","+dataNameField+","+dataValueHashField+" FROM "+getTableName()+" WHERE "+query+" FOR UPDATE",list,null,null);
- int i = 0;
- while (i < result.getRowCount())
- {
- IResultRow row = result.getRow(i++);
- String documentIDHash = (String)row.getValue(childIDHashField);
- String dataName = (String)row.getValue(dataNameField);
- String dataValueHash = (String)row.getValue(dataValueHashField);
- //String dataValue = (String)row.getValue(dataValueField);
- ValueRecord vr = new ValueRecord(documentIDHash,dataName,dataValueHash,null);
-
- presentMap.put(vr,vr);
- }
- }
-
- /** Return all records belonging to the specified parent documents to the base state,
- * and delete the old (eliminated) child records.
- */
- public void restoreRecords(Long jobID, String[] parentDocumentIDHashes)
- throws LCFException
- {
- beginTransaction();
- try
- {
- int maxClause = 25;
- StringBuffer sb = new StringBuffer();
- ArrayList list = new ArrayList();
- int i = 0;
- int k = 0;
- while (i < parentDocumentIDHashes.length)
- {
- if (k == maxClause)
- {
- performRestoreRecords(sb.toString(),list);
- sb.setLength(0);
- list.clear();
- k = 0;
- }
- if (k > 0)
- sb.append(" OR");
- sb.append("(").append(jobIDField).append("=? AND ")
- .append(parentIDHashField).append("=?)");
- String parentDocumentIDHash = parentDocumentIDHashes[i++];
- list.add(jobID);
- list.add(parentDocumentIDHash);
- k++;
- }
-
- if (k > 0)
- performRestoreRecords(sb.toString(),list);
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
- }
-
- protected void performRestoreRecords(String query, ArrayList list)
- throws LCFException
- {
- // Delete
- StringBuffer sb = new StringBuffer("WHERE (");
- sb.append(query).append(") AND (").append(newField).append("=?)");
- ArrayList newList = (ArrayList)list.clone();
- newList.add(statusToString(ISNEW_BASE));
- performDelete(sb.toString(),newList,null);
-
- // Restore new values
- sb = new StringBuffer("WHERE (");
- sb.append(query).append(") AND (").append(newField).append("=? OR ").append(newField).append("=?)");
- list.add(statusToString(ISNEW_EXISTING));
- list.add(statusToString(ISNEW_NEW));
- HashMap map = new HashMap();
- map.put(newField,statusToString(ISNEW_BASE));
- performUpdate(map,sb.toString(),list,null);
- }
-
- /** Delete all records that mention a particular set of document identifiers.
- */
- public void deleteRecords(Long jobID, String[] documentIDHashes)
- throws LCFException
- {
- beginTransaction();
- try
- {
- int maxClause = 25;
- StringBuffer sb = new StringBuffer();
- StringBuffer sb2 = new StringBuffer();
- ArrayList list = new ArrayList();
- ArrayList list2 = new ArrayList();
- int i = 0;
- int k = 0;
- while (i < documentIDHashes.length)
- {
- if (k == maxClause)
- {
- performDeleteRecords(sb.toString(),sb2.toString(),list,list2);
- sb.setLength(0);
- sb2.setLength(0);
- list.clear();
- list2.clear();
- k = 0;
- }
- if (k > 0)
- {
- sb.append(" OR");
- sb2.append(" OR");
- }
-
- sb.append("(").append(jobIDField).append("=? AND ")
- .append(childIDHashField).append("=?)");
- sb2.append("(").append(jobIDField).append("=? AND ")
- .append(parentIDHashField).append("=?)");
-
- String documentIDHash = documentIDHashes[i++];
- list.add(jobID);
- list.add(documentIDHash);
- list2.add(jobID);
- list2.add(documentIDHash);
- k++;
- }
-
- if (k > 0)
- performDeleteRecords(sb.toString(),sb2.toString(),list,list2);
-
-
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
- }
-
- }
-
- protected void performDeleteRecords(String query, String query2, ArrayList list, ArrayList list2)
- throws LCFException
- {
- performDelete("WHERE "+query,list,null);
- performDelete("WHERE "+query2,list2,null);
- }
-
- /** Get unique values given a document identifier, data name, and job identifier */
- public String[] getDataValues(Long jobID, String documentIdentifierHash, String dataName)
- throws LCFException
- {
- ArrayList list = new ArrayList();
- list.add(jobID);
- list.add(dataName);
- list.add(documentIdentifierHash);
-
- IResultSet set = getDBInterface().performQuery("SELECT "+dataValueHashField+","+dataValueField+" FROM "+getTableName()+" WHERE "+
- jobIDField+"=? AND "+dataNameField+"=? AND "+childIDHashField+"=? ORDER BY 1 ASC",list,null,null,-1,null,new ResultDuplicateEliminator());
-
- String[] rval = new String[set.getRowCount()];
- int i = 0;
- while (i < rval.length)
- {
- IResultRow row = set.getRow(i);
- rval[i] = (String)row.getValue(dataValueField);
- if (rval[i] == null)
- rval[i] = "";
- i++;
- }
- return rval;
- }
-
- /** Get unique values (as streams) given a document identifier, data name, and job identifier */
- public CharacterInput[] getDataValuesAsFiles(Long jobID, String documentIdentifierHash, String dataName)
- throws LCFException
- {
- ArrayList list = new ArrayList();
- list.add(jobID);
- list.add(dataName);
- list.add(documentIdentifierHash);
-
- ResultSpecification rs = new ResultSpecification();
- rs.setForm(dataValueField,ResultSpecification.FORM_STREAM);
- IResultSet set = getDBInterface().performQuery("SELECT "+dataValueHashField+","+dataValueField+" FROM "+getTableName()+" WHERE "+
- jobIDField+"=? AND "+dataNameField+"=? AND "+childIDHashField+"=? ORDER BY 1 ASC",list,null,null,-1,rs,new ResultDuplicateEliminator());
-
- CharacterInput[] rval = new CharacterInput[set.getRowCount()];
- int i = 0;
- while (i < rval.length)
- {
- IResultRow row = set.getRow(i);
- rval[i] = (CharacterInput)row.getValue(dataValueField);
- i++;
- }
- return rval;
- }
-
- /** Convert string to link status. */
- public static int stringToStatus(String status)
- {
- Integer value = (Integer)isNewMap.get(status);
- return value.intValue();
- }
-
- /** Convert link status to string */
- public static String statusToString(int status)
- {
- switch (status)
- {
- case ISNEW_BASE:
- return "B";
- case ISNEW_NEW:
- return "N";
- case ISNEW_EXISTING:
- return "E";
- default:
- return null;
- }
- }
-
- /** Conditionally do an analyze operation.
- */
- public void conditionallyAnalyzeTables()
- throws LCFException
- {
- if (tracker.checkAnalyze())
- {
- try
- {
- // Do the analyze
- analyzeTable();
- }
- finally
- {
- // Reset the counter; for this table, we simply reanalyze every n inserts.
- tracker.doAnalyze(30000L);
- }
- }
- }
-
- /** Limit checker which removes duplicate rows, based on datavaluehash */
- protected static class ResultDuplicateEliminator implements ILimitChecker
- {
- // The last value of data hash
- protected String currentDataHashValue = null;
-
- public ResultDuplicateEliminator()
- {
- }
-
- public boolean doesCompareWork()
- {
- return false;
- }
-
- public ILimitChecker duplicate()
- {
- return null;
- }
-
- public int hashCode()
- {
- return 0;
- }
-
- public boolean equals(Object object)
- {
- return false;
- }
-
- /** See if a result row should be included in the final result set.
- *@param row is the result row to check.
- *@return true if it should be included, false otherwise.
- */
- public boolean checkInclude(IResultRow row)
- throws LCFException
- {
- // Check to be sure that this row is different from the last; only then agree to include it.
- String value = (String)row.getValue(dataValueHashField);
- if (value == null)
- value = "";
- if (currentDataHashValue == null || !value.equals(currentDataHashValue))
- {
- currentDataHashValue = value;
- return true;
- }
- return false;
- }
-
- /** See if we should examine another row.
- *@return true if we need to keep going, or false if we are done.
- */
- public boolean checkContinue()
- throws LCFException
- {
- return true;
- }
- }
-
- /** Analyze tracker class.
- */
- protected static class AnalyzeTracker
- {
- // Number of records to insert before we need to analyze again.
- // After start, we wait 1000 before analyzing the first time.
- protected long recordCount = 1000L;
- protected boolean busy = false;
-
- /** Constructor.
- */
- public AnalyzeTracker()
- {
-
- }
-
- /** Note an analyze.
- */
- public synchronized void doAnalyze(long repeatCount)
- {
- recordCount = repeatCount;
- busy = false;
- }
-
- /** Note an insert */
- public synchronized void noteInsert()
- {
- if (recordCount > 0L)
- recordCount--;
- }
-
- /** Prepare to insert/delete a record, and see if analyze is required.
- */
- public synchronized boolean checkAnalyze()
- {
- if (busy)
- return false;
- busy = (recordCount == 0L);
- return busy;
- }
-
-
- }
-
- protected static class ValueRecord
- {
- protected String documentIdentifierHash;
- protected String dataName;
- protected String dataValueHash;
- // This value may be null, if we're simply using this record as a key
- protected Object dataValue;
-
- public ValueRecord(String documentIdentifierHash, String dataName, String dataValueHash, Object dataValue)
- {
- this.documentIdentifierHash = documentIdentifierHash;
- this.dataName = dataName;
- this.dataValueHash = dataValueHash;
- this.dataValue = dataValue;
- }
-
- public String getDocumentIDHash()
- {
- return documentIdentifierHash;
- }
-
- public String getDataName()
- {
- return dataName;
- }
-
- public String getDataValueHash()
- {
- return dataValueHash;
- }
-
- public Object getDataValue()
- {
- return dataValue;
- }
-
- public int hashCode()
- {
- return documentIdentifierHash.hashCode() + dataName.hashCode() + ((dataValueHash == null)?0:dataValueHash.hashCode());
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof ValueRecord))
- return false;
- ValueRecord v = (ValueRecord)o;
- if (!documentIdentifierHash.equals(v.documentIdentifierHash))
- return false;
- if (!dataName.equals(v.dataName))
- return false;
- if (dataValueHash == null && v.dataValueHash != null)
- return false;
- if (dataValueHash != null && v.dataValueHash == null)
- return false;
- if (dataValueHash == null)
- return true;
- return dataValueHash.equals(v.dataValueHash);
- }
- }
-
- // This class filters an ordered resultset to return only the duplicates
- protected static class DuplicateFinder implements ILimitChecker
- {
- protected Long prevJobID = null;
- protected String prevParentIDHash = null;
- protected String prevChildIDHash = null;
- protected String prevDataName = null;
- protected String prevDataValue = null;
-
- public DuplicateFinder()
- {
- }
-
- /** See if this class can be legitimately compared against another of
- * the same type.
- *@return true if comparisons will ever return "true".
- */
- public boolean doesCompareWork()
- {
- return false;
- }
-
- /** Create a duplicate of this class instance. All current state should be preserved.
- * NOTE: Since doesCompareWork() returns false, queries using this limit checker cannot
- * be cached, and therefore duplicate() is never called from the query executor.
- *@return the duplicate.
- */
- public ILimitChecker duplicate()
- {
- DuplicateFinder df = new DuplicateFinder();
- df.prevJobID = prevJobID;
- df.prevParentIDHash = prevParentIDHash;
- df.prevChildIDHash = prevChildIDHash;
- df.prevDataName = prevDataName;
- df.prevDataValue = prevDataValue;
- return df;
- }
-
- /** Find the hashcode for this class. This will only ever be used if
- * doesCompareWork() returns true.
- *@return the hashcode.
- */
- public int hashCode()
- {
- return 0;
- }
-
- /** Compare two objects and see if equal. This will only ever be used
- * if doesCompareWork() returns true.
- *@param object is the object to compare against.
- *@return true if equal.
- */
- public boolean equals(Object object)
- {
- return false;
- }
-
- /** See if a result row should be included in the final result set.
- *@param row is the result row to check.
- *@return true if it should be included, false otherwise.
- */
- public boolean checkInclude(IResultRow row)
- throws LCFException
- {
- Long jobID = (Long)row.getValue(jobIDField);
- String parentIDHash = (String)row.getValue(parentIDHashField);
- if (parentIDHash == null)
- parentIDHash = "";
- String childIDHash = (String)row.getValue(childIDHashField);
- if (childIDHash == null)
- childIDHash = "";
- String dataName = (String)row.getValue(dataNameField);
- String dataValue = (String)row.getValue(dataValueField);
- if (dataValue == null)
- dataValue = "";
-
- // If this is a duplicate, we want to keep it!
- if (prevJobID != null && jobID.equals(prevJobID) && dataName.equals(prevDataName) && dataValue.equals(prevDataValue) && parentIDHash.equals(prevParentIDHash) && childIDHash.equals(prevChildIDHash))
- return true;
- prevJobID = jobID;
- prevDataName = dataName;
- prevParentIDHash = parentIDHash;
- prevChildIDHash = childIDHash;
- prevDataValue = dataValue;
- return false;
- }
-
- /** See if we should examine another row.
- *@return true if we need to keep going, or false if we are done.
- */
- public boolean checkContinue()
- throws LCFException
- {
- return true;
- }
- }
+ // Field names
+ public static final String jobIDField = "jobid";
+ public static final String parentIDHashField = "parentidhash";
+ public static final String childIDHashField = "childidhash";
+ public static final String dataNameField = "dataname";
+ public static final String dataValueHashField = "datavaluehash";
+ public static final String dataValueField = "datavalue";
+ public static final String newField = "isnew";
+
+ /** The standard value for the "isnew" field. Means that the link existed prior to this scan, and no new link
+ * has yet been found. */
+ protected static final int ISNEW_BASE = 0;
+ /** This value means that the link is brand-new; it did not exist before this pass. */
+ protected static final int ISNEW_NEW = 1;
+ /** This value means that the link existed before, and has been found during this scan. */
+ protected static final int ISNEW_EXISTING = 2;
+
+ /** Counter for kicking off analyze */
+ protected static AnalyzeTracker tracker = new AnalyzeTracker();
+
+ // Map from string character to link status
+ protected static Map isNewMap;
+ static
+ {
+ isNewMap = new HashMap();
+ isNewMap.put("B",new Integer(ISNEW_BASE));
+ isNewMap.put("N",new Integer(ISNEW_NEW));
+ isNewMap.put("E",new Integer(ISNEW_EXISTING));
+ }
+
+ /** Constructor.
+ *@param database is the database handle.
+ */
+ public Carrydown(IDBInterface database)
+ throws LCFException
+ {
+ super(database,"carrydown");
+ }
+
+ /** Install or upgrade.
+ */
+ public void install(String jobsTable, String jobsColumn)
+ throws LCFException
+ {
+ // Standard practice: outer loop, to support upgrade requirements.
+ while (true)
+ {
+ Map existing = getTableSchema(null,null);
+ if (existing == null)
+ {
+ // I'm going to allow the parent to be null, which will represent carry-down from the seeding
+ // process to the seed, in case this ever arises.
+ //
+ // I am also going to allow null data values.
+ HashMap map = new HashMap();
+ map.put(jobIDField,new ColumnDescription("BIGINT",false,false,jobsTable,jobsColumn,false));
+ map.put(parentIDHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
+ map.put(childIDHashField,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
+ map.put(dataNameField,new ColumnDescription("VARCHAR(255)",false,false,null,null,false));
+ map.put(dataValueHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
+ map.put(dataValueField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(newField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+
+ performCreate(map,null);
+
+ }
+ else
+ {
+ // Upgrade code goes here, if needed.
+ }
+
+ // Now do index management
+
+ IndexDescription uniqueIndex = new IndexDescription(true,new String[]{jobIDField,parentIDHashField,childIDHashField,dataNameField,dataValueHashField});
+ IndexDescription jobParentIndex = new IndexDescription(false,new String[]{jobIDField,parentIDHashField});
+ IndexDescription jobChildDataIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,dataNameField});
+ IndexDescription jobChildNewIndex = new IndexDescription(false,new String[]{jobIDField,childIDHashField,newField});
+
+ Map indexes = getTableIndexes(null,null);
+ Iterator iter = indexes.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String indexName = (String)iter.next();
+ IndexDescription id = (IndexDescription)indexes.get(indexName);
+
+ if (uniqueIndex != null && id.equals(uniqueIndex))
+ uniqueIndex = null;
+ else if (jobParentIndex != null && id.equals(jobParentIndex))
+ jobParentIndex = null;
+ else if (jobChildDataIndex != null && id.equals(jobChildDataIndex))
+ jobChildDataIndex = null;
+ else if (jobChildNewIndex != null && id.equals(jobChildNewIndex))
+ jobChildNewIndex = null;
+ else if (indexName.indexOf("_pkey") == -1)
+ // This index shouldn't be here; drop it
+ performRemoveIndex(indexName);
+ }
+
+ // Create the indexes we are missing
+
+ if (jobParentIndex != null)
+ performAddIndex(null,jobParentIndex);
+
+ if (jobChildDataIndex != null)
+ performAddIndex(null,jobChildDataIndex);
+
+ if (jobChildNewIndex != null)
+ performAddIndex(null,jobChildNewIndex);
+
+ // This index is the constraint. Only one row per job, dataname, datavalue, parent, and child.
+ if (uniqueIndex != null)
+ performAddIndex(null,uniqueIndex);
+
+ // Install/upgrade complete
+ break;
+ }
+
+ }
+
+ /** Uninstall.
+ */
+ public void deinstall()
+ throws LCFException
+ {
+ performDrop(null);
+ }
+
+ /** Analyze job tables that need analysis.
+ */
+ public void analyzeTables()
+ throws LCFException
+ {
+ long startTime = System.currentTimeMillis();
+ Logging.perf.debug("Beginning to analyze carrydown table");
+ analyzeTable();
+ Logging.perf.debug("Done analyzing carrydown table in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+ }
+
+ /** Delete an owning job (and clean up the corresponding carrydown rows).
+ */
+ public void deleteOwner(Long jobID)
+ throws LCFException
+ {
+ ArrayList list = new ArrayList();
+ list.add(jobID);
+ performDelete("WHERE "+jobIDField+"=?",list,null);
+ }
+
+ // The strategy here is to leave all rows that have a given document as a parent labelled as "BASE" at the start of the
+ // processing of that parent. As data are encountered, the values get written as "NEW" or flipped to "EXISTING".
+ // When the document's processing has been completed, another method is called
+ // that will remove all rows that belong to the parent which are still labelled "BASE", and will map the other rows that
+ // belong to the parent back to the "BASE" state.
+ //
+ // If the daemon is aborted and restarted, the "new" rows should be deleted, and the EXISTING rows should be reset to
+ // BASE, in order to restore the system to a good base state.
+ //
+
+ /** Reset, at startup time.
+ */
+ public void reset()
+ throws LCFException
+ {
+ // Delete "new" rows
+ HashMap map = new HashMap();
+ ArrayList list = new ArrayList();
+ list.add(statusToString(ISNEW_NEW));
+ performDelete("WHERE "+newField+"=?",list,null);
+
+ // Convert "existing" rows to base
+ map.put(newField,statusToString(ISNEW_BASE));
+ list.clear();
+ list.add(statusToString(ISNEW_EXISTING));
+ performUpdate(map,"WHERE "+newField+"=?",list,null);
+ }
+
+ /** Add carrydown data for a given parent/child pair.
+ *
+ *@return true if new carrydown data was recorded; false otherwise.
+ */
+ public boolean recordCarrydownData(Long jobID, String parentDocumentIDHash, String childDocumentIDHash,
+ String[] documentDataNames, String[][] documentDataValueHashes, Object[][] documentDataValues)
+ throws LCFException
+ {
+ return recordCarrydownDataMultiple(jobID,parentDocumentIDHash,new String[]{childDocumentIDHash},
+ new String[][]{documentDataNames},new String[][][]{documentDataValueHashes},new Object[][][]{documentDataValues})[0];
+ }
+
+ /** Add carrydown data to the table.
+ */
+ public boolean[] recordCarrydownDataMultiple(Long jobID, String parentDocumentIDHash, String[] childDocumentIDHashes,
+ String[][] dataNames, String[][][] dataValueHashes, Object[][][] dataValues)
+ throws LCFException
+ {
+
+ // Need to go into a transaction because we need to distinguish between update and insert.
+ HashMap duplicateRemoval = new HashMap();
+ HashMap presentMap = new HashMap();
+
+ int maxClause = 25;
+ StringBuffer sb = new StringBuffer();
+ ArrayList list = new ArrayList();
+ int i = 0;
+ int k = 0;
+ // Keep track of the data items that have been seen vs. those that were unseen.
+ while (k < childDocumentIDHashes.length)
+ {
+ String childDocumentIDHash = childDocumentIDHashes[k];
+
+ // Loop through data names and values for this document
+ String[] documentDataNames = dataNames[k];
+ String[][] documentDataValueHashes = dataValueHashes[k];
+ Object[][] documentDataValues = dataValues[k];
+ k++;
+
+ int q = 0;
+ while (q < documentDataNames.length)
+ {
+ String documentDataName = documentDataNames[q];
+ String[] documentDataValueHashSet = documentDataValueHashes[q];
+ Object[] documentDataValueSet = documentDataValues[q];
+ q++;
+
+ if (documentDataValueHashSet != null)
+ {
+ int p = 0;
+ while (p < documentDataValueHashSet.length)
+ {
+ String documentDataValueHash = documentDataValueHashSet[p];
+ Object documentDataValue = documentDataValueSet[p];
+ // blank values equivalent to null
+ if (documentDataValueHash != null && documentDataValueHash.length() == 0)
+ documentDataValueHash = null;
+ // Build a hash record
+ ValueRecord vr = new ValueRecord(childDocumentIDHash,
+ documentDataName,documentDataValueHash,documentDataValue);
+ if (duplicateRemoval.get(vr) != null)
+ continue;
+ duplicateRemoval.put(vr,vr);
+ if (i == maxClause)
+ {
+ // Do the query and record the results
+ performExistsCheck(presentMap,sb.toString(),list);
+ i = 0;
+ sb.setLength(0);
+ list.clear();
+ }
+ if (i > 0)
+ sb.append(" OR");
+ sb.append("(").append(jobIDField).append("=? AND ")
+ .append(dataNameField).append("=? AND ")
+ .append(parentIDHashField).append("=? AND ").append(childIDHashField).append("=? AND ");
+
+ list.add(jobID);
+ list.add(documentDataName);
+ list.add(parentDocumentIDHash);
+ list.add(childDocumentIDHash);
+
+ if (documentDataValueHash == null)
+ {
+ sb.append(dataValueHashField).append(" IS NULL");
+ }
+ else
+ {
+ sb.append(dataValueHashField).append("=?");
+ list.add(documentDataValueHash);
+ }
+ sb.append(")");
+
+ i++;
+ p++;
+ }
+ }
+ }
+ }
+ if (i > 0)
+ performExistsCheck(presentMap,sb.toString(),list);
+
+ // Go through the list again, and based on the results above, decide to do either an insert or
+ // an update. Keep track of this information also, so we can build the return array when done.
+
+ HashMap insertHappened = new HashMap();
+
+ int j = 0;
+ Iterator iter = duplicateRemoval.keySet().iterator();
+ while (iter.hasNext())
+ {
+ ValueRecord childDocumentRecord = (ValueRecord)iter.next();
+
+ String childDocumentIDHash = childDocumentRecord.getDocumentIDHash();
+
+ HashMap map = new HashMap();
+ String dataName = childDocumentRecord.getDataName();
+ String dataValueHash = childDocumentRecord.getDataValueHash();
+ Object dataValue = childDocumentRecord.getDataValue();
+
+ if (presentMap.get(childDocumentRecord) == null)
+ {
+ map.put(jobIDField,jobID);
+ map.put(parentIDHashField,parentDocumentIDHash);
+ map.put(childIDHashField,childDocumentIDHash);
+ map.put(dataNameField,dataName);
+ if (dataValueHash != null)
+ {
+ map.put(dataValueHashField,dataValueHash);
+ map.put(dataValueField,dataValue);
+ }
+
+ map.put(newField,statusToString(ISNEW_NEW));
+ performInsert(map,null);
+ tracker.noteInsert();
+ insertHappened.put(childDocumentIDHash,new Boolean(true));
+ }
+ else
+ {
+ sb = new StringBuffer();
+ sb.append("WHERE ").append(jobIDField).append("=? AND ")
+ .append(dataNameField).append("=? AND ")
+ .append(parentIDHashField).append("=? AND ").append(childIDHashField).append("=? AND ");
+
+ ArrayList updateList = new ArrayList();
+ updateList.add(jobID);
+ updateList.add(dataName);
+ updateList.add(parentDocumentIDHash);
+ updateList.add(childDocumentIDHash);
+ if (dataValueHash != null)
+ {
+ sb.append(dataValueHashField).append("=?");
+ updateList.add(dataValueHash);
+ }
+ else
+ {
+ sb.append(dataValueHashField).append(" IS NULL");
+ }
+
+ map.put(newField,statusToString(ISNEW_EXISTING));
+ performUpdate(map,sb.toString(),updateList,null);
+ }
+ }
+
+ boolean[] rval = new boolean[childDocumentIDHashes.length];
+ i = 0;
+ while (i < rval.length)
+ {
+ String childDocumentIDHash = childDocumentIDHashes[i];
+ rval[i++] = (insertHappened.get(childDocumentIDHash) != null);
+ }
+
+ return rval;
+ }
+
+ /** Do the exists check, in batch. */
+ protected void performExistsCheck(Map presentMap, String query, ArrayList list)
+ throws LCFException
+ {
+ // Note well: presentMap is only checked for the *existence* of a record, so we do not need to populate the datavalue field!
+ // This is crucial, because otherwise we'd either be using an undetermined amount of memory, or we'd need to read into a temporary file.
+ IResultSet result = performQuery("SELECT "+childIDHashField+","+dataNameField+","+dataValueHashField+" FROM "+getTableName()+" WHERE "+query+" FOR UPDATE",list,null,null);
+ int i = 0;
+ while (i < result.getRowCount())
+ {
+ IResultRow row = result.getRow(i++);
+ String documentIDHash = (String)row.getValue(childIDHashField);
+ String dataName = (String)row.getValue(dataNameField);
+ String dataValueHash = (String)row.getValue(dataValueHashField);
+ //String dataValue = (String)row.getValue(dataValueField);
+ ValueRecord vr = new ValueRecord(documentIDHash,dataName,dataValueHash,null);
+
+ presentMap.put(vr,vr);
+ }
+ }
+
+ /** Return all records belonging to the specified parent documents to the base state,
+ * and delete the old (eliminated) child records.
+ */
+ public void restoreRecords(Long jobID, String[] parentDocumentIDHashes)
+ throws LCFException
+ {
+ beginTransaction();
+ try
+ {
+ int maxClause = 25;
+ StringBuffer sb = new StringBuffer();
+ ArrayList list = new ArrayList();
+ int i = 0;
+ int k = 0;
+ while (i < parentDocumentIDHashes.length)
+ {
+ if (k == maxClause)
+ {
+ performRestoreRecords(sb.toString(),list);
+ sb.setLength(0);
+ list.clear();
+ k = 0;
+ }
+ if (k > 0)
+ sb.append(" OR");
+ sb.append("(").append(jobIDField).append("=? AND ")
+ .append(parentIDHashField).append("=?)");
+ String parentDocumentIDHash = parentDocumentIDHashes[i++];
+ list.add(jobID);
+ list.add(parentDocumentIDHash);
+ k++;
+ }
+
+ if (k > 0)
+ performRestoreRecords(sb.toString(),list);
+ }
+ catch (LCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+ }
+
+ protected void performRestoreRecords(String query, ArrayList list)
+ throws LCFException
+ {
+ // Delete
+ StringBuffer sb = new StringBuffer("WHERE (");
+ sb.append(query).append(") AND (").append(newField).append("=?)");
+ ArrayList newList = (ArrayList)list.clone();
+ newList.add(statusToString(ISNEW_BASE));
+ performDelete(sb.toString(),newList,null);
+
+ // Restore new values
+ sb = new StringBuffer("WHERE (");
+ sb.append(query).append(") AND (").append(newField).append("=? OR ").append(newField).append("=?)");
+ list.add(statusToString(ISNEW_EXISTING));
+ list.add(statusToString(ISNEW_NEW));
+ HashMap map = new HashMap();
+ map.put(newField,statusToString(ISNEW_BASE));
+ performUpdate(map,sb.toString(),list,null);
+ }
+
+ /** Delete all records that mention a particular set of document identifiers.
+ */
+ public void deleteRecords(Long jobID, String[] documentIDHashes)
+ throws LCFException
+ {
+ beginTransaction();
+ try
+ {
+ int maxClause = 25;
+ StringBuffer sb = new StringBuffer();
+ StringBuffer sb2 = new StringBuffer();
+ ArrayList list = new ArrayList();
+ ArrayList list2 = new ArrayList();
+ int i = 0;
+ int k = 0;
+ while (i < documentIDHashes.length)
+ {
+ if (k == maxClause)
+ {
+ performDeleteRecords(sb.toString(),sb2.toString(),list,list2);
+ sb.setLength(0);
+ sb2.setLength(0);
+ list.clear();
+ list2.clear();
+ k = 0;
+ }
+ if (k > 0)
+ {
+ sb.append(" OR");
+ sb2.append(" OR");
+ }
+
+ sb.append("(").append(jobIDField).append("=? AND ")
+ .append(childIDHashField).append("=?)");
+ sb2.append("(").append(jobIDField).append("=? AND ")
+ .append(parentIDHashField).append("=?)");
+
+ String documentIDHash = documentIDHashes[i++];
+ list.add(jobID);
+ list.add(documentIDHash);
+ list2.add(jobID);
+ list2.add(documentIDHash);
+ k++;
+ }
+
+ if (k > 0)
+ performDeleteRecords(sb.toString(),sb2.toString(),list,list2);
+
+
+ }
+ catch (LCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+
+ }
+
+ protected void performDeleteRecords(String query, String query2, ArrayList list, ArrayList list2)
+ throws LCFException
+ {
+ performDelete("WHERE "+query,list,null);
+ performDelete("WHERE "+query2,list2,null);
+ }
+
+ /** Get unique values given a document identifier, data name, and job identifier */
+ public String[] getDataValues(Long jobID, String documentIdentifierHash, String dataName)
+ throws LCFException
+ {
+ ArrayList list = new ArrayList();
+ list.add(jobID);
+ list.add(dataName);
+ list.add(documentIdentifierHash);
+
+ IResultSet set = getDBInterface().performQuery("SELECT "+dataValueHashField+","+dataValueField+" FROM "+getTableName()+" WHERE "+
+ jobIDField+"=? AND "+dataNameField+"=? AND "+childIDHashField+"=? ORDER BY 1 ASC",list,null,null,-1,null,new ResultDuplicateEliminator());
+
+ String[] rval = new String[set.getRowCount()];
+ int i = 0;
+ while (i < rval.length)
+ {
+ IResultRow row = set.getRow(i);
+ rval[i] = (String)row.getValue(dataValueField);
+ if (rval[i] == null)
+ rval[i] = "";
+ i++;
+ }
+ return rval;
+ }
+
+ /** Get unique values (as streams) given a document identifier, data name, and job identifier */
+ public CharacterInput[] getDataValuesAsFiles(Long jobID, String documentIdentifierHash, String dataName)
+ throws LCFException
+ {
+ ArrayList list = new ArrayList();
+ list.add(jobID);
+ list.add(dataName);
+ list.add(documentIdentifierHash);
+
+ ResultSpecification rs = new ResultSpecification();
+ rs.setForm(dataValueField,ResultSpecification.FORM_STREAM);
+ IResultSet set = getDBInterface().performQuery("SELECT "+dataValueHashField+","+dataValueField+" FROM "+getTableName()+" WHERE "+
+ jobIDField+"=? AND "+dataNameField+"=? AND "+childIDHashField+"=? ORDER BY 1 ASC",list,null,null,-1,rs,new ResultDuplicateEliminator());
+
+ CharacterInput[] rval = new CharacterInput[set.getRowCount()];
+ int i = 0;
+ while (i < rval.length)
+ {
+ IResultRow row = set.getRow(i);
+ rval[i] = (CharacterInput)row.getValue(dataValueField);
+ i++;
+ }
+ return rval;
+ }
+
+ /** Convert string to link status. */
+ public static int stringToStatus(String status)
+ {
+ Integer value = (Integer)isNewMap.get(status);
+ return value.intValue();
+ }
+
+ /** Convert link status to string */
+ public static String statusToString(int status)
+ {
+ switch (status)
+ {
+ case ISNEW_BASE:
+ return "B";
+ case ISNEW_NEW:
+ return "N";
+ case ISNEW_EXISTING:
+ return "E";
+ default:
+ return null;
+ }
+ }
+
+ /** Conditionally do an analyze operation.
+ */
+ public void conditionallyAnalyzeTables()
+ throws LCFException
+ {
+ if (tracker.checkAnalyze())
+ {
+ try
+ {
+ // Do the analyze
+ analyzeTable();
+ }
+ finally
+ {
+ // Reset the counter; for this table, we simply reanalyze every n inserts.
+ tracker.doAnalyze(30000L);
+ }
+ }
+ }
+
+ /** Limit checker which removes duplicate rows, based on datavaluehash */
+ protected static class ResultDuplicateEliminator implements ILimitChecker
+ {
+ // The last value of data hash
+ protected String currentDataHashValue = null;
+
+ public ResultDuplicateEliminator()
+ {
+ }
+
+ public boolean doesCompareWork()
+ {
+ return false;
+ }
+
+ public ILimitChecker duplicate()
+ {
+ return null;
+ }
+
+ public int hashCode()
+ {
+ return 0;
+ }
+
+ public boolean equals(Object object)
+ {
+ return false;
+ }
+
+ /** See if a result row should be included in the final result set.
+ *@param row is the result row to check.
+ *@return true if it should be included, false otherwise.
+ */
+ public boolean checkInclude(IResultRow row)
+ throws LCFException
+ {
+ // Check to be sure that this row is different from the last; only then agree to include it.
+ String value = (String)row.getValue(dataValueHashField);
+ if (value == null)
+ value = "";
+ if (currentDataHashValue == null || !value.equals(currentDataHashValue))
+ {
+ currentDataHashValue = value;
+ return true;
+ }
+ return false;
+ }
+
+ /** See if we should examine another row.
+ *@return true if we need to keep going, or false if we are done.
+ */
+ public boolean checkContinue()
+ throws LCFException
+ {
+ return true;
+ }
+ }
+
+ /** Analyze tracker class.
+ */
+ protected static class AnalyzeTracker
+ {
+ // Number of records to insert before we need to analyze again.
+ // After start, we wait 1000 before analyzing the first time.
+ protected long recordCount = 1000L;
+ protected boolean busy = false;
+
+ /** Constructor.
+ */
+ public AnalyzeTracker()
+ {
+
+ }
+
+ /** Note an analyze.
+ */
+ public synchronized void doAnalyze(long repeatCount)
+ {
+ recordCount = repeatCount;
+ busy = false;
+ }
+
+ /** Note an insert */
+ public synchronized void noteInsert()
+ {
+ if (recordCount > 0L)
+ recordCount--;
+ }
+
+ /** Prepare to insert/delete a record, and see if analyze is required.
+ */
+ public synchronized boolean checkAnalyze()
+ {
+ if (busy)
+ return false;
+ busy = (recordCount == 0L);
+ return busy;
+ }
+
+
+ }
+
+ protected static class ValueRecord
+ {
+ protected String documentIdentifierHash;
+ protected String dataName;
+ protected String dataValueHash;
+ // This value may be null, if we're simply using this record as a key
+ protected Object dataValue;
+
+ public ValueRecord(String documentIdentifierHash, String dataName, String dataValueHash, Object dataValue)
+ {
+ this.documentIdentifierHash = documentIdentifierHash;
+ this.dataName = dataName;
+ this.dataValueHash = dataValueHash;
+ this.dataValue = dataValue;
+ }
+
+ public String getDocumentIDHash()
+ {
+ return documentIdentifierHash;
+ }
+
+ public String getDataName()
+ {
+ return dataName;
+ }
+
+ public String getDataValueHash()
+ {
+ return dataValueHash;
+ }
+
+ public Object getDataValue()
+ {
+ return dataValue;
+ }
+
+ public int hashCode()
+ {
+ return documentIdentifierHash.hashCode() + dataName.hashCode() + ((dataValueHash == null)?0:dataValueHash.hashCode());
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ValueRecord))
+ return false;
+ ValueRecord v = (ValueRecord)o;
+ if (!documentIdentifierHash.equals(v.documentIdentifierHash))
+ return false;
+ if (!dataName.equals(v.dataName))
+ return false;
+ if (dataValueHash == null && v.dataValueHash != null)
+ return false;
+ if (dataValueHash != null && v.dataValueHash == null)
+ return false;
+ if (dataValueHash == null)
+ return true;
+ return dataValueHash.equals(v.dataValueHash);
+ }
+ }
+
+ // This class filters an ordered resultset to return only the duplicates
+ protected static class DuplicateFinder implements ILimitChecker
+ {
+ protected Long prevJobID = null;
+ protected String prevParentIDHash = null;
+ protected String prevChildIDHash = null;
+ protected String prevDataName = null;
+ protected String prevDataValue = null;
+
+ public DuplicateFinder()
+ {
+ }
+
+ /** See if this class can be legitimately compared against another of
+ * the same type.
+ *@return true if comparisons will ever return "true".
+ */
+ public boolean doesCompareWork()
+ {
+ return false;
+ }
+
+ /** Create a duplicate of this class instance. All current state should be preserved.
+ * NOTE: Since doesCompareWork() returns false, queries using this limit checker cannot
+ * be cached, and therefore duplicate() is never called from the query executor.
+ *@return the duplicate.
+ */
+ public ILimitChecker duplicate()
+ {
+ DuplicateFinder df = new DuplicateFinder();
+ df.prevJobID = prevJobID;
+ df.prevParentIDHash = prevParentIDHash;
+ df.prevChildIDHash = prevChildIDHash;
+ df.prevDataName = prevDataName;
+ df.prevDataValue = prevDataValue;
+ return df;
+ }
+
+ /** Find the hashcode for this class. This will only ever be used if
+ * doesCompareWork() returns true.
+ *@return the hashcode.
+ */
+ public int hashCode()
+ {
+ return 0;
+ }
+
+ /** Compare two objects and see if equal. This will only ever be used
+ * if doesCompareWork() returns true.
+ *@param object is the object to compare against.
+ *@return true if equal.
+ */
+ public boolean equals(Object object)
+ {
+ return false;
+ }
+
+ /** See if a result row should be included in the final result set.
+ *@param row is the result row to check.
+ *@return true if it should be included, false otherwise.
+ */
+ public boolean checkInclude(IResultRow row)
+ throws LCFException
+ {
+ Long jobID = (Long)row.getValue(jobIDField);
+ String parentIDHash = (String)row.getValue(parentIDHashField);
+ if (parentIDHash == null)
+ parentIDHash = "";
+ String childIDHash = (String)row.getValue(childIDHashField);
+ if (childIDHash == null)
+ childIDHash = "";
+ String dataName = (String)row.getValue(dataNameField);
+ String dataValue = (String)row.getValue(dataValueField);
+ if (dataValue == null)
+ dataValue = "";
+
+ // If this is a duplicate, we want to keep it!
+ if (prevJobID != null && jobID.equals(prevJobID) && dataName.equals(prevDataName) && dataValue.equals(prevDataValue) && parentIDHash.equals(prevParentIDHash) && childIDHash.equals(prevChildIDHash))
+ return true;
+ prevJobID = jobID;
+ prevDataName = dataName;
+ prevParentIDHash = parentIDHash;
+ prevChildIDHash = childIDHash;
+ prevDataValue = dataValue;
+ return false;
+ }
+
+ /** See if we should examine another row.
+ *@return true if we need to keep going, or false if we are done.
+ */
+ public boolean checkContinue()
+ throws LCFException
+ {
+ return true;
+ }
+ }
}
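The comment block above Carrydown.reset() describes a three-state lifecycle (BASE/NEW/EXISTING) for carrydown rows. As a reading aid, here is a minimal, self-contained sketch of that lifecycle. It is not part of this commit: the in-memory map merely stands in for the carrydown table, and the row keys are made up for illustration.

import java.util.HashMap;
import java.util.Map;

// Sketch only: the map stands in for carrydown rows keyed by
// (child, dataname, datavalue); values are the "isnew" states.
public class IsNewLifecycleSketch
{
  static final String BASE = "B", NEW = "N", EXISTING = "E";

  public static void main(String[] args)
  {
    Map<String,String> rows = new HashMap<String,String>();

    // Before a parent document is processed, all of its rows sit at BASE.
    rows.put("child1|data|v1", BASE);
    rows.put("child1|data|v2", BASE);

    // During processing: a rediscovered value flips to EXISTING; a value
    // not seen before is inserted as NEW.
    rows.put("child1|data|v1", EXISTING);
    rows.put("child1|data|v3", NEW);

    // On completion (restoreRecords): rows still at BASE are deleted,
    // and NEW/EXISTING rows are mapped back to BASE.
    rows.values().removeIf(v -> v.equals(BASE));   // drops child1|data|v2
    rows.replaceAll((k, v) -> BASE);

    System.out.println(rows); // v1 and v3 remain, both at "B"

    // After an aborted run, reset() instead deletes NEW rows and flips
    // EXISTING rows back to BASE, restoring the same invariant.
  }
}

The same transitions explain why reset() only has to delete "N" rows and flip "E" rows back to "B" after a crash.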
Modified: incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/EventManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/EventManager.java?rev=911418&r1=911417&r2=911418&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/EventManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/org/apache/lcf/crawler/jobs/EventManager.java Thu Feb 18 14:31:31 2010
@@ -49,8 +49,8 @@
public void install()
throws LCFException
{
- beginTransaction();
- try
+ // Standard practice: outer loop for installs
+ while (true)
{
Map existing = getTableSchema(null,null);
if (existing == null)
@@ -61,22 +61,12 @@
}
else
{
- // No upgrade is possible since this table has just been introduced.
+ // Upgrade goes here if needed
}
- }
- catch (LCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
- finally
- {
- endTransaction();
+
+ // Index management goes here
+
+ break;
}
}
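Both install() methods in this change now follow the same idiom: an outer while(true) loop that creates the table on first install, applies upgrades otherwise, reconciles indexes, and breaks when everything is in place. Below is a minimal sketch of that idiom, with hypothetical method names (tableExists(), createTable(), and so on); none of these are LCF API calls.

// Sketch of the install-loop idiom adopted above; all method names
// are hypothetical placeholders, not LCF APIs.
public abstract class InstallLoopSketch
{
  public void install() throws Exception
  {
    while (true)
    {
      if (!tableExists())
        createTable();          // first-time install
      else
        upgradeTableIfNeeded(); // schema upgrade steps go here

      reconcileIndexes();       // drop stray indexes, add missing ones

      // An upgrade step that must re-run the whole sequence (for example,
      // after cleaning up rows that blocked a unique index) would
      // 'continue' here instead of falling through.
      break;                    // install/upgrade complete
    }
  }

  protected abstract boolean tableExists() throws Exception;
  protected abstract void createTable() throws Exception;
  protected abstract void upgradeTableIfNeeded() throws Exception;
  protected abstract void reconcileIndexes() throws Exception;
}

The loop buys a cheap retry path: an upgrade that cannot complete in one pass, such as adding a unique index blocked by duplicate rows (which the removed Carrydown code handled via removeDuplicates()), can clean up and restart from the top.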