You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by no...@apache.org on 2010/01/12 08:49:41 UTC

svn commit: r898209 - in /lucene/solr/trunk/contrib/dataimporthandler/src: main/java/org/apache/solr/handler/dataimport/ test/java/org/apache/solr/handler/dataimport/

Author: noble
Date: Tue Jan 12 07:49:40 2010
New Revision: 898209

URL: http://svn.apache.org/viewvc?rev=898209&view=rev
Log:
SOLR-1352 . Multithreaded implementation

Added:
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedContext.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestThreaded.java
Modified:
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
    lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EvaluatorBag.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestDocBuilder.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEvaluatorBag.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java
    lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestVariableResolver.java

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Context.java Tue Jan 12 07:49:40 2010
@@ -216,9 +216,11 @@
 
   /** Resolve variables in a template
    * @param template
+   *
    * @return The string w/ variables resolved
    */
   public abstract String replaceTokens(String template);
 
+  static final ThreadLocal<Context> CURRENT_CONTEXT = new ThreadLocal<Context>();
 
 }

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ContextImpl.java Tue Jan 12 07:49:40 2010
@@ -20,9 +20,9 @@
 import org.apache.solr.core.SolrCore;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * <p>
@@ -122,23 +122,32 @@
   }
 
   public void setSessionAttribute(String name, Object val, String scope) {
+    if(name == null) return;
     if (Context.SCOPE_ENTITY.equals(scope)) {
       if (entitySession == null)
-        entitySession = new HashMap<String, Object>();
-      entitySession.put(name, val);
+        entitySession = new ConcurrentHashMap<String, Object>();
+
+      putVal(name, val,entitySession);
     } else if (Context.SCOPE_GLOBAL.equals(scope)) {
       if (globalSession != null) {
-        globalSession.put(name, val);
+        putVal(name, val,globalSession);
       }
     } else if (Context.SCOPE_DOC.equals(scope)) {
       DocBuilder.DocWrapper doc = getDocument();
       if (doc != null)
         doc.setSessionAttribute(name, val);
     } else if (SCOPE_SOLR_CORE.equals(scope)){
-      if(dataImporter != null) dataImporter.getCoreScopeSession().put(name, val);
+      if(dataImporter != null) {
+        putVal(name, val,dataImporter.getCoreScopeSession());
+      }
     }
   }
 
