You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2011/08/25 13:16:44 UTC

svn commit: r1161505 - in /lucene/dev/trunk/solr/contrib/dataimporthandler: CHANGES.txt src/java/org/apache/solr/handler/dataimport/DocBuilder.java src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java

Author: shalin
Date: Thu Aug 25 11:16:44 2011
New Revision: 1161505

URL: http://svn.apache.org/viewvc?rev=1161505&view=rev
Log:
SOLR-2668 -- DIH multithreaded mode does not roll back on errors from EntityProcessor

Modified:
    lucene/dev/trunk/solr/contrib/dataimporthandler/CHANGES.txt
    lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
    lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java

Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/CHANGES.txt?rev=1161505&r1=1161504&r2=1161505&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/CHANGES.txt Thu Aug 25 11:16:44 2011
@@ -21,6 +21,7 @@ Bug Fixes
 * SOLR-2186: DataImportHandler's multi-threaded option throws NPE (Lance Norskog, Frank Wesemann, shalin)
 * SOLR-2655: DIH multi threaded mode does not resolve attributes correctly (Frank Wesemann, shalin)
 * SOLR-2695: Documents are collected in unsynchronized list in multi-threaded debug mode (Michael McCandless, shalin)
+* SOLR-2668: DIH multithreaded mode does not rollback on errors from EntityProcessor (Frank Wesemann, shalin)
 
 ==================  3.3.0 ==================
 

Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=1161505&r1=1161504&r2=1161505&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Aug 25 11:16:44 2011
@@ -305,7 +305,7 @@ public class DocBuilder {
         LOG.info("running multithreaded full-import");
         new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
       } catch (Exception e) {
-        LOG.error("error in import", e);
+        throw new RuntimeException("Error in multi-threaded import", e);
       }
     } else {
       buildDocument(getVariableResolver(), null, null, root, true, null);

Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java?rev=1161505&r1=1161504&r2=1161505&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java Thu Aug 25 11:16:44 2011
@@ -16,15 +16,15 @@
  */
 package org.apache.solr.handler.dataimport;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 
 /**
  * Test DocBuilder with "threads"
@@ -60,6 +60,8 @@ public class TestDocBuilderThreaded exte
     DemoProcessor.entitiesInitied = 0;
     DemoEvaluator.evaluated = 0;
     MockDataSource.clearCache();
+    assertU(delQ("*:*"));
+    assertU(commit());
     super.tearDown();
   }
 
@@ -86,6 +88,23 @@ public class TestDocBuilderThreaded exte
     assertEquals("Evaluator was invoked less times than the number of rows",
         4, DemoEvaluator.evaluated);
   }
+  @Test 
+  public void testContinue() throws Exception {
+    runFullImport(twoEntitiesWithFailingProcessor);
+    assertQ(req("*:*"), "//*[@numFound='0']"); // should rollback
+  }
+  
+  @Test
+  public void testContinueThreaded() throws Exception {
+    runFullImport(twoThreadedEntitiesWithFailingProcessor);
+    assertQ(req("*:*"), "//*[@numFound='0']"); // should rollback
+  }
+
+  @Test
+  public void testFailingTransformerContinueThreaded() throws Exception {
+    runFullImport(twoThreadedEntitiesWithFailingTransformer);
+    assertQ(req("*:*"), "//*[@numFound='4']");
+  }
 
   @SuppressWarnings("unchecked")
   private List<Map<String, Object>> getDetails4Worker(String aWorker) {
@@ -116,8 +135,7 @@ public class TestDocBuilderThreaded exte
           "</entity>" +
           "</document>" +
           "</dataConfig>";
-
-  private final String twoEntitiesWithProcessor =
+	private final String twoEntitiesWithProcessor =
 
       "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
           "<document>" +
@@ -138,7 +156,7 @@ public class TestDocBuilderThreaded exte
           "</entity>" +
           "</document>" +
           "</dataConfig>";
-
+          
   private final String twoEntitiesWithEvaluatorProcessor =
 
       "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
@@ -164,6 +182,89 @@ public class TestDocBuilderThreaded exte
           "</dataConfig>";
 
 
+  private final String twoThreadedEntitiesWithFailingProcessor =
+  
+        "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
+            "<document>" +
+            "<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
+            " threads=\"1\" " +
+            " query=\"select * from y\"" +
+            " pk=\"id\" \n" +
+            " worker=\"id\" \n" +
+            " onError=\"continue\" " +
+            ">" +
+            "<field column=\"id\" />\n" +
+            "<entity name=\"details\" processor=\"TestDocBuilderThreaded$FailingProcessor\" \n" +
+            "worker=\"${job.worker}\" \n" +
+            "query=\"${job.worker}\" \n" +
+            "transformer=\"TemplateTransformer\" " +
+            "onError=\"continue\" " +
+            "fail=\"yes\" " +
+            " >" +
+            "<field column=\"author_s\" />" +
+            "<field column=\"title_s\" />" +
+            " <field column=\"text_s\" />" +
+            " <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
+            "</entity>" +
+            "</entity>" +
+            "</document>" +
+            "</dataConfig>";
+  
+  private final String twoEntitiesWithFailingProcessor =
+    
+    "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
+        "<document>" +
+        "<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
+        " query=\"select * from y\"" +
+        " pk=\"id\" \n" +
+        " worker=\"id\" \n" +
+        " onError=\"continue\" " +
+        ">" +
+        "<field column=\"id\" />\n" +
+        "<entity name=\"details\" processor=\"TestDocBuilderThreaded$FailingProcessor\" \n" +
+        "worker=\"${job.worker}\" \n" +
+        "query=\"${job.worker}\" \n" +
+        "transformer=\"TemplateTransformer\" " +
+        "onError=\"continue\" " +
+        "fail=\"yes\" " +
+        " >" +
+        "<field column=\"author_s\" />" +
+        "<field column=\"title_s\" />" +
+        " <field column=\"text_s\" />" +
+        " <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
+        "</entity>" +
+        "</entity>" +
+        "</document>" +
+        "</dataConfig>";
+
+  private final String twoThreadedEntitiesWithFailingTransformer =
+
+    "<dataConfig> <dataSource type=\"MockDataSource\"/>\n" +
+        "<document>" +
+        "<entity name=\"job\" processor=\"TestDocBuilderThreaded$DemoProcessor\" \n" +
+        " threads=\"1\" " +
+        " query=\"select * from y\"" +
+        " pk=\"id\" \n" +
+        " worker=\"id\" \n" +
+        " onError=\"continue\" " +
+        ">" +
+        "<field column=\"id\" />\n" +
+        "<entity name=\"details\" \n" +
+        "worker=\"${job.worker}\" \n" +
+        "query=\"${job.worker}\" \n" +
+        "transformer=\"TestDocBuilderThreaded$FailingTransformer\" " +
+        "onError=\"continue\" " +
+        " >" +
+        "<field column=\"author_s\" />" +
+        "<field column=\"title_s\" />" +
+        " <field column=\"text_s\" />" +
+        " <field column=\"generated_id_s\" template=\"generated_${job.id}\" />" +
+        "</entity>" +
+        "</entity>" +
+        "</document>" +
+        "</dataConfig>";
+
+
   public static class DemoProcessor extends SqlEntityProcessor {
 
     public static int entitiesInitied = 0;
@@ -177,6 +278,23 @@ public class TestDocBuilderThreaded exte
       } else entitiesInitied++;
     }
   }
+  public static class FailingProcessor extends SqlEntityProcessor {
+    @Override
+    public void init(Context context) {
+      super.init(context);
+      String fail = context.getResolvedEntityAttribute("fail");
+      if (fail != null && fail.equalsIgnoreCase("yes")) {
+        throw new NullPointerException("I was told to");
+      }      
+    }
+  }
+
+  public static class FailingTransformer extends Transformer  {
+    @Override
+    public Object transformRow(Map<String, Object> row, Context context) {
+      throw new RuntimeException("Always fail");
+    }
+  }
 
   public static class DemoEvaluator extends Evaluator {
     public static int evaluated = 0;
@@ -196,4 +314,5 @@ public class TestDocBuilderThreaded exte
       return result.toString();
     }
   }
+  
 }