Posted to solr-commits@lucene.apache.org by sh...@apache.org on 2009/02/19 06:43:06 UTC

svn commit: r745742 - in /lucene/solr/trunk/contrib/dataimporthandler: CHANGES.txt src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java

Author: shalin
Date: Thu Feb 19 05:43:06 2009
New Revision: 745742

URL: http://svn.apache.org/viewvc?rev=745742&view=rev
Log:
SOLR-1004 -- Check for abort more frequently during delta-imports
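
For context, here is a minimal sketch of the cooperative-abort pattern this
commit extends. The class and method names below are illustrative only, not
the actual DocBuilder members; the AtomicBoolean "stop" flag itself is the
one visible in the diff:

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicBoolean;

    class AbortableImportSketch {
      // another thread flips this flag when the user issues an abort command
      private final AtomicBoolean stop = new AtomicBoolean(false);

      void requestAbort() {
        stop.set(true);
      }

      void processRows(List<Map<String, Object>> rows) {
        for (Map<String, Object> row : rows) {
          // poll the flag once per row so an abort takes effect promptly,
          // instead of only being noticed after the whole loop finishes
          if (stop.get()) {
            return;
          }
          handle(row);
        }
      }

      private void handle(Map<String, Object> row) {
        // placeholder for the per-row work (e.g. building a document)
      }
    }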

Modified:
    lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java

Modified: lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt?rev=745742&r1=745741&r2=745742&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/CHANGES.txt Thu Feb 19 05:43:06 2009
@@ -83,6 +83,9 @@
 2. SOLR-974:  DataImportHandler skips commit if no data has been updated.
               (Wojtek Piaseczny, shalin)
 
+3. SOLR-1004: Check for abort more frequently during delta-imports.
+              (Marc Sturlese, shalin)
+
 Bug Fixes
 ----------------------
 1. SOLR-800:  Deep copy collections to avoid ConcurrentModificationException in XPathEntityProcessor while streaming

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=745742&r1=745741&r2=745742&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Feb 19 05:43:06 2009
@@ -25,7 +25,6 @@
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -62,7 +61,7 @@
   private Map<String, Object> session = new HashMap<String, Object>();
 
   static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
-  Map<String,Object> functionsNamespace;
+  Map<String, Object> functionsNamespace;
 
   public DocBuilder(DataImporter context, SolrWriter writer, DataImporter.RequestParams reqParams) {
     INSTANCE.set(this);
@@ -97,7 +96,7 @@
       int currentProcess = -1;
       if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
         currentProcess = Context.DELTA_DUMP;
-      } else  {
+      } else {
         currentProcess = Context.FULL_DUMP;
       }
       listener.onEvent(new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this));
@@ -142,10 +141,10 @@
       String delQuery = e.allAttributes.get("preImportDeleteQuery");
       if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP
               && dataImporter.getLastIndexTime() != null) {
-        cleanByQuery(delQuery , fullCleanDone);
+        cleanByQuery(delQuery, fullCleanDone);
         doDelta();
         delQuery = e.allAttributes.get("postImportDeleteQuery");
-        if(delQuery != null)  {
+        if (delQuery != null) {
           fullCleanDone.set(false);
           cleanByQuery(delQuery, fullCleanDone);
         }
@@ -153,7 +152,7 @@
         cleanByQuery(delQuery, fullCleanDone);
         doFullDump();
         delQuery = e.allAttributes.get("postImportDeleteQuery");
-        if(delQuery != null)  {
+        if (delQuery != null) {
           fullCleanDone.set(false);
           cleanByQuery(delQuery, fullCleanDone);
         }
@@ -175,11 +174,11 @@
       }
     } else {
       // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
-      if (!requestParameters.clean)  {
-        if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0)  {
+      if (!requestParameters.clean) {
+        if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
           commit();
         }
-      } else  {
+      } else {
         // Finished operation normally, commit now
         commit();
       }
@@ -256,6 +255,9 @@
       vri.addNamespace(DataConfig.IMPORTER_NS + ".delta", map);
       buildDocument(vri, null, map, root, true, null);
       pkIter.remove();
+      // check for abort
+      if (stop.get())
+        break;
     }
 
     if (!stop.get()) {
@@ -289,7 +291,6 @@
     ContextImpl ctx = new ContextImpl(entity, vr, null,
             pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
             session, parentCtx, this);
-    vr.context = ctx;
     entityProcessor.init(ctx);
 
     if (requestParameters.start > 0) {
@@ -436,9 +437,9 @@
         }
         //else do nothing. if we add it it may fail
       } else {
-        if (field != null ) {
+        if (field != null) {
           for (DataConfig.Field f : field) {
-            if(f.toWrite) addFieldToDoc(entry.getValue(), f.getName(), f.boost, f.multiValued, doc);
+            if (f.toWrite) addFieldToDoc(entry.getValue(), f.getName(), f.boost, f.multiValued, doc);
           }
         }
       }
@@ -509,6 +510,9 @@
         //this ensures that we start from the leaf nodes
         myModifiedPks.addAll(collectDelta(entity1, entity, resolver, context,
                 deletedRows));
+        //someone called abort
+        if (stop.get())
+          return new HashSet();
       }
 
     }
@@ -530,6 +534,9 @@
 
       deltaSet.add(row);
       importStatistics.rowsCount.incrementAndGet();
+      // check for abort
+      if (stop.get())
+        return new HashSet();
     }
     //get the deleted rows for this entity
     Set<Map<String, Object>> deletedSet = new HashSet<Map<String, Object>>();
@@ -548,6 +555,9 @@
 
       deletedSet.add(row);
       importStatistics.rowsCount.incrementAndGet();
+      // check for abort
+      if (stop.get())
+        return new HashSet();
     }
 
     //asymmetric Set difference
@@ -567,11 +577,18 @@
       parentEntityProcessor.init(context2);
       // identifying deleted rows with deltas
 
-      for (Map<String, Object> row : myModifiedPks)
+      for (Map<String, Object> row : myModifiedPks) {
         getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
+        // check for abort
+        if (stop.get())
+          return new HashSet();
+      }
       // running the same for deletedrows
       for (Map<String, Object> row : deletedSet) {
         getModifiedParentRows(resolver.addNamespace(entity.name, row), entity.name, parentEntityProcessor, parentKeyList);
+        // check for abort
+        if (stop.get())
+          return new HashSet();
       }
     }
     LOG.info("Completed parentDeltaQuery for Entity: " + entity.name);
@@ -594,6 +611,9 @@
 
         parentKeyList.add(parentRow);
         importStatistics.rowsCount.incrementAndGet();
+        // check for abort
+        if (stop.get())
+          return;
       }
 
     } finally {
@@ -662,11 +682,11 @@
 
     public Map<String, Object> getStatsSnapshot() {
       Map<String, Object> result = new HashMap<String, Object>();
-      result.put("docCount",docCount.get());
-      result.put("deletedDocCount",deletedDocCount.get());
-      result.put("rowCount",rowsCount.get());
-      result.put("queryCount",rowsCount.get());
-      result.put("skipDocCount",skipDocCount.get());
+      result.put("docCount", docCount.get());
+      result.put("deletedDocCount", deletedDocCount.get());
+      result.put("rowCount", rowsCount.get());
+      result.put("queryCount", rowsCount.get());
+      result.put("skipDocCount", skipDocCount.get());
       return result;
     }
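
Taken together, the new stop.get() checks mean an abort issued during a
delta-import is noticed between individual rows rather than only at the end
of each collection phase. AtomicBoolean.get() is a plain volatile read, so
polling once per row adds negligible overhead, and because the flag is only
consulted between rows, the row currently being processed is still allowed
to complete before the import stops.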