You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by sh...@apache.org on 2009/02/19 06:43:06 UTC
svn commit: r745742 - in /lucene/solr/trunk/contrib/dataimporthandler:
CHANGES.txt src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
Author: shalin
Date: Thu Feb 19 05:43:06 2009
New Revision: 745742
URL: http://svn.apache.org/viewvc?rev=745742&view=rev
Log:
SOLR-1004 -- Check for abort more frequently during delta-imports
Modified:
lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt
lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
Modified: lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt?rev=745742&r1=745741&r2=745742&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt Thu Feb 19 05:43:06 2009
@@ -83,6 +83,9 @@
2. SOLR-974: DataImportHandler skips commit if no data has been updated.
(Wojtek Piaseczny, shalin)
+3. SOLR-1004: Check for abort more frequently during delta-imports.
+ (Marc Sturlese, shalin)
+
Bug Fixes
----------------------
1. SOLR-800: Deep copy collections to avoid ConcurrentModificationException in XPathEntityprocessor while streaming
Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=745742&r1=745741&r2=745742&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Feb 19 05:43:06 2009
@@ -25,7 +25,6 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -62,7 +61,7 @@
private Map<String, Object> session = new HashMap<String, Object>();
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
- Map<String,Object> functionsNamespace;
+ Map<String, Object> functionsNamespace;
public DocBuilder(DataImporter context, SolrWriter writer, DataImporter.RequestParams reqParams) {
INSTANCE.set(this);
@@ -97,7 +96,7 @@
int currentProcess = -1;
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
currentProcess = Context.DELTA_DUMP;
- } else {
+ } else {
currentProcess = Context.FULL_DUMP;
}
listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this));
@@ -142,10 +141,10 @@
String delQuery = e.allAttributes.get("preImportDeleteQuery");
if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP
&& dataImporter.getLastIndexTime() != null) {
- cleanByQuery(delQuery , fullCleanDone);
+ cleanByQuery(delQuery, fullCleanDone);
doDelta();
delQuery = e.allAttributes.get("postImportDeleteQuery");
- if(delQuery != null) {
+ if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
}
@@ -153,7 +152,7 @@
cleanByQuery(delQuery, fullCleanDone);
doFullDump();
delQuery = e.allAttributes.get("postImportDeleteQuery");
- if(delQuery != null) {
+ if (delQuery != null) {
fullCleanDone.set(false);
cleanByQuery(delQuery, fullCleanDone);
}
@@ -175,11 +174,11 @@
}
} else {
// Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
- if (!requestParameters.clean) {
- if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
+ if (!requestParameters.clean) {
+ if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
commit();
}
- } else {
+ } else {
// Finished operation normally, commit now
commit();
}
@@ -256,6 +255,9 @@
vri.addNamespace(DataConfig.IMPORTER_NS + ".delta", map);
buildDocument(vri, null, map, root, true, null);
pkIter.remove();
+ // check for abort
+ if (stop.get())
+ break;
}
if (!stop.get()) {
@@ -289,7 +291,6 @@
ContextImpl ctx = new ContextImpl(entity, vr, null,
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
- vr.context = ctx;
entityProcessor.init(ctx);
if (requestParameters.start > 0) {
@@ -436,9 +437,9 @@
}
//else do nothing. if we add it it may fail
} else {
- if (field != null ) {
+ if (field != null) {
for (DataConfig.Field f : field) {
- if(f.toWrite) addFieldToDoc(entry.getValue(), f.getName(), f.boost, f.multiValued, doc);
+ if (f.toWrite) addFieldToDoc(entry.getValue(), f.getName(), f.boost, f.multiValued, doc);
}
}
}
@@ -509,6 +510,9 @@
//this ensures that we start from the leaf nodes
myModifiedPks.addAll(collectDelta(entity1, entity, resolver, context,
deletedRows));
+ //someone called abort
+ if (stop.get())
+ return new HashSet();
}
}
@@ -530,6 +534,9 @@
deltaSet.add(row);
importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return new HashSet();
}
//get the deleted rows for this entity
Set<Map<String, Object>> deletedSet = new HashSet<Map<String, Object>>();
@@ -548,6 +555,9 @@
deletedSet.add(row);
importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return new HashSet();
}
//asymmetric Set difference
@@ -567,11 +577,18 @@
parentEntityProcessor.init(context2);
// identifying deleted rows with deltas
- for (Map<String, Object> row : myModifiedPks)
+ for (Map<String, Object> row : myModifiedPks) {
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
+ // check for abort
+ if (stop.get())
+ return new HashSet();
+ }
// running the same for deletedrows
for (Map<String, Object> row : deletedSet) {
getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
+ // check for abort
+ if (stop.get())
+ return new HashSet();
}
}
LOG.info("Completed parentDeltaQuery for Entity: " + entity.name);
@@ -594,6 +611,9 @@
parentKeyList.add(parentRow);
importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return;
}
} finally {
@@ -662,11 +682,11 @@
public Map<String, Object> getStatsSnapshot() {
Map<String, Object> result = new HashMap<String, Object>();
- result.put("docCount",docCount.get());
- result.put("deletedDocCount",deletedDocCount.get());
- result.put("rowCount",rowsCount.get());
- result.put("queryCount",rowsCount.get());
- result.put("skipDocCount",skipDocCount.get());
+ result.put("docCount", docCount.get());
+ result.put("deletedDocCount", deletedDocCount.get());
+ result.put("rowCount", rowsCount.get());
+ result.put("queryCount", rowsCount.get());
+ result.put("skipDocCount", skipDocCount.get());
return result;
}