You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jd...@apache.org on 2012/03/26 17:04:39 UTC
svn commit: r1305384 - in
/lucene/dev/trunk/solr/contrib/dataimporthandler/src:
java/org/apache/solr/handler/dataimport/
test/org/apache/solr/handler/dataimport/
Author: jdyer
Date: Mon Mar 26 15:04:38 2012
New Revision: 1305384
URL: http://svn.apache.org/viewvc?rev=1305384&view=rev
Log:
SOLR-3262: Remove "threads" from DIH
Removed:
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ThreadedContext.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilderThreaded.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java
Modified:
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Context.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EvaluatorBag.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEvaluatorBag.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java
lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestVariableResolver.java
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Context.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Context.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Context.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Context.java Mon Mar 26 15:04:38 2012
@@ -220,6 +220,4 @@ public abstract class Context {
*/
public abstract String replaceTokens(String template);
- static final ThreadLocal<Context> CURRENT_CONTEXT = new ThreadLocal<Context>();
-
}
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java Mon Mar 26 15:04:38 2012
@@ -20,9 +20,9 @@ package org.apache.solr.handler.dataimpo
import org.apache.solr.core.SolrCore;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* <p>
@@ -132,32 +132,30 @@ public class ContextImpl extends Context
@Override
public void setSessionAttribute(String name, Object val, String scope) {
- if(name == null) return;
+ if(name == null) {
+ return;
+ }
if (Context.SCOPE_ENTITY.equals(scope)) {
- if (entitySession == null)
- entitySession = new ConcurrentHashMap<String, Object>();
-
- putVal(name, val,entitySession);
+ if (entitySession == null) {
+ entitySession = new HashMap<String, Object>();
+ }
+ entitySession.put(name, val);
} else if (Context.SCOPE_GLOBAL.equals(scope)) {
if (globalSession != null) {
- putVal(name, val,globalSession);
+ globalSession.put(name, val);
}
} else if (Context.SCOPE_DOC.equals(scope)) {
DocBuilder.DocWrapper doc = getDocument();
- if (doc != null)
+ if (doc != null) {
doc.setSessionAttribute(name, val);
+ }
} else if (SCOPE_SOLR_CORE.equals(scope)){
if(dataImporter != null) {
- putVal(name, val,dataImporter.getCoreScopeSession());
+ dataImporter.getCoreScopeSession().put(name, val);
}
}
}
- private void putVal(String name, Object val, Map map) {
- if(val == null) map.remove(name);
- else map.put(name, val);
- }
-
@Override
public Object getSessionAttribute(String name, String scope) {
if (Context.SCOPE_ENTITY.equals(scope)) {
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java Mon Mar 26 15:04:38 2012
@@ -55,8 +55,6 @@ public class DataConfig {
public Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
- boolean isMultiThreaded = false;
-
public static class Document {
// TODO - remove from here and add it to entity
public String deleteQuery;
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java Mon Mar 26 15:04:38 2012
@@ -45,7 +45,6 @@ import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
/**
* <p> Stores all configuration information for pulling and indexing data. </p>
@@ -94,7 +93,7 @@ public class DataImporter {
* Only for testing purposes
*/
DataImporter() {
- coreScopeSession = new ConcurrentHashMap<String, Object>();
+ coreScopeSession = new HashMap<String, Object>();
createPropertyWriter();
propWriter.init(this);
this.handlerName = "dataimport" ;
@@ -246,11 +245,6 @@ public class DataImporter {
// if in this chain no document root is found()
e.isDocRoot = true;
}
- if (e.allAttributes.get("threads") != null) {
- if(docRootFound) throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "'threads' not allowed below rootEntity ");
- config.isMultiThreaded = true;
- }
-
if (e.fields != null) {
for (DataConfig.Field f : e.fields) {
if (schema != null) {
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java Mon Mar 26 15:04:38 2012
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.*;
/**
* <p> {@link DocBuilder} is responsible for creating Solr documents out of the given configuration. It also maintains
@@ -62,10 +61,10 @@ public class DocBuilder {
boolean verboseDebug = false;
- Map<String, Object> session = new ConcurrentHashMap<String, Object>();
+ Map<String, Object> session = new HashMap<String, Object>();
static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
- Map<String, Object> functionsNamespace;
+ private Map<String, Object> functionsNamespace;
private Properties persistedProperties;
private DIHPropertiesWriter propWriter;
@@ -83,8 +82,8 @@ public class DocBuilder {
DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
requestParameters = reqParams;
verboseDebug = requestParameters.debug && requestParameters.verbose;
- functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
persistedProperties = propWriter.readIndexerProperties();
+ functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this, getVariableResolver());
String writerClassStr = null;
if(reqParams!=null && reqParams.requestParams != null) {
@@ -149,6 +148,13 @@ public class DocBuilder {
return null;
}
}
+
+ private Map<String,Object> getFunctionsNamespace() {
+ if(functionsNamespace==null) {
+
+ }
+ return functionsNamespace;
+ }
private void invokeEventListener(String className) {
try {
@@ -298,31 +304,8 @@ public class DocBuilder {
}
private void doFullDump() {
- addStatusMessage("Full Dump Started");
- if (dataImporter.getConfig().isMultiThreaded && !verboseDebug) {
- EntityRunner entityRunner = null;
- try {
- LOG.info("running multithreaded full-import");
- entityRunner = new EntityRunner(root, null);
- entityRunner.run(null, Context.FULL_DUMP, null);
- } catch (Exception e) {
- throw new RuntimeException("Error in multi-threaded import", e);
- } finally {
- if (entityRunner != null) {
- List<EntityRunner> closure = new ArrayList<EntityRunner>();
- closure.add(entityRunner);
- for (int i = 0; i < closure.size(); i++) {
- assert(!closure.get(i).entityProcessorWrapper.isEmpty());
- closure.addAll(closure.get(i).entityProcessorWrapper.iterator().next().children.values());
- }
- for (EntityRunner er : closure) {
- er.entityProcessor.destroy();
- }
- }
- }
- } else {
- buildDocument(getVariableResolver(), null, null, root, true, null);
- }
+ addStatusMessage("Full Dump Started");
+ buildDocument(getVariableResolver(), null, null, root, true, null);
}
@SuppressWarnings("unchecked")
@@ -388,218 +371,11 @@ public class DocBuilder {
iter.remove();
}
}
- Executor executorSvc = new ThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE,
- 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
- new SynchronousQueue<Runnable>() // directly hand off tasks
- );
-
+
@SuppressWarnings("unchecked")
public void addStatusMessage(String msg) {
statusMessages.put(msg, DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
}
- EntityRunner createRunner(DataConfig.Entity entity, EntityRunner parent){
- return new EntityRunner(entity, parent);
- }
-
- /**This class is a just a structure to hold runtime information of one entity
- *
- */
- class EntityRunner {
- final DataConfig.Entity entity;
- private EntityProcessor entityProcessor;
- private final List<ThreadedEntityProcessorWrapper> entityProcessorWrapper = new ArrayList<ThreadedEntityProcessorWrapper>();
- private DocWrapper docWrapper;
- private volatile boolean entityInitialized ;
- String currentProcess;
- final ThreadLocal<ThreadedEntityProcessorWrapper> currentEntityProcWrapper = new ThreadLocal<ThreadedEntityProcessorWrapper>();
-
- private ContextImpl context;
- final EntityRunner parent;
- final AtomicBoolean entityEnded = new AtomicBoolean(false);
- private Exception exception;
-
- public EntityRunner(DataConfig.Entity entity, EntityRunner parent) {
- this.parent = parent;
- this.entity = entity;
- if (entity.proc == null) {
- entityProcessor = new SqlEntityProcessor();
- } else {
- try {
- entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
- .newInstance();
- } catch (Exception e) {
- wrapAndThrow(SEVERE, e,
- "Unable to load EntityProcessor implementation for entity:" + entity.name);
- }
- }
- int threads = 1;
- if (entity.allAttributes.get("threads") != null) {
- threads = Integer.parseInt(entity.allAttributes.get("threads"));
- }
- for (int i = 0; i < threads; i++) {
- entityProcessorWrapper.add(new ThreadedEntityProcessorWrapper(entityProcessor, DocBuilder.this, this, getVariableResolver()));
- }
- context = new ThreadedContext(this, DocBuilder.this, getVariableResolver());
- }
-
-
- public void run(DocWrapper docWrapper, final String currProcess, final EntityRow rows) throws Exception {
- entityInitialized = false;
- this.docWrapper = docWrapper;
- this.currentProcess = currProcess;
- entityEnded.set(false);
- try {
- if(entityProcessorWrapper.size() <= 1){
- runAThread(entityProcessorWrapper.get(0), rows, currProcess);
- } else {
- final CountDownLatch latch = new CountDownLatch(entityProcessorWrapper.size());
- for (final ThreadedEntityProcessorWrapper processorWrapper : entityProcessorWrapper) {
- Runnable runnable = new Runnable() {
- public void run() {
- try {
- runAThread(processorWrapper, rows, currProcess);
- }catch(Exception e) {
- entityEnded.set(true);
- exception = e;
- } finally {
- latch.countDown();
- }
- }
- };
- executorSvc.execute(runnable);
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- //TODO
- }
- Exception copy = exception;
- if(copy != null){
- exception = null;
- throw copy;
- }
- }
- } finally {
- }
-
-
- }
-
- private void runAThread(ThreadedEntityProcessorWrapper epw, EntityRow rows, String currProcess) throws Exception {
- currentEntityProcWrapper.set(epw);
- epw.threadedInit(context);
- try {
- Context.CURRENT_CONTEXT.set(context);
- epw.init(rows);
- initEntity();
- DocWrapper docWrapper = this.docWrapper;
- for (; ;) {
- if(DocBuilder.this.stop.get()) break;
- try {
- Map<String, Object> arow = epw.nextRow();
- if (arow == null) {
- break;
- } else {
- importStatistics.rowsCount.incrementAndGet();
- if (docWrapper == null && entity.isDocRoot) {
- docWrapper = new DocWrapper();
- context.setDoc(docWrapper);
- DataConfig.Entity e = entity.parentEntity;
- for (EntityRow row = rows; row != null&& e !=null; row = row.tail,e=e.parentEntity) {
- addFields(e, docWrapper, row.row, epw.resolver);
- }
- }
- if (docWrapper != null) {
- handleSpecialCommands(arow, docWrapper);
- addFields(entity, docWrapper, arow, epw.resolver);
- }
- if (entity.entities != null) {
- EntityRow nextRow = new EntityRow(arow, rows, entity.name);
- for (DataConfig.Entity e : entity.entities) {
- epw.children.get(e).run(docWrapper,currProcess,nextRow);
- }
- }
- }
- if (entity.isDocRoot) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("a row on docroot" + docWrapper);
- }
- if (!docWrapper.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding a doc "+docWrapper);
- }
- boolean result = writer.upload(docWrapper);
- if(reqParams.debug) {
- reqParams.debugDocuments.add(docWrapper);
- }
- docWrapper = null;
- if (result){
- importStatistics.docCount.incrementAndGet();
- } else {
- importStatistics.failedDocCount.incrementAndGet();
- }
- }
- }
- } catch (DataImportHandlerException dihe) {
- exception = dihe;
- if(dihe.getErrCode() == SKIP_ROW || dihe.getErrCode() == SKIP) {
- importStatistics.skipDocCount.getAndIncrement();
- exception = null;//should not propogate up
- continue;
- }
- if (entity.isDocRoot) {
- if (dihe.getErrCode() == DataImportHandlerException.SKIP) {
- importStatistics.skipDocCount.getAndIncrement();
- exception = null;//should not propogate up
- } else {
- SolrException.log(LOG, "Exception while processing: "
- + entity.name + " document : " + docWrapper, dihe);
- }
- if (dihe.getErrCode() == DataImportHandlerException.SEVERE)
- throw dihe;
- } else {
- //if this is not the docRoot then the execution has happened in the same thread. so propogate up,
- // it will be handled at the docroot
- entityEnded.set(true);
- throw dihe;
- }
- entityEnded.set(true);
- }
- }
- } finally {
- currentEntityProcWrapper.remove();
- Context.CURRENT_CONTEXT.remove();
- }
- }
-
- private void initEntity() {
- if (!entityInitialized) {
- synchronized (this) {
- if (!entityInitialized) {
- entityProcessor.init(context);
- entityInitialized = true;
- }
- }
- }
- }
- }
-
- /**A reverse linked list .
- *
- */
- static class EntityRow {
- final Map<String, Object> row;
- final EntityRow tail;
- final String name;
-
- EntityRow(Map<String, Object> row, EntityRow tail, String name) {
- this.row = row;
- this.tail = tail;
- this.name = name;
- }
- }
private void resetEntity(DataConfig.Entity entity) {
entity.initalized = false;
@@ -637,7 +413,6 @@ public class DocBuilder {
pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
session, parentCtx, this);
entityProcessor.init(ctx);
- Context.CURRENT_CONTEXT.set(ctx);
if (!entity.initalized) {
entitiesToDestroy.add(entityProcessor);
entity.initalized = true;
@@ -710,11 +485,6 @@ public class DocBuilder {
}
vr.removeNamespace(entity.name);
}
- /*The child entities would have changed the CURRENT_CONTEXT. So when they are done, set it back to the old.
- *
- */
- Context.CURRENT_CONTEXT.set(ctx);
-
if (entity.isDocRoot) {
if (stop.get())
return;
@@ -731,7 +501,6 @@ public class DocBuilder {
}
}
}
-
} catch (DataImportHandlerException e) {
if (verboseDebug) {
getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, e);
@@ -761,7 +530,6 @@ public class DocBuilder {
getDebugLogger().log(DIHLogLevels.ROW_END, entity.name, null);
if (entity.isDocRoot)
getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
- Context.CURRENT_CONTEXT.remove();
}
}
}
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java Mon Mar 26 15:04:38 2012
@@ -37,18 +37,18 @@ import java.util.Map;
public class EntityProcessorWrapper extends EntityProcessor {
private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class);
- EntityProcessor delegate;
+ private EntityProcessor delegate;
private DocBuilder docBuilder;
- String onError;
- protected Context context;
- protected VariableResolverImpl resolver;
- String entityName;
+ private String onError;
+ private Context context;
+ private VariableResolverImpl resolver;
+ private String entityName;
protected List<Transformer> transformers;
protected List<Map<String, Object>> rowcache;
-
+
public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
this.delegate = delegate;
this.docBuilder = docBuilder;
@@ -59,7 +59,6 @@ public class EntityProcessorWrapper exte
rowcache = null;
this.context = context;
resolver = (VariableResolverImpl) context.getVariableResolver();
- //context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
if (entityName == null) {
onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
if (onError == null) onError = ABORT;
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EvaluatorBag.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EvaluatorBag.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EvaluatorBag.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EvaluatorBag.java Mon Mar 26 15:04:38 2012
@@ -190,7 +190,7 @@ public class EvaluatorBag {
};
}
- static Map<String, Object> getFunctionsNamespace(final List<Map<String, String>> fn, DocBuilder docBuilder) {
+ static Map<String, Object> getFunctionsNamespace(final List<Map<String, String>> fn, DocBuilder docBuilder, final VariableResolverImpl vr) {
final Map<String, Evaluator> evaluators = new HashMap<String, Evaluator>();
evaluators.put(DATE_FORMAT_EVALUATOR, getDateFormatEvaluator());
evaluators.put(SQL_ESCAPE_EVALUATOR, getSqlEscapingEvaluator());
@@ -217,7 +217,9 @@ public class EvaluatorBag {
Evaluator evaluator = evaluators.get(fname);
if (evaluator == null)
return null;
- return evaluator.evaluate(m.group(2), Context.CURRENT_CONTEXT.get());
+ ContextImpl ctx = new ContextImpl(null, vr, null, null, null, null, null);
+ String g2 = m.group(2);
+ return evaluator.evaluate(g2, ctx);
}
};
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java Mon Mar 26 15:04:38 2012
@@ -147,9 +147,7 @@ public class SolrEntityProcessor extends
/**
* The following method changes the rowIterator mutable field. It requires
- * external synchronization. In fact when used in a multi-threaded setup the nextRow() method is called from a
- * synchronized block {@link ThreadedEntityProcessorWrapper#nextRow()}, so this
- * is taken care of.
+ * external synchronization.
*/
private void buildIterator() {
if (rowIterator == null) {
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java Mon Mar 26 15:04:38 2012
@@ -42,24 +42,10 @@ public class TestEphemeralCache extends
}
@Test
- public void testSingleThreaded() throws Exception {
- assertFullImport(getDataConfigDotXml(0));
+ public void test() throws Exception {
+ assertFullImport(getDataConfigDotXml());
}
-
- @Test
- public void testWithThreadedParamEqualOne() throws Exception {
- assertFullImport(getDataConfigDotXml(1));
- }
-
- @Ignore("TODO: fix included in SOLR-3011")
- @Test
- public void testMultiThreaded() throws Exception {
- // Try between 2 and 6 threads
- int numThreads = random.nextInt(4) + 2;
- System.out.println("TRYING " + numThreads);
- assertFullImport(getDataConfigDotXml(numThreads));
- }
-
+
@SuppressWarnings("unchecked")
private void setupMockData() {
List parentRows = new ArrayList();
@@ -98,7 +84,7 @@ public class TestEphemeralCache extends
MockDataSource.setIterator("SELECT * FROM CHILD_2", child2Rows.iterator());
}
- private String getDataConfigDotXml(int numThreads) {
+ private String getDataConfigDotXml() {
return
"<dataConfig>" +
" <dataSource type=\"MockDataSource\" />" +
@@ -109,7 +95,6 @@ public class TestEphemeralCache extends
" cacheImpl=\"org.apache.solr.handler.dataimport.DestroyCountCache\"" +
" cacheName=\"PARENT\"" +
" query=\"SELECT * FROM PARENT\" " +
- (numThreads==0 ? "" : "threads=\"" + numThreads + "\" ") +
" >" +
" <entity" +
" name=\"CHILD_1\"" +
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEvaluatorBag.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEvaluatorBag.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEvaluatorBag.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEvaluatorBag.java Mon Mar 26 15:04:38 2012
@@ -98,19 +98,16 @@ public class TestEvaluatorBag extends Ab
public void testEscapeSolrQueryFunction() {
final VariableResolverImpl resolver = new VariableResolverImpl();
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(context);
- try {
- Map m= new HashMap();
- m.put("query","c:t");
- resolver.addNamespace("dataimporter.functions", EvaluatorBag
- .getFunctionsNamespace(Collections.EMPTY_LIST, null));
- resolver.addNamespace("e",m);
- String s = resolver
- .replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
- org.junit.Assert.assertEquals("c\\:t", s);
- } finally {
- Context.CURRENT_CONTEXT.remove();
- }
+
+ Map m= new HashMap();
+ m.put("query","c:t");
+ resolver.addNamespace("dataimporter.functions", EvaluatorBag
+ .getFunctionsNamespace(Collections.EMPTY_LIST, null, resolver));
+ resolver.addNamespace("e",m);
+ String s = resolver
+ .replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
+ org.junit.Assert.assertEquals("c\\:t", s);
+
}
/**
@@ -121,43 +118,36 @@ public class TestEvaluatorBag extends Ab
public void testGetDateFormatEvaluator() {
Evaluator dateFormatEval = EvaluatorBag.getDateFormatEvaluator();
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(context);
- try {
- Calendar calendar = new GregorianCalendar();
- calendar.add(Calendar.DAY_OF_YEAR, -2);
-
- assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(calendar.getTime()),
- dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
-
- calendar = new GregorianCalendar();
- Date date = calendar.getTime();
-
- Map<String, Object> map = new HashMap<String, Object>();
- map.put("key", date);
- resolver.addNamespace("A", map);
-
- assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(date),
- dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
- } finally {
- Context.CURRENT_CONTEXT.remove();
- }
+
+ Calendar calendar = new GregorianCalendar();
+ calendar.add(Calendar.DAY_OF_YEAR, -2);
+
+ assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(calendar.getTime()),
+ dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", context));
+
+ calendar = new GregorianCalendar();
+ Date date = calendar.getTime();
+
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("key", date);
+ resolver.addNamespace("A", map);
+
+ assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(date),
+ dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", context));
+
}
private void runTests(Map<String, String> tests, Evaluator evaluator) {
- ContextImpl ctx = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(ctx);
- try {
- for (Map.Entry<String, String> entry : tests.entrySet()) {
- Map<String, Object> values = new HashMap<String, Object>();
- values.put("key", entry.getKey());
- resolver.addNamespace("A", values);
-
- String expected = entry.getValue();
- String actual = evaluator.evaluate("A.key", ctx);
- assertEquals(expected, actual);
- }
- } finally {
- Context.CURRENT_CONTEXT.remove();
+ ContextImpl ctx = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
+ for (Map.Entry<String, String> entry : tests.entrySet()) {
+ Map<String, Object> values = new HashMap<String, Object>();
+ values.put("key", entry.getKey());
+ resolver.addNamespace("A", values);
+
+ String expected = entry.getValue();
+ String actual = evaluator.evaluate("A.key", ctx);
+ assertEquals(expected, actual);
}
+
}
}
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java Mon Mar 26 15:04:38 2012
@@ -252,25 +252,7 @@ public class TestSolrEntityProcessorEndT
assertQ(req("*:*"), "//result[@numFound='0']");
}
-
- public void testFullImportMultiThreaded() {
- assertQ(req("*:*"), "//result[@numFound='0']");
- int numDocs = 37;
- List<Map<String,Object>> docList = generateSolrDocuments(numDocs);
-
- try {
- addDocumentsToSolr(docList);
- Map<String,String> map = new HashMap<String,String>();
- map.put("rows", "50");
- runFullImport(generateDIHConfig("query='*:*' rows='6' numThreads='4'", jetty.getLocalPort()), map);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- fail(e.getMessage());
- }
- assertQ(req("*:*"), "//result[@numFound='" + numDocs + "']");
- }
-
private static List<Map<String,Object>> generateSolrDocuments(int num) {
List<Map<String,Object>> docList = new ArrayList<Map<String,Object>>();
for (int i = 1; i <= num; i++) {
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorUnit.java Mon Mar 26 15:04:38 2012
@@ -17,12 +17,6 @@
package org.apache.solr.handler.dataimport;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,59 +65,6 @@ public class TestSolrEntityProcessorUnit
assertNull(processor.nextRow());
}
- public void testMultiThread() throws Exception {
- int numThreads = 5;
- int numDocs = 40;
- List<Doc> docs = generateUniqueDocs(numDocs);
- final MockSolrEntityProcessor entityProcessor = new MockSolrEntityProcessor(docs, 25);
- ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
-
- final Map<String, Map<String, Object>> rowList = new LinkedHashMap<String, Map<String, Object>>();
- final CountDownLatch latch = new CountDownLatch(numThreads);
- final AtomicInteger errorCount = new AtomicInteger();
- for (int i = 0; i < numThreads; i++) {
- Runnable runnable = new Runnable() {
- public void run() {
- try {
- while (true) {
- synchronized (entityProcessor) {
- Map<String, Object> row = entityProcessor.nextRow();
- if (row == null) {
- break;
- }
- rowList.put(row.get(ID).toString(), row);
- }
- }
- } catch (Throwable t) {
- errorCount.incrementAndGet();
- LOG.error("Error in thread", t);
- } finally {
- latch.countDown();
- }
- }
- };
- executor.execute(runnable);
- }
-
- latch.await();
- assertEquals(0, errorCount.get());
- assertEquals(numDocs, rowList.size());
-
- for (Doc expectedDoc : docs) {
- String id = (String) expectedDoc.getFirstValue("id");
- Map<String, Object> row = rowList.get(id);
- assertNotNull(id + " shouldn't yield null", row);
- assertEquals(2, row.size());
- assertEquals(expectedDoc.id, row.get("id"));
- assertEquals(expectedDoc.getValues("description"), row.get("description"));
- rowList.remove(id);
- }
-
- assertEquals(0, rowList.size());
- executor.shutdown();
- }
-
private List<Doc> generateUniqueDocs(int numDocs) {
List<FldType> types = new ArrayList<FldType>();
types.add(new FldType(ID, ONE_ONE, new SVal('A', 'Z', 4, 40)));
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java Mon Mar 26 15:04:38 2012
@@ -69,27 +69,6 @@ public class TestSqlEntityProcessor2 ext
@Test
@SuppressWarnings("unchecked")
- public void testCompositePk_FullImport_MT() throws Exception {
- List parentRow = new ArrayList();
- parentRow.add(createMap("id", "1"));
- parentRow.add(createMap("id", "2"));
- MockDataSource.setIterator("select * from x", parentRow.iterator());
-
- List childRow = new ArrayList();
- childRow.add(createMap("desc", "hello"));
-
- MockDataSource.setIterator("select * from y where y.A=1", childRow.iterator());
- MockDataSource.setIterator("select * from y where y.A=2", childRow.iterator());
-
- runFullImport(dataConfig_2threads);
-
- assertQ(req("id:1"), "//*[@numFound='1']");
- assertQ(req("id:2"), "//*[@numFound='1']");
- assertQ(req("desc:hello"), "//*[@numFound='2']");
- }
-
- @Test
- @SuppressWarnings("unchecked")
public void testCompositePk_FullImportWithoutCommit() throws Exception {
List parentRow = new ArrayList();
parentRow.add(createMap("id", "10"));
@@ -250,15 +229,6 @@ public class TestSqlEntityProcessor2 ext
+ " </entity>\n" + " </entity>\n"
+ " </document>\n" + "</dataConfig>\n";
- private static String dataConfig_2threads = "<dataConfig><dataSource type=\"MockDataSource\"/>\n"
- + " <document>\n"
- + " <entity name=\"x\" pk=\"id\" query=\"select * from x\" threads=\"2\">\n"
- + " <field column=\"id\" />\n"
- + " <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
- + " <field column=\"desc\" />\n"
- + " </entity>\n" + " </entity>\n"
- + " </document>\n" + "</dataConfig>\n";
-
private static String dataConfig_deltaimportquery = "<dataConfig><dataSource type=\"MockDataSource\"/>\n"
+ " <document>\n"
+ " <entity name=\"x\" deltaImportQuery=\"select * from x where id=${dataimporter.delta.id}\" deltaQuery=\"select id from x where last_modified > NOW\">\n"
Modified: lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestVariableResolver.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestVariableResolver.java?rev=1305384&r1=1305383&r2=1305384&view=diff
==============================================================================
--- lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestVariableResolver.java (original)
+++ lucene/dev/trunk/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestVariableResolver.java Mon Mar 26 15:04:38 2012
@@ -83,41 +83,28 @@ public class TestVariableResolver extend
@Test
public void dateNamespaceWithValue() {
VariableResolverImpl vri = new VariableResolverImpl();
- ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(context);
- try {
- vri.addNamespace("dataimporter.functions", EvaluatorBag
- .getFunctionsNamespace(Collections.EMPTY_LIST, null));
- Map<String, Object> ns = new HashMap<String, Object>();
- Date d = new Date();
- ns.put("dt", d);
- vri.addNamespace("A", ns);
- assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
- vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
- } finally {
- Context.CURRENT_CONTEXT.remove();
- }
+ vri.addNamespace("dataimporter.functions", EvaluatorBag
+ .getFunctionsNamespace(Collections.EMPTY_LIST, null, vri));
+ Map<String, Object> ns = new HashMap<String, Object>();
+ Date d = new Date();
+ ns.put("dt", d);
+ vri.addNamespace("A", ns);
+ assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
+ vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
}
@Test
public void dateNamespaceWithExpr() throws Exception {
VariableResolverImpl vri = new VariableResolverImpl();
- ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(context);
- try {
- vri.addNamespace("dataimporter.functions", EvaluatorBag
- .getFunctionsNamespace(Collections.EMPTY_LIST,null));
-
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- format.setTimeZone(TimeZone.getTimeZone("UTC"));
- resetEvaluatorBagDateMathParser();
- DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
-
- String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
- assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
- } finally {
- Context.CURRENT_CONTEXT.remove();
- }
+ vri.addNamespace("dataimporter.functions", EvaluatorBag
+ .getFunctionsNamespace(Collections.EMPTY_LIST,null, vri));
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ format.setTimeZone(TimeZone.getTimeZone("UTC"));
+ resetEvaluatorBagDateMathParser();
+ DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
+
+ String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
+ assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
}
@Test
@@ -142,30 +129,25 @@ public class TestVariableResolver extend
public void testFunctionNamespace1() throws Exception {
VariableResolverImpl resolver = new VariableResolverImpl();
ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
- Context.CURRENT_CONTEXT.set(context);
- try {
- final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
- Map<String ,String > m = new HashMap<String, String>();
- m.put("name","test");
- m.put("class",E.class.getName());
- l.add(m);
-
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- format.setTimeZone(TimeZone.getTimeZone("UTC"));
- resetEvaluatorBagDateMathParser();
- DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
-
- resolver.addNamespace("dataimporter.functions", EvaluatorBag
- .getFunctionsNamespace(l,null));
- String s = resolver
- .replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
- assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
- .format(dmp.parseMath("/DAY")), s);
- assertEquals("Hello World", resolver
- .replaceTokens("${dataimporter.functions.test('TEST')}"));
- } finally {
- Context.CURRENT_CONTEXT.remove();
- }
+ final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
+ Map<String ,String > m = new HashMap<String, String>();
+ m.put("name","test");
+ m.put("class",E.class.getName());
+ l.add(m);
+
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+ format.setTimeZone(TimeZone.getTimeZone("UTC"));
+ resetEvaluatorBagDateMathParser();
+ DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
+
+ resolver.addNamespace("dataimporter.functions", EvaluatorBag
+ .getFunctionsNamespace(l,null, resolver));
+ String s = resolver
+ .replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
+ assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
+ .format(dmp.parseMath("/DAY")), s);
+ assertEquals("Hello World", resolver
+ .replaceTokens("${dataimporter.functions.test('TEST')}"));
}
public static class E extends Evaluator{