+  private void putVal(String name, Object val, Map map) {
+    if(val == null) map.remove(name);
+    else entitySession.put(name, val);
+  }
+
   public Object getSessionAttribute(String name, String scope) {
     if (Context.SCOPE_ENTITY.equals(scope)) {
       if (entitySession == null)

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataConfig.java Tue Jan 12 07:49:40 2010
@@ -56,6 +56,8 @@
 
   public Map<String, SchemaField> lowerNameVsSchemaField = new HashMap<String, SchemaField>();
 
+  boolean isMultiThreaded = false;
+
   public static class Document {
     // TODO - remove from here and add it to entity
     public String deleteQuery;

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java Tue Jan 12 07:49:40 2010
@@ -38,6 +38,7 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * <p> Stores all configuration information for pulling and indexing data. </p>
@@ -85,7 +86,7 @@
    * Only for testing purposes
    */
   DataImporter() {
-    coreScopeSession = new HashMap<String, Object>();
+    coreScopeSession = new ConcurrentHashMap<String, Object>();
   }
 
   DataImporter(String dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session) {
@@ -206,6 +207,10 @@
       // if in this chain no document root is found()
       e.isDocRoot = true;
     }
+    if (e.allAttributes.get("threads") != null) {
+      if(docRootFound) throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "'threads' not allowed below rootEntity ");
+      config.isMultiThreaded = true;      
+    }
 
     if (e.fields != null) {
       for (DataConfig.Field f : e.fields) {

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java Tue Jan 12 07:49:40 2010
@@ -20,6 +20,8 @@
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.core.SolrCore;
 import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
 import org.apache.solr.schema.SchemaField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +29,7 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.text.ParseException;
+import java.util.concurrent.*;
 
 /**
  * <p> DocBuilder is responsible for creating Solr documents out of the given configuration. It also maintains
@@ -61,7 +63,7 @@
 
   boolean verboseDebug = false;
 
-  private Map<String, Object> session = new HashMap<String, Object>();
+   Map<String, Object> session = new ConcurrentHashMap<String, Object>();
 
   static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
   Map<String, Object> functionsNamespace;
@@ -81,8 +83,9 @@
   public VariableResolverImpl getVariableResolver() {
     try {
       VariableResolverImpl resolver = null;
-      if(dataImporter != null && dataImporter.getCore() != null) resolver =  new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties());
-      else resolver = new VariableResolverImpl();
+      if(dataImporter != null && dataImporter.getCore() != null){
+        resolver =  new VariableResolverImpl(dataImporter.getCore().getResourceLoader().getCoreProperties());
+      } else resolver = new VariableResolverImpl();
       Map<String, Object> indexerNamespace = new HashMap<String, Object>();
       if (persistedProperties.getProperty(LAST_INDEX_TIME) != null) {
         indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.getProperty(LAST_INDEX_TIME));
@@ -106,7 +109,7 @@
       resolver.addNamespace(DataConfig.IMPORTER_NS, indexerNamespace);
       return resolver;
     } catch (Exception e) {
-      DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e);
+      wrapAndThrow(SEVERE, e);
       // unreachable statement
       return null;
     }
@@ -117,7 +120,7 @@
       EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
       notifyListener(listener);
     } catch (Exception e) {
-      DataImportHandlerException.wrapAndThrow(DataImportHandlerException.SEVERE, e, "Unable to load class : " + className);
+      wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
     }
   }
 
@@ -247,8 +250,17 @@
   @SuppressWarnings("unchecked")
   private void doFullDump() {
     addStatusMessage("Full Dump Started");
-    buildDocument(getVariableResolver(), null, null, root, true,
-            null);
+    if(dataImporter.getConfig().isMultiThreaded && !verboseDebug){
+      try {
+        LOG.info("running multithreaded full-import");
+        new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
+      } catch (Exception e) {
+        LOG.error("error in import", e);
+      }
+    } else {
+      buildDocument(getVariableResolver(), null, null, root, true, null);
+    }
+
   }
 
   @SuppressWarnings("unchecked")
@@ -269,7 +281,7 @@
     addStatusMessage("Deltas Obtained");
     addStatusMessage("Building documents");
     if (!deletedKeys.isEmpty()) {
-      allPks.removeAll(deletedKeys);      
+      allPks.removeAll(deletedKeys);
       deleteAll(deletedKeys);
       // Make sure that documents are not re-created
     }
@@ -298,21 +310,222 @@
     Iterator<Map<String, Object>> iter = deletedKeys.iterator();
     while (iter.hasNext()) {
       Map<String, Object> map = iter.next();
-      Object key = map.get(root.getPk()); 
+      Object key = map.get(root.getPk());
       if(key == null) {
         LOG.warn("no key was available for deleteted pk query");
         continue;
       }
       writer.deleteDoc(key);
-      importStatistics.deletedDocCount.incrementAndGet();      
+      importStatistics.deletedDocCount.incrementAndGet();
       iter.remove();
     }
   }
+  Executor executorSvc = new ThreadPoolExecutor(
+          0,
+          Integer.MAX_VALUE,
+          5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
+          new SynchronousQueue<Runnable>()  // directly hand off tasks
+  );
 
   @SuppressWarnings("unchecked")
   public void addStatusMessage(String msg) {
     statusMessages.put(msg, DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
   }
+  EntityRunner createRunner(DataConfig.Entity entity, EntityRunner parent){
+    return new EntityRunner(entity, parent);
+  }
+
+  /**This class is a just a structure to hold runtime information of one entity
+   *
+   */
+  class EntityRunner {
+    final DataConfig.Entity entity;
+    private EntityProcessor entityProcessor;
+    private final List<ThreadedEntityProcessorWrapper> entityProcessorWrapper = new ArrayList<ThreadedEntityProcessorWrapper>();
+    private DocWrapper docWrapper;
+    private volatile boolean entityInitialized ;
+    String currentProcess;
+    ThreadLocal<ThreadedEntityProcessorWrapper> currentEntityProcWrapper = new ThreadLocal<ThreadedEntityProcessorWrapper>();
+
+    private ContextImpl context;
+    EntityRunner parent;
+    AtomicBoolean entityEnded = new AtomicBoolean(false);
+    private Exception exception;
+
+    public EntityRunner(DataConfig.Entity entity, EntityRunner parent) {
+      this.parent = parent;
+      this.entity = entity;
+      if (entity.proc == null) {
+        entityProcessor = new SqlEntityProcessor();
+      } else {
+        try {
+          entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
+                  .newInstance();
+        } catch (Exception e) {
+          wrapAndThrow(SEVERE, e,
+                  "Unable to load EntityProcessor implementation for entity:" + entity.name);
+        } 
+      }
+      int threads = 1;
+      if (entity.allAttributes.get("threads") != null) {
+        threads = Integer.parseInt(entity.allAttributes.get("threads"));
+      }
+      for (int i = 0; i < threads; i++) {
+        entityProcessorWrapper.add(new ThreadedEntityProcessorWrapper(entityProcessor, DocBuilder.this, this, getVariableResolver()));
+      }
+      context = new ThreadedContext(this, DocBuilder.this);
+    }
+
+
+    public void run(DocWrapper docWrapper, final String currProcess, final EntityRow rows) throws Exception {
+      entityInitialized =  false;
+      this.docWrapper = docWrapper;
+      this.currentProcess = currProcess;
+      entityEnded.set(false);
+      try {
+        if(entityProcessorWrapper.size() <= 1){
+          runAThread(entityProcessorWrapper.get(0), rows, currProcess);
+        } else {
+          final CountDownLatch latch = new CountDownLatch(entityProcessorWrapper.size());
+          for (final ThreadedEntityProcessorWrapper processorWrapper : entityProcessorWrapper) {
+            Runnable runnable = new Runnable() {
+              public void run() {
+                try {
+                  runAThread(processorWrapper, rows, currProcess);
+                }catch(Exception e) {
+                  entityEnded.set(true);
+                  exception = e;
+                } finally {
+                  latch.countDown();
+                } 
+              }
+            };
+            executorSvc.execute(runnable);
+          }          
+          try {
+            latch.await();
+          } catch (InterruptedException e) {
+            //TODO
+          }
+          Exception copy = exception;
+          if(copy != null){
+            exception = null;
+            throw copy;
+          }
+        }
+      } finally {
+        entityProcessor.destroy();
+      }
+
+
+    }
+
+    private void runAThread(ThreadedEntityProcessorWrapper epw, EntityRow rows, String currProcess) throws Exception {
+      currentEntityProcWrapper.set(epw);
+      epw.threadedInit(context);
+      initEntity();
+      try {
+        epw.init(rows);
+        DocWrapper docWrapper = this.docWrapper;
+        Context.CURRENT_CONTEXT.set(context);
+        for (; ;) {
+          try {
+            Map<String, Object> arow = epw.nextRow();
+            if (arow == null) {
+              break;
+            } else {
+              importStatistics.rowsCount.incrementAndGet();
+              if (docWrapper == null && entity.isDocRoot) {
+                docWrapper = new DocWrapper();
+                context.setDoc(docWrapper);
+                DataConfig.Entity e = entity.parentEntity;
+                for (EntityRow row = rows;  row != null&& e !=null; row = row.tail,e=e.parentEntity) {
+                    addFields(e, docWrapper, row.row, epw.resolver);
+                }
+              }
+              if (docWrapper != null) {
+                handleSpecialCommands(arow, docWrapper);
+                addFields(entity, docWrapper, arow, epw.resolver);
+              }
+              if (entity.entities != null) {
+                EntityRow nextRow = new EntityRow(arow, rows, entity.name);
+                for (DataConfig.Entity e : entity.entities) {
+                  epw.children.get(e).run(docWrapper,currProcess,nextRow);
+                }
+              }
+            }
+            if (entity.isDocRoot) {
+              LOG.info("a row on docroot" + docWrapper);
+              if (!docWrapper.isEmpty()) {
+                LOG.info("adding a doc "+docWrapper);
+                boolean result = writer.upload(docWrapper);
+                docWrapper = null;
+                if (result){
+                  importStatistics.docCount.incrementAndGet();
+                } else {
+                  importStatistics.failedDocCount.incrementAndGet();
+                }
+              }
+            }
+          } catch (DataImportHandlerException dihe) {
+            exception = dihe;
+            if(dihe.getErrCode() == SKIP_ROW || dihe.getErrCode() == SKIP) {
+              importStatistics.skipDocCount.getAndIncrement();
+              exception = null;//should not propogate up
+              continue;
+            }
+            if (entity.isDocRoot) {
+              if (dihe.getErrCode() == DataImportHandlerException.SKIP) {
+                importStatistics.skipDocCount.getAndIncrement();
+                exception = null;//should not propogate up
+              } else {
+                LOG.error("Exception while processing: "
+                        + entity.name + " document : " + docWrapper, dihe);
+              }
+              if (dihe.getErrCode() == DataImportHandlerException.SEVERE)
+                throw dihe;
+            } else {
+              //if this is not the docRoot then the execution has happened in the same thread. so propogate up,
+              // it will be handled at the docroot
+              entityEnded.set(true); 
+              throw dihe;
+            }
+            entityEnded.set(true);
+          }
+        }
+      } finally {
+        epw.destroy();
+        currentEntityProcWrapper.remove();
+        Context.CURRENT_CONTEXT.remove();
+      }
+    }
+
+    private void initEntity() {
+      if (!entityInitialized) {
+        synchronized (this) {
+          if (!entityInitialized) {
+            entityProcessor.init(context);
+            entityInitialized = true;
+          }
+        }
+      }
+    }
+  }
+
+  /**A reverse linked list .
+   *
+   */
+  static class EntityRow {
+    final Map<String, Object> row;
+    final EntityRow tail;
+    final String name;
+
+    EntityRow(Map<String, Object> row, EntityRow tail, String name) {
+      this.row = row;
+      this.tail = tail;
+      this.name = name;
+    }
+  }
 
   @SuppressWarnings("unchecked")
   private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
@@ -325,7 +538,8 @@
             pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
             session, parentCtx, this);
     entityProcessor.init(ctx);
-
+    Context.CURRENT_CONTEXT.set(ctx);
+    
     if (requestParameters.start > 0) {
       writer.log(SolrWriter.DISABLE_LOGGING, null, null);
     }
@@ -392,6 +606,10 @@
             }
             vr.removeNamespace(entity.name);
           }
+          /*The child entities would have changed the CURRENT_CONTEXT. So when they are done, set it back to the old.
+           *
+           */
+          Context.CURRENT_CONTEXT.set(ctx);
 
           if (entity.isDocRoot) {
             if (stop.get())
@@ -402,7 +620,7 @@
               if (result){
                 importStatistics.docCount.incrementAndGet();
               } else {
-                importStatistics.failedDocCount.incrementAndGet(); 
+                importStatistics.failedDocCount.incrementAndGet();
               }
             }
           }
