You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by no...@apache.org on 2009/11/21 08:56:14 UTC
svn commit: r882852 - in /lucene/solr/trunk/contrib/dataimporthandler/src:
main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java
Author: noble
Date: Sat Nov 21 07:56:14 2009
New Revision: 882852
URL: http://svn.apache.org/viewvc?rev=882852&view=rev
Log:
SOLR-1539
Modified:
lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java
Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java?rev=882852&r1=882851&r2=882852&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java Sat Nov 21 07:56:14 2009
@@ -51,6 +51,8 @@
public class XPathEntityProcessor extends EntityProcessorBase {
private static final Logger LOG = LoggerFactory.getLogger(XPathEntityProcessor.class);
+ private static final Map<String, Object> END_MARKER = new HashMap<String, Object>();
+
protected List<String> placeHolderVariables;
protected List<String> commonFields;
@@ -67,8 +69,17 @@
protected boolean streamRows = false;
- private int batchSz = 1000;
+ // Amount of time to block reading/writing to queue when streaming
+ protected int blockingQueueTimeOut = 10;
+
+ // Units for blockingQueueTimeOut
+ protected TimeUnit blockingQueueTimeOutUnits = TimeUnit.SECONDS;
+
+ // Number of rows to queue for asynchronous processing
+ protected int blockingQueueSize = 1000;
+ protected Thread publisherThread;
+
@SuppressWarnings("unchecked")
public void init(Context context) {
super.init(context);
@@ -85,8 +96,11 @@
.getEntityAttribute(USE_SOLR_ADD_SCHEMA));
streamRows = Boolean.parseBoolean(context
.getEntityAttribute(STREAM));
- if (context.getEntityAttribute("batchSize") != null) {
- batchSz = Integer.parseInt(context.getEntityAttribute("batchSize"));
+ if (context.getResolvedEntityAttribute("batchSize") != null) {
+ blockingQueueSize = Integer.parseInt(context.getEntityAttribute("batchSize"));
+ }
+ if (context.getResolvedEntityAttribute("readTimeOut") != null) {
+ blockingQueueTimeOut = Integer.parseInt(context.getEntityAttribute("readTimeOut"));
}
String xslt = context.getEntityAttribute(XSL);
if (xslt != null) {
@@ -316,7 +330,7 @@
}
}
- private Map<String, Object> readRow(Map<String, Object> record, String xpath) {
+ protected Map<String, Object> readRow(Map<String, Object> record, String xpath) {
if (useSolrAddXml) {
List<String> names = (List<String>) record.get("name");
List<String> values = (List<String>) record.get("value");
@@ -381,33 +395,58 @@
private Iterator<Map<String, Object>> getRowIterator(final Reader data, final String s) {
//nothing atomic about it. I just needed a StrongReference
final AtomicReference<Exception> exp = new AtomicReference<Exception>();
- final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(batchSz);
+ final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<Map<String, Object>>(blockingQueueSize);
final AtomicBoolean isEnd = new AtomicBoolean(false);
- new Thread() {
+ final AtomicBoolean throwExp = new AtomicBoolean(true);
+ publisherThread = new Thread() {
public void run() {
try {
xpathReader.streamRecords(data, new XPathRecordReader.Handler() {
@SuppressWarnings("unchecked")
public void handle(Map<String, Object> record, String xpath) {
- if (isEnd.get()) return;
+ if (isEnd.get()) {
+ throwExp.set(false);
+ // Throw to end the streaming; otherwise the parsing would go on forever
+ // even though the consumer has gone away.
+ throw new RuntimeException("BREAK");
+ }
+ Map<String, Object> row;
try {
- blockingQueue.offer(readRow(record, xpath), 10, TimeUnit.SECONDS);
+ row = readRow(record, xpath);
} catch (Exception e) {
isEnd.set(true);
+ return;
}
+ offer(row);
}
});
} catch (Exception e) {
- exp.set(e);
+ if(throwExp.get()) exp.set(e);
} finally {
closeIt(data);
- try {
- blockingQueue.offer(Collections.EMPTY_MAP, 10, TimeUnit.SECONDS);
- } catch (Exception e) {
+ if (!isEnd.get()) {
+ offer(END_MARKER);
+ }
+ }
+ }
+
+ private void offer(Map<String, Object> row) {
+ try {
+ while (!blockingQueue.offer(row, blockingQueueTimeOut, blockingQueueTimeOutUnits)) {
+ if (isEnd.get()) return;
+ LOG.debug("Timeout elapsed writing records. Perhaps buffer size should be increased.");
+ }
+ } catch (InterruptedException e) {
+ return;
+ } finally {
+ synchronized (this) {
+ notifyAll();
}
}
}
- }.start();
+ };
+
+ publisherThread.start();
return new Iterator<Map<String, Object>>() {
private Map<String, Object> lastRow;
@@ -418,29 +457,38 @@
}
public Map<String, Object> next() {
- try {
- Map<String, Object> row = blockingQueue.poll(10, TimeUnit.SECONDS);
- if (row == null || row == Collections.EMPTY_MAP) {
- isEnd.set(true);
- if (exp.get() != null) {
- String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
- if (lastRow != null) msg += " last row in this xml:" + lastRow;
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, exp.get(), msg);
- } else if (SKIP.equals(onError)) {
- wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
- } else {
- LOG.warn(msg, exp.get());
- }
+ Map<String, Object> row;
+
+ do {
+ try {
+ row = blockingQueue.poll(blockingQueueTimeOut, blockingQueueTimeOutUnits);
+ if (row == null) {
+ LOG.debug("Timeout elapsed reading records.");
}
+ } catch (InterruptedException e) {
+ LOG.debug("Caught InterruptedException while waiting for row. Aborting.");
+ isEnd.set(true);
return null;
}
- count++;
- return lastRow = row;
- } catch (InterruptedException e) {
+ } while (row == null);
+
+ if (row == END_MARKER) {
isEnd.set(true);
+ if (exp.get() != null) {
+ String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
+ if (lastRow != null) msg += " last row in this xml:" + lastRow;
+ if (ABORT.equals(onError)) {
+ wrapAndThrow(SEVERE, exp.get(), msg);
+ } else if (SKIP.equals(onError)) {
+ wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
+ } else {
+ LOG.warn(msg, exp.get());
+ }
+ }
return null;
- }
+ }
+ count++;
+ return lastRow = row;
}
public void remove() {
Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java?rev=882852&r1=882851&r2=882852&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestXPathEntityProcessor.java Sat Nov 21 07:56:14 2009
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -37,6 +38,10 @@
* @since solr 1.3
*/
public class TestXPathEntityProcessor {
+ boolean simulateSlowReader;
+ boolean simulateSlowResultProcessor;
+ int rowsToRead = -1;
+
@Test
public void withFieldsAndXpath() throws Exception {
long time = System.currentTimeMillis();
@@ -110,6 +115,9 @@
@Test
public void withFieldsAndXpathStream() throws Exception {
+ final Object monitor = new Object();
+ final boolean[] done = new boolean[1];
+
Map entityAttrs = createMap("name", "e", "url", "cd.xml",
XPathEntityProcessor.FOR_EACH, "/catalog/cd", "stream", "true", "batchSize","1");
List fields = new ArrayList();
@@ -118,22 +126,89 @@
fields.add(createMap("column", "year", "xpath", "/catalog/cd/year"));
Context c = AbstractDataImportHandlerTest.getContext(null,
new VariableResolverImpl(), getDataSource(cdData), Context.FULL_DUMP, fields, entityAttrs);
- XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor();
+ XPathEntityProcessor xPathEntityProcessor = new XPathEntityProcessor() {
+ private int count;
+
+ @Override
+ protected Map<String, Object> readRow(Map<String, Object> record,
+ String xpath) {
+ synchronized (monitor) {
+ if (simulateSlowReader && !done[0]) {
+ try {
+ monitor.wait(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return super.readRow(record, xpath);
+ }
+ };
+
+ if (simulateSlowResultProcessor) {
+ xPathEntityProcessor.blockingQueueSize = 1;
+ }
+ xPathEntityProcessor.blockingQueueTimeOut = 1;
+ xPathEntityProcessor.blockingQueueTimeOutUnits = TimeUnit.MICROSECONDS;
+
xPathEntityProcessor.init(c);
List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
while (true) {
+ if (rowsToRead >= 0 && result.size() >= rowsToRead) {
+ Thread.currentThread().interrupt();
+ }
Map<String, Object> row = xPathEntityProcessor.nextRow();
if (row == null)
break;
result.add(row);
+ if (simulateSlowResultProcessor) {
+ synchronized (xPathEntityProcessor.publisherThread) {
+ if (xPathEntityProcessor.publisherThread.isAlive()) {
+ xPathEntityProcessor.publisherThread.wait(1000);
+ }
+ }
+ }
+ }
+
+ synchronized (monitor) {
+ done[0] = true;
+ monitor.notify();
+ }
+
+ // confirm that publisher thread stops.
+ xPathEntityProcessor.publisherThread.join(1000);
+ Assert.assertEquals("Expected thread to stop", false, xPathEntityProcessor.publisherThread.isAlive());
+
+ Assert.assertEquals(rowsToRead < 0 ? 3 : rowsToRead, result.size());
+
+ if (rowsToRead < 0) {
+ Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
+ Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
+ Assert.assertEquals("1982", result.get(2).get("year"));
}
- Assert.assertEquals(3, result.size());
- Assert.assertEquals("Empire Burlesque", result.get(0).get("title"));
- Assert.assertEquals("Bonnie Tyler", result.get(1).get("artist"));
- Assert.assertEquals("1982", result.get(2).get("year"));
}
@Test
+ public void withFieldsAndXpathStreamContinuesOnTimeout() throws Exception {
+ simulateSlowReader = true;
+ withFieldsAndXpathStream();
+ }
+
+ @Test
+ public void streamWritesMessageAfterBlockedAttempt() throws Exception {
+ simulateSlowResultProcessor = true;
+ withFieldsAndXpathStream();
+ }
+
+ @Test
+ public void streamStopsAfterInterrupt() throws Exception {
+ simulateSlowResultProcessor = true;
+ rowsToRead = 1;
+ withFieldsAndXpathStream();
+ }
+
+ @Test
public void withDefaultSolrAndXsl() throws Exception {
long time = System.currentTimeMillis();
File tmpdir = new File("." + time);