You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by no...@apache.org on 2009/11/21 08:56:14 UTC

svn commit: r882852 - in /lucene/solr/trunk/contrib/dataimporthandler/src: main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java

Author: noble
Date: Sat Nov 21 07:56:14 2009
New Revision: 882852

URL: http://svn.apache.org/viewvc?rev=882852&view=rev
Log:
SOLR-1539

Modified:
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java?rev=882852&r1=882851&r2=882852&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java Sat Nov 21 07:56:14 2009
@@ -51,6 +51,8 @@
 public class XPathEntityProcessor extends EntityProcessorBase {
   private static final Logger LOG = LoggerFactory.getLogger(XPathEntityProcessor.class);
 
+  private static final Map<String, Object> END_MARKER = new HashMap<String, Object>();
+  
   protected List<String> placeHolderVariables;
 
   protected List<String> commonFields;
@@ -67,8 +69,17 @@
 
   protected boolean streamRows = false;
 
-  private int batchSz = 1000;
+  // Amount of time to block reading/writing to queue when streaming
+  protected int blockingQueueTimeOut = 10;
+  
+  // Units for pumpTimeOut
+  protected TimeUnit blockingQueueTimeOutUnits = TimeUnit.SECONDS;
+  
+  // Number of rows to queue for asynchronous processing
+  protected int blockingQueueSize = 1000;
 
+  protected Thread publisherThread;
+  
   @SuppressWarnings("unchecked")
   public void init(Context context) {
     super.init(context);
@@ -85,8 +96,11 @@
             .getEntityAttribute(USE_SOLR_ADD_SCHEMA));
     streamRows = Boolean.parseBoolean(context
             .getEntityAttribute(STREAM));
-    if (context.getEntityAttribute("batchSize") != null) {
-      batchSz = Integer.parseInt(context.getEntityAttribute("batchSize"));
+    if (context.getResolvedEntityAttribute("batchSize") != null) {
+      blockingQueueSize = Integer.parseInt(context.getEntityAttribute("batchSize"));
+    }
+    if (context.getResolvedEntityAttribute("readTimeOut") != null) {
+      blockingQueueTimeOut = Integer.parseInt(context.getEntityAttribute("readTimeOut"));
     }
     String xslt = context.getEntityAttribute(XSL);
     if (xslt != null) {
@@ -316,7 +330,7 @@
     }
   }
 
-  private Map<String, Object> readRow(Map<String, Object> record, String xpath) {
+  protected Map<String, Object> readRow(Map<String, Object> record, String xpath) {
     if (useSolrAddXml) {
       List<String> names = (List<String>) record.get("name");
       List<String> values = (List<String>) record.get("value");
@@ -381,33 +395,58 @@
   private Iterator<Map<String, Object>> getRowIterator(final Reader data, final String s) {
     //nothing atomic about it. I just needed a StongReference
     final AtomicReference<Exception> exp = new AtomicReference<Exception>();
-    final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(batchSz);
+    final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(blockingQueueSize);
     final AtomicBoolean isEnd = new AtomicBoolean(false);
-    new Thread() {
+    final AtomicBoolean throwExp = new AtomicBoolean(true);
+    publisherThread = new Thread() {
       public void run() {
         try {
           xpathReader.streamRecords(data, new XPathRecordReader.Handler() {
             @SuppressWarnings("unchecked")
             public void handle(Map<String, Object> record, String xpath) {
-              if (isEnd.get()) return;
+              if (isEnd.get()) {
+                throwExp.set(false);
+                //To end the streaming . otherwise the parsing will go on forever
+                //though consumer has gone away
+                throw new RuntimeException("BREAK");
+              }
+              Map<String, Object> row;
               try {
-                blockingQueue.offer(readRow(record, xpath), 10, TimeUnit.SECONDS);
+                row = readRow(record, xpath);
               } catch (Exception e) {
                 isEnd.set(true);
+                return;
               }
+              offer(row);
             }
           });
         } catch (Exception e) {
-          exp.set(e);
+          if(throwExp.get()) exp.set(e);
         } finally {
           closeIt(data);
-          try {
-            blockingQueue.offer(Collections.EMPTY_MAP, 10, TimeUnit.SECONDS);
-          } catch (Exception e) {
+          if (!isEnd.get()) {
+            offer(END_MARKER);
+          }
+        }
+      }
+      
+      private void offer(Map<String, Object> row) {
+        try {
+          while (!blockingQueue.offer(row, blockingQueueTimeOut, blockingQueueTimeOutUnits)) {
+            if (isEnd.get()) return;
+            LOG.debug("Timeout elapsed writing records.  Perhaps buffer size should be increased.");
+          }
+        } catch (InterruptedException e) {
+          return;
+        } finally {
+          synchronized (this) {
+            notifyAll();
           }
         }
       }
-    }.start();
+    };
+    
+    publisherThread.start();
 
     return new Iterator<Map<String, Object>>() {
       private Map<String, Object> lastRow;
@@ -418,29 +457,38 @@
       }
 
       public Map<String, Object> next() {
-        try {
-          Map<String, Object> row = blockingQueue.poll(10, TimeUnit.SECONDS);
-          if (row == null || row == Collections.EMPTY_MAP) {
-            isEnd.set(true);
-            if (exp.get() != null) {
-              String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
-              if (lastRow != null) msg += " last row in this xml:" + lastRow;
-              if (ABORT.equals(onError)) {
-                wrapAndThrow(SEVERE, exp.get(), msg);
-              } else if (SKIP.equals(onError)) {
-                wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
-              } else {
-                LOG.warn(msg, exp.get());
-              }
+        Map<String, Object> row;
+        
+        do {
+          try {
+            row = blockingQueue.poll(blockingQueueTimeOut, blockingQueueTimeOutUnits);
+            if (row == null) {
+              LOG.debug("Timeout elapsed reading records.");
             }
+          } catch (InterruptedException e) {
+            LOG.debug("Caught InterruptedException while waiting for row.  Aborting.");
+            isEnd.set(true);
             return null;
           }
-          count++;
-          return lastRow = row;
-        } catch (InterruptedException e) {
+        } while (row == null);
+        
+        if (row == END_MARKER) {
           isEnd.set(true);
+          if (exp.get() != null) {
+            String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
+            if (lastRow != null) msg += " last row in this xml:" + lastRow;
+            if (ABORT.equals(onError)) {
+              wrapAndThrow(SEVERE, exp.get(), msg);
+            } else if (SKIP.equals(onError)) {
+              wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
+            } else {
+              LOG.warn(msg, exp.get());
+            }
+          }
           return null;
-        }
+        } 
+        count++;
+        return lastRow = row;
       }
 
       public void remove() {

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java?rev=882852&r1=882851&r2=882852&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java Sat Nov 21 07:56:14 2009
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -37,6 +38,10 @@
  * @since solr 1.3
  */
 public class TestXPathEntityProcessor {
+  boolean simulateSlowReader;
+  boolean simulateSlowResultProcessor;
+  int rowsToRead = -1;
+  
   @Test
   public void withFieldsAndXpath() throws Exception {
     long time = System.currentTimeMillis();
@@ -110,6 +115,9 @@
 
   @Test
   public void withFieldsAndXpathStream() throws Exception {
+    final Object monitor = new Object();
+    final boolean[] done = new boolean[1];
+    
     Map entityAttrs = createMap("name", "e", "url", "cd.xml",
         XPathEntityProcessor.FOR_EACH, "/catalog/cd", "stream", "true", "batchSize","1");
     List fields = new ArrayList();
@@ -118,22 +126,89 @@
     fields.add(createMap("column", "year", "xpath", "/catalog/cd/year"));
     Context c = AbstractDataImportHandlerTest.getContext(null,
         new VariableResolverImpl(), getDataSource(cdData), Context.FULL_DUMP, fields, entityAttrs);
-    XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor();
+    XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor() {
+      private int count;
+      
+      @Override
+      protected Map<String, Object> readRow(Map<String, Object> record,
+          String xpath) {
+        synchronized (monitor) {
+          if (simulateSlowReader && !done[0]) {
+            try {
+              monitor.wait(100);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        }
+        
+        return super.readRow(record, xpath);
+      }
+    };
+    
+    if (simulateSlowResultProcessor) {
+      xPathEntityProcessor.blockingQueueSize = 1;
+    }
+    xPathEntityProcessor.blockingQueueTimeOut = 1;
+    xPathEntityProcessor.blockingQueueTimeOutUnits = TimeUnit.MICROSECONDS;
+    
     xPathEntityProcessor.init(c);
     List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
     while (true) {
+      if (rowsToRead >= 0 && result.size() >= rowsToRead) {
+        Thread.currentThread().interrupt();
+      }
       Map<String, Object> row = xPathEntityProcessor.nextRow();
       if (row == null)
         break;
       result.add(row);
+      if (simulateSlowResultProcessor) {
+        synchronized (xPathEntityProcessor.publisherThread) {
+          if (xPathEntityProcessor.publisherThread.isAlive()) {
+            xPathEntityProcessor.publisherThread.wait(1000);
+          }
+        }
+      }
+    }
+    
+    synchronized (monitor) {
+      done[0] = true;
+      monitor.notify();
+    }
+    
+    // confirm that publisher thread stops.
+    xPathEntityProcessor.publisherThread.join(1000);
+    Assert.assertEquals("Expected thread to stop", false, xPathEntityProcessor.publisherThread.isAlive());
+    
+    Assert.assertEquals(rowsToRead < 0 ? 3 : rowsToRead, result.size());
+    
+    if (rowsToRead < 0) {
+      Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
+      Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
+      Assert.assertEquals("1982", result.get(2).get("year"));
     }
-    Assert.assertEquals(3, result.size());
-    Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
-    Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
-    Assert.assertEquals("1982", result.get(2).get("year"));
   }
 
   @Test
+  public void withFieldsAndXpathStreamContinuesOnTimeout() throws Exception {
+    simulateSlowReader = true;
+    withFieldsAndXpathStream();
+  }
+  
+  @Test
+  public void streamWritesMessageAfterBlockedAttempt() throws Exception {
+    simulateSlowResultProcessor = true;
+    withFieldsAndXpathStream();
+  }
+  
+  @Test
+  public void streamStopsAfterInterrupt() throws Exception {
+    simulateSlowResultProcessor = true;
+    rowsToRead = 1;
+    withFieldsAndXpathStream();
+  }
+  
+  @Test
   public void withDefaultSolrAndXsl() throws Exception {
     long time = System.currentTimeMillis();
     File tmpdir = new File("." + time);