@@ -435,6 +653,7 @@
             writer.log(SolrWriter.ROW_END, entity.name, null);
             if (entity.isDocRoot)
               writer.log(SolrWriter.END_DOC, null, null);
+            Context.CURRENT_CONTEXT.remove();
           }
         }
       }
@@ -573,7 +792,7 @@
   private EntityProcessorWrapper getEntityProcessor(DataConfig.Entity entity) {
     if (entity.processor != null)
       return entity.processor;
-    EntityProcessor entityProcessor;
+    EntityProcessor entityProcessor = null;
     if (entity.proc == null) {
       entityProcessor = new SqlEntityProcessor();
     } else {
@@ -581,9 +800,8 @@
         entityProcessor = (EntityProcessor) loadClass(entity.proc, dataImporter.getCore())
                 .newInstance();
       } catch (Exception e) {
-        throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
-                "Unable to load EntityProcessor implementation for entity:"
-                        + entity.name, e);
+        wrapAndThrow (SEVERE,e,
+                "Unable to load EntityProcessor implementation for entity:" + entity.name);
       }
     }
     return entity.processor = new EntityProcessorWrapper(entityProcessor, this);

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java Tue Jan 12 07:49:40 2010
@@ -37,20 +37,18 @@
 public class EntityProcessorWrapper extends EntityProcessor {
   private static final Logger log = LoggerFactory.getLogger(EntityProcessorWrapper.class);
 
-  private EntityProcessor delegate;
+  EntityProcessor delegate;
   private DocBuilder docBuilder;
 
-  private String onError;
-  private Context context;
-  private VariableResolverImpl resolver;
-  private String entityName;
+  String onError;
+  protected Context context;
+  protected VariableResolverImpl resolver;
+  String entityName;
 
   protected List<Transformer> transformers;
 
   protected List<Map<String, Object>> rowcache;
 
-  private  Context contextCopy;
-
   public EntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder) {
     this.delegate = delegate;
     this.docBuilder = docBuilder;
@@ -61,8 +59,6 @@
     this.context = context;
     resolver = (VariableResolverImpl) context.getVariableResolver();
     //context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
-    contextCopy = resolver.context;
-    resolver.context = context;
     if (entityName == null) {
       onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
       if (onError == null) onError = ABORT;
@@ -276,8 +272,6 @@
 
   public void destroy() {
     delegate.destroy();
-    resolver.context = contextCopy;
-    contextCopy = null;
   }
 
   public VariableResolverImpl getVariableResolver() {

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EvaluatorBag.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EvaluatorBag.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EvaluatorBag.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EvaluatorBag.java Tue Jan 12 07:49:40 2010
@@ -212,8 +212,7 @@
         if (evaluator == null)
           return null;
         VariableResolverImpl vri = VariableResolverImpl.CURRENT_VARIABLE_RESOLVER.get();
-        Context ctx = vri == null ? null : vri.context;
-        return evaluator.evaluate(m.group(2), ctx);
+        return evaluator.evaluate(m.group(2), Context.CURRENT_CONTEXT.get());
       }
 
     };

Added: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedContext.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedContext.java?rev=898209&view=auto
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedContext.java (added)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedContext.java Tue Jan 12 07:49:40 2010
@@ -0,0 +1,76 @@
+package org.apache.solr.handler.dataimport;
+
+
+
+public class ThreadedContext extends ContextImpl{
+  private DocBuilder.EntityRunner entityRunner;
+  private boolean limitedContext = false;
+
+  public ThreadedContext(DocBuilder.EntityRunner entityRunner, DocBuilder docBuilder) {
+    super(entityRunner.entity,
+            null,//to be fethed realtime
+            null,
+            null,
+            docBuilder.session,
+            null,
+            docBuilder);
+    this.entityRunner = entityRunner;
+  }
+
+  @Override
+  public VariableResolver getVariableResolver() {
+    checkLimited();
+    return entityRunner.currentEntityProcWrapper.get().resolver;
+  }
+
+  @Override
+  public Context getParentContext() {
+    ThreadedContext ctx = new ThreadedContext(entityRunner.parent, docBuilder);
+    ctx.limitedContext =  true;
+    return ctx;
+  }
+
+  @Override
+  public String currentProcess() {
+    return entityRunner.currentProcess;
+  }
+
+  @Override
+  public EntityProcessor getEntityProcessor() {
+    return entityRunner.currentEntityProcWrapper.get().delegate;    
+  }
+
+  @Override
+  public DataSource getDataSource() {
+    checkLimited();
+    return super.getDataSource();    
+  }
+
+
+
+  private void checkLimited() {
+    if(limitedContext) throw new RuntimeException("parentContext does not support this method");
+  }
+
+  @Override
+  public String getResolvedEntityAttribute(String name) {
+    checkLimited();
+    return super.getResolvedEntityAttribute(name);
+  }
+
+  @Override
+  public void setSessionAttribute(String name, Object val, String scope) {
+    checkLimited();
+    super.setSessionAttribute(name, val, scope);
+  }
+
+  @Override
+  public Object resolve(String var) {
+    return getVariableResolver().resolve(var);
+  }
+
+  @Override
+  public String replaceTokens(String template) {
+    return getVariableResolver().replaceTokens(template);    
+  }
+}

Added: lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java?rev=898209&view=auto
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java (added)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/ThreadedEntityProcessorWrapper.java Tue Jan 12 07:49:40 2010
@@ -0,0 +1,98 @@
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.ON_ERROR;
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.ABORT;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+
+/**
+ * Each Entity may have only a single EntityProcessor .  But the same entity can be run by
+ * multiple EntityProcessorWrapper (1 per thread) . thhis helps running transformations in multiple threads
+ */
+
+public class ThreadedEntityProcessorWrapper extends EntityProcessorWrapper {
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadedEntityProcessorWrapper.class);
+
+  DocBuilder.EntityRunner entityRunner;
+  /**For each child entity there is one EntityRunner
+   */
+  Map<DataConfig.Entity ,DocBuilder.EntityRunner> children;
+
+  public ThreadedEntityProcessorWrapper(EntityProcessor delegate, DocBuilder docBuilder,
+                                  DocBuilder.EntityRunner entityRunner,
+                                  VariableResolverImpl resolver) {
+    super(delegate, docBuilder);
+    this.entityRunner = entityRunner;
+    this.resolver = resolver;
+    if (entityRunner.entity.entities == null) {
+      children = Collections.emptyMap();
+    } else {
+      children = new HashMap<DataConfig.Entity, DocBuilder.EntityRunner>(entityRunner.entity.entities.size());
+      for (DataConfig.Entity e : entityRunner.entity.entities) {
+        DocBuilder.EntityRunner runner = docBuilder.createRunner(e, entityRunner);
+        children.put(e, runner);
+      }
+    }
+
+  }
+
+  void threadedInit(Context context){
+    rowcache = null;
+    this.context = context;
+    resolver = (VariableResolverImpl) context.getVariableResolver();
+    //context has to be set correctly . keep the copy of the old one so that it can be restored in destroy
+    if (entityName == null) {
+      onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
+      if (onError == null) onError = ABORT;
+      entityName = context.getEntityAttribute(DataConfig.NAME);
+    }    
+  }
+
+  @Override
+  public Map<String, Object> nextRow() {
+    if (rowcache != null) {
+      return getFromRowCache();
+    }
+    while (true) {
+      Map<String, Object> arow = null;
+      synchronized (delegate) {
+        if(entityRunner.entityEnded.get()) return null;
+        try {
+          arow = delegate.nextRow();
+        } catch (Exception e) {
+          if (ABORT.equals(onError)) {
+            wrapAndThrow(SEVERE, e);
+          } else {
+            //SKIP is not really possible. If this calls the nextRow() again the Entityprocessor would be in an inconisttent state
+            LOG.error("Exception in entity : " + entityName, e);
+            return null;
+          }
+        }
+        LOG.info("arow : "+arow);
+        if(arow == null) entityRunner.entityEnded.set(true);
+      }
+      if (arow == null) {
+        return null;
+      } else {
+        arow = applyTransformer(arow);
+        if (arow != null) {
+          delegate.postTransform(arow);
+          return arow;
+        }
+      }
+    } 
+  }
+
+  public void init(DocBuilder.EntityRow rows) {
+    for (DocBuilder.EntityRow row = rows; row != null; row = row.tail) resolver.addNamespace(row.name, row.row);
+  }
+
+
+ 
+}

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestDocBuilder.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestDocBuilder.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestDocBuilder.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestDocBuilder.java Tue Jan 12 07:49:40 2010
@@ -239,6 +239,7 @@
   }
 
   public static final String dc_singleEntity = "<dataConfig>\n"
+          + "<dataSource  type=\"MockDataSource\"/>\n"
           + "    <document name=\"X\" >\n"
           + "        <entity name=\"x\" query=\"select * from x\">\n"
           + "          <field column=\"id\"/>\n"
@@ -247,6 +248,7 @@
           + "    </document>\n" + "</dataConfig>";
 
   public static final String dc_deltaConfig = "<dataConfig>\n"
+          + "<dataSource  type=\"MockDataSource\"/>\n"          
           + "    <document name=\"X\" >\n"
           + "        <entity name=\"x\" query=\"select * from x\" deltaQuery=\"select id from x\">\n"
           + "          <field column=\"id\"/>\n"

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEvaluatorBag.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEvaluatorBag.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEvaluatorBag.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestEvaluatorBag.java Tue Jan 12 07:49:40 2010
@@ -98,15 +98,19 @@
   public void testEscapeSolrQueryFunction() {
     final VariableResolverImpl resolver = new VariableResolverImpl();
     ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
-    resolver.context = context;
-    Map m= new HashMap();
-    m.put("query","c:t");
-    resolver.addNamespace("dataimporter.functions", EvaluatorBag
-            .getFunctionsNamespace(Collections.EMPTY_LIST, null));
-    resolver.addNamespace("e",m);
-    String s = resolver
-            .replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
-    org.junit.Assert.assertEquals("c\\:t", s);
+    Context.CURRENT_CONTEXT.set(context);
+    try {
+      Map m= new HashMap();
+      m.put("query","c:t");
+      resolver.addNamespace("dataimporter.functions", EvaluatorBag
+              .getFunctionsNamespace(Collections.EMPTY_LIST, null));
+      resolver.addNamespace("e",m);
+      String s = resolver
+              .replaceTokens("${dataimporter.functions.escapeQueryChars(e.query)}");
+      org.junit.Assert.assertEquals("c\\:t", s);
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
+    }
   }
 
   /**
@@ -115,31 +119,39 @@
   @Test
   public void testGetDateFormatEvaluator() {
     Evaluator dateFormatEval = EvaluatorBag.getDateFormatEvaluator();
-    resolver.context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
-
-    long time = System.currentTimeMillis();
-    assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time - 2*86400*1000)),
-            dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", resolver.context));
-
-    Map<String, Object> map = new HashMap<String, Object>();
-    map.put("key", new Date(time));
-    resolver.addNamespace("A", map);
-
-    assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time)),
-            dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", resolver.context));
+    ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
+    Context.CURRENT_CONTEXT.set(context);
+    try {
+      long time = System.currentTimeMillis();
+      assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time - 2*86400*1000)),
+              dateFormatEval.evaluate("'NOW-2DAYS','yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
+
+      Map<String, Object> map = new HashMap<String, Object>();
+      map.put("key", new Date(time));
+      resolver.addNamespace("A", map);
+
+      assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(time)),
+              dateFormatEval.evaluate("A.key, 'yyyy-MM-dd HH:mm'", Context.CURRENT_CONTEXT.get()));
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
+    }
   }
 
   private void runTests(Map<String, String> tests, Evaluator evaluator) {
     ContextImpl ctx = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
-    resolver.context = ctx;
-    for (Map.Entry<String, String> entry : tests.entrySet()) {
-      Map<String, Object> values = new HashMap<String, Object>();
-      values.put("key", entry.getKey());
-      resolver.addNamespace("A", values);
-
-      String expected = (String) entry.getValue();
-      String actual = evaluator.evaluate("A.key", ctx);
-      assertEquals(expected, actual);
+    Context.CURRENT_CONTEXT.set(ctx);
+    try {
+      for (Map.Entry<String, String> entry : tests.entrySet()) {
+        Map<String, Object> values = new HashMap<String, Object>();
+        values.put("key", entry.getKey());
+        resolver.addNamespace("A", values);
+
+        String expected = (String) entry.getValue();
+        String actual = evaluator.evaluate("A.key", ctx);
+        assertEquals(expected, actual);
+      }
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
     }
   }
 }

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestSqlEntityProcessor2.java Tue Jan 12 07:49:40 2010
@@ -71,6 +71,26 @@
     assertQ(req("id:1"), "//*[@numFound='1']");
     assertQ(req("desc:hello"), "//*[@numFound='1']");
   }
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCompositePk_FullImport_MT() throws Exception {
+    List parentRow = new ArrayList();
+    parentRow.add(createMap("id", "1"));
+    parentRow.add(createMap("id", "2"));
+    MockDataSource.setIterator("select * from x", parentRow.iterator());
+
+    List childRow = new ArrayList();
+    childRow.add(createMap("desc", "hello"));
+
+    MockDataSource.setIterator("select * from y where y.A=1", childRow.iterator());
+    MockDataSource.setIterator("select * from y where y.A=2", childRow.iterator());
+
+    super.runFullImport(dataConfig_2threads);
+
+    assertQ(req("id:1"), "//*[@numFound='1']");
+    assertQ(req("id:2"), "//*[@numFound='1']");
+    assertQ(req("desc:hello"), "//*[@numFound='2']");
+  }
 
   @Test
   @SuppressWarnings("unchecked")
@@ -234,6 +254,15 @@
           + "                       </entity>\n" + "               </entity>\n"
           + "       </document>\n" + "</dataConfig>\n";
 
+  private static String dataConfig_2threads = "<dataConfig><dataSource  type=\"MockDataSource\"/>\n"
+          + "       <document>\n"
+          + "               <entity name=\"x\" pk=\"id\" query=\"select * from x\" threads=\"2\">\n"
+          + "                       <field column=\"id\" />\n"
+          + "                       <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
+          + "                               <field column=\"desc\" />\n"
+          + "                       </entity>\n" + "               </entity>\n"
+          + "       </document>\n" + "</dataConfig>\n";
+
   private static String dataConfig_deltaimportquery = "<dataConfig><dataSource  type=\"MockDataSource\"/>\n"
           + "       <document>\n"
           + "               <entity name=\"x\" deltaImportQuery=\"select * from x where id=${dataimporter.delta.id}\" deltaQuery=\"select id from x where last_modified > NOW\">\n"

Added: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestThreaded.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestThreaded.java?rev=898209&view=auto
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestThreaded.java (added)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestThreaded.java Tue Jan 12 07:49:40 2010
@@ -0,0 +1,56 @@
+package org.apache.solr.handler.dataimport;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+
+public class TestThreaded extends AbstractDataImportHandlerTest {
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCompositePk_FullImport() throws Exception {
+    List parentRow = new ArrayList();
+//    parentRow.add(createMap("id", "1"));
+    parentRow.add(createMap("id", "2"));
+    parentRow.add(createMap("id", "3"));
+    parentRow.add(createMap("id", "4"));
+    parentRow.add(createMap("id", "1"));
+    MockDataSource.setIterator("select * from x", parentRow.iterator());
+
+    List childRow = new ArrayList();
+    Map map = createMap("desc", "hello");
+    childRow.add(map);
+
+    MockDataSource.setIterator("select * from y where y.A=1", childRow.iterator());
+    MockDataSource.setIterator("select * from y where y.A=2", childRow.iterator());
+    MockDataSource.setIterator("select * from y where y.A=3", childRow.iterator());
+    MockDataSource.setIterator("select * from y where y.A=4", childRow.iterator());
+
+    super.runFullImport(dataConfig);
+
+    assertQ(req("id:1"), "//*[@numFound='1']");
+    assertQ(req("*:*"), "//*[@numFound='4']");
+    assertQ(req("desc:hello"), "//*[@numFound='4']");
+  }
+
+    @Override
+  public String getSchemaFile() {
+    return "dataimport-schema.xml";
+  }
+
+  @Override
+  public String getSolrConfigFile() {
+    return "dataimport-solrconfig.xml";
+  }
+   private static String dataConfig = "<dataConfig>\n"
+          +"<dataSource  type=\"MockDataSource\"/>\n"
+          + "       <document>\n"
+          + "               <entity name=\"x\" threads=\"2\" query=\"select * from x\" deletedPkQuery=\"select id from x where last_modified > NOW AND deleted='true'\" deltaQuery=\"select id from x where last_modified > NOW\">\n"
+          + "                       <field column=\"id\" />\n"
+          + "                       <entity name=\"y\" query=\"select * from y where y.A=${x.id}\">\n"
+          + "                               <field column=\"desc\" />\n"
+          + "                       </entity>\n" + "               </entity>\n"
+          + "       </document>\n" + "</dataConfig>";
+}

Modified: lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestVariableResolver.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestVariableResolver.java?rev=898209&r1=898208&r2=898209&view=diff
==============================================================================
--- lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestVariableResolver.java (original)
+++ lucene/solr/trunk/contrib/dataimporthandler/src/test/java/org/apache/solr/handler/dataimport/TestVariableResolver.java Tue Jan 12 07:49:40 2010
@@ -84,30 +84,40 @@
   @Test
   public void dateNamespaceWithValue() {
     VariableResolverImpl vri = new VariableResolverImpl();
-    vri.context = new ContextImpl(null,vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
-    vri.addNamespace("dataimporter.functions", EvaluatorBag
-            .getFunctionsNamespace(Collections.EMPTY_LIST, null));
-    Map<String, Object> ns = new HashMap<String, Object>();
-    Date d = new Date();
-    ns.put("dt", d);
-    vri.addNamespace("A", ns);
-    Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
-                    vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
+    ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
+    Context.CURRENT_CONTEXT.set(context);
+    try {
+      vri.addNamespace("dataimporter.functions", EvaluatorBag
+              .getFunctionsNamespace(Collections.EMPTY_LIST, null));
+      Map<String, Object> ns = new HashMap<String, Object>();
+      Date d = new Date();
+      ns.put("dt", d);
+      vri.addNamespace("A", ns);
+      Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(d),
+                      vri.replaceTokens("${dataimporter.functions.formatDate(A.dt,'yyyy-MM-dd HH:mm:ss')}"));
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
+    }
   }
 
   @Test
   public void dateNamespaceWithExpr() throws Exception {
     VariableResolverImpl vri = new VariableResolverImpl();
-    vri.context = new ContextImpl(null,vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
-    vri.addNamespace("dataimporter.functions", EvaluatorBag
-            .getFunctionsNamespace(Collections.EMPTY_LIST,null));
-
-    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-    format.setTimeZone(TimeZone.getTimeZone("UTC"));
-    DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
-
-    String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
-    Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
+    ContextImpl context = new ContextImpl(null, vri, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
+    Context.CURRENT_CONTEXT.set(context);
+    try {
+      vri.addNamespace("dataimporter.functions", EvaluatorBag
+              .getFunctionsNamespace(Collections.EMPTY_LIST,null));
+
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+      format.setTimeZone(TimeZone.getTimeZone("UTC"));
+      DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
+
+      String s = vri.replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
+      Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm").format(dmp.parseMath("/DAY")), s);
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
+    }
   }
 
   @Test
@@ -130,26 +140,31 @@
 
   @Test
   public void testFunctionNamespace1() throws Exception {
-    final VariableResolverImpl resolver = new VariableResolverImpl();
-    resolver.context = new ContextImpl(null,resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null,null);
-    final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
-    Map<String ,String > m = new HashMap<String, String>();
-    m.put("name","test");
-    m.put("class",E.class.getName());
-    l.add(m);
-
-    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-    format.setTimeZone(TimeZone.getTimeZone("UTC"));
-    DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
-
-    resolver.addNamespace("dataimporter.functions", EvaluatorBag
-            .getFunctionsNamespace(l,null));
-    String s = resolver
-            .replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
-    Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
-            .format(dmp.parseMath("/DAY")), s);
-    Assert.assertEquals("Hello World", resolver
-            .replaceTokens("${dataimporter.functions.test('TEST')}"));
+    VariableResolverImpl resolver = new VariableResolverImpl();
+    ContextImpl context = new ContextImpl(null, resolver, null, Context.FULL_DUMP, Collections.EMPTY_MAP, null, null);
+    Context.CURRENT_CONTEXT.set(context);
+    try {
+      final List<Map<String ,String >> l = new ArrayList<Map<String, String>>();
+      Map<String ,String > m = new HashMap<String, String>();
+      m.put("name","test");
+      m.put("class",E.class.getName());
+      l.add(m);
+
+      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+      format.setTimeZone(TimeZone.getTimeZone("UTC"));
+      DateMathParser dmp = new DateMathParser(TimeZone.getDefault(), Locale.getDefault());
+
+      resolver.addNamespace("dataimporter.functions", EvaluatorBag
+              .getFunctionsNamespace(l,null));
+      String s = resolver
+              .replaceTokens("${dataimporter.functions.formatDate('NOW/DAY','yyyy-MM-dd HH:mm')}");
+      Assert.assertEquals(new SimpleDateFormat("yyyy-MM-dd HH:mm")
+              .format(dmp.parseMath("/DAY")), s);
+      Assert.assertEquals("Hello World", resolver
+              .replaceTokens("${dataimporter.functions.test('TEST')}"));
+    } finally {
+      Context.CURRENT_CONTEXT.remove();
+    }
   }
 
   public static class E extends Evaluator{