Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:37 UTC

[16/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras module

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
new file mode 100644
index 0000000..d610d66
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.EmptyEntityResolver;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.util.SystemIdResolver;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.XMLErrorLogger;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
+import org.apache.solr.handler.dataimport.config.DIHConfiguration;
+import org.apache.solr.handler.dataimport.config.Entity;
+import org.apache.solr.handler.dataimport.config.PropertyWriter;
+import org.apache.solr.handler.dataimport.config.Script;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DocBuilder.loadClass;
+import static org.apache.solr.handler.dataimport.config.ConfigNameConstants.CLASS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.apache.commons.io.IOUtils;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <p> Stores all configuration information for pulling and indexing data. </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DataImporter {
+
+  public enum Status {
+    IDLE, RUNNING_FULL_DUMP, RUNNING_DELTA_DUMP, JOB_FAILED
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final XMLErrorLogger XMLLOG = new XMLErrorLogger(log);
+
+  private Status status = Status.IDLE;
+  private DIHConfiguration config;
+  private Date indexStartTime;
+  private Properties store = new Properties();
+  private Map<String, Map<String,String>> requestLevelDataSourceProps = new HashMap<>();
+  private IndexSchema schema;
+  public DocBuilder docBuilder;
+  public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
+  private SolrCore core;  
+  private Map<String, Object> coreScopeSession = new ConcurrentHashMap<>();
+  private ReentrantLock importLock = new ReentrantLock();
+  private boolean isDeltaImportSupported = false;  
+  private final String handlerName;  
+
+  /**
+   * Only for testing purposes
+   */
+  DataImporter() {
+    this.handlerName = "dataimport" ;
+  }
+  
+  DataImporter(SolrCore core, String handlerName) {
+    this.handlerName = handlerName;
+    this.core = core;
+    this.schema = core.getLatestSchema();
+  }
+  
+  
+
+  
+  boolean maybeReloadConfiguration(RequestInfo params,
+      NamedList<?> defaultParams) throws IOException {
+    if (importLock.tryLock()) {
+      boolean success = false;
+      try {        
+        if (null != params.getRequest()) {
+          if (schema != params.getRequest().getSchema()) {
+            schema = params.getRequest().getSchema();
+          }
+        }
+        String dataConfigText = params.getDataConfig();
+        String dataconfigFile = params.getConfigFile();        
+        InputSource is = null;
+        if(dataConfigText!=null && dataConfigText.length()>0) {
+          is = new InputSource(new StringReader(dataConfigText));
+        } else if(dataconfigFile!=null) {
+          is = new InputSource(core.getResourceLoader().openResource(dataconfigFile));
+          is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(dataconfigFile));
+          log.info("Loading DIH Configuration: " + dataconfigFile);
+        }
+        if(is!=null) {          
+          config = loadDataConfig(is);
+          success = true;
+        }      
+        
+        Map<String,Map<String,String>> dsProps = new HashMap<>();
+        if(defaultParams!=null) {
+          int position = 0;
+          while (position < defaultParams.size()) {
+            if (defaultParams.getName(position) == null) {
+              break;
+            }
+            String name = defaultParams.getName(position);            
+            if (name.equals("datasource")) {
+              success = true;
+              NamedList dsConfig = (NamedList) defaultParams.getVal(position);
+              log.info("Getting configuration for Global Datasource...");
+              Map<String,String> props = new HashMap<>();
+              for (int i = 0; i < dsConfig.size(); i++) {
+                props.put(dsConfig.getName(i), dsConfig.getVal(i).toString());
+              }
+              log.info("Adding properties to datasource: " + props);
+              dsProps.put((String) dsConfig.get("name"), props);
+            }
+            position++;
+          }
+        }
+        requestLevelDataSourceProps = Collections.unmodifiableMap(dsProps);
+      } catch(IOException ioe) {
+        throw ioe;
+      } finally {
+        importLock.unlock();
+      }
+      return success;
+    } else {
+      return false;
+    }
+  }
+  
+  
+  
+  public String getHandlerName() {
+    return handlerName;
+  }
+
+  public IndexSchema getSchema() {
+    return schema;
+  }
+
+  /**
+   * Used by tests
+   */
+  void loadAndInit(String configStr) {
+    config = loadDataConfig(new InputSource(new StringReader(configStr)));
+  }
+
+  void loadAndInit(InputSource configFile) {
+    config = loadDataConfig(configFile);
+  }
+
+  public DIHConfiguration loadDataConfig(InputSource configFile) {
+
+    DIHConfiguration dihcfg = null;
+    try {
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+      dbf.setValidating(false);
+      
+      // only enable XInclude if the XML is coming from a safe source (local file)
+      // and a SolrCore and SystemId are present (makes no sense otherwise):
+      if (core != null && configFile.getSystemId() != null) {
+        try {
+          dbf.setXIncludeAware(true);
+          dbf.setNamespaceAware(true);
+        } catch( UnsupportedOperationException e ) {
+          log.warn( "XML parser doesn't support XInclude option" );
+        }
+      }
+      
+      DocumentBuilder builder = dbf.newDocumentBuilder();
+      // only enable XInclude / external entities if the XML is coming from
+      // a safe source (local file) and a SolrCore and SystemId are present:
+      if (core != null && configFile.getSystemId() != null) {
+        builder.setEntityResolver(new SystemIdResolver(core.getResourceLoader()));
+      } else {
+        // Don't allow external entities without having a system ID:
+        builder.setEntityResolver(EmptyEntityResolver.SAX_INSTANCE);
+      }
+      builder.setErrorHandler(XMLLOG);
+      Document document;
+      try {
+        document = builder.parse(configFile);
+      } finally {
+        // some XML parsers are broken and don't close the byte stream (but they should according to spec)
+        IOUtils.closeQuietly(configFile.getByteStream());
+      }
+
+      dihcfg = readFromXml(document);
+      log.info("Data Configuration loaded successfully");
+    } catch (Exception e) {
+      throw new DataImportHandlerException(SEVERE,
+              "Data Config problem: " + e.getMessage(), e);
+    }
+    for (Entity e : dihcfg.getEntities()) {
+      if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
+        isDeltaImportSupported = true;
+        break;
+      }
+    }
+    return dihcfg;
+  }
+  
+  public DIHConfiguration readFromXml(Document xmlDocument) {
+    DIHConfiguration config;
+    List<Map<String, String >> functions = new ArrayList<>();
+    Script script = null;
+    Map<String, Map<String,String>> dataSources = new HashMap<>();
+    
+    NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
+    if(dataConfigTags == null || dataConfigTags.getLength() == 0) {
+      throw new DataImportHandlerException(SEVERE, "the root node '<dataConfig>' is missing");
+    }
+    Element e = (Element) dataConfigTags.item(0);
+    List<Element> documentTags = ConfigParseUtil.getChildNodes(e, "document");
+    if (documentTags.isEmpty()) {
+      throw new DataImportHandlerException(SEVERE, "DataImportHandler " +
+              "configuration file must have one <document> node.");
+    }
+
+    List<Element> scriptTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.SCRIPT);
+    if (!scriptTags.isEmpty()) {
+      script = new Script(scriptTags.get(0));
+    }
+
+    // Add the provided evaluators
+    List<Element> functionTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.FUNCTION);
+    if (!functionTags.isEmpty()) {
+      for (Element element : functionTags) {
+        String func = ConfigParseUtil.getStringAttribute(element, NAME, null);
+        String clz = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.CLASS, null);
+        if (func == null || clz == null){
+          throw new DataImportHandlerException(
+                  SEVERE,
+                  "<function> must have a 'name' and 'class' attributes");
+        } else {
+          functions.add(ConfigParseUtil.getAllAttributes(element));
+        }
+      }
+    }
+    List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.DATA_SRC);
+    if (!dataSourceTags.isEmpty()) {
+      for (Element element : dataSourceTags) {
+        Map<String,String> p = new HashMap<>();
+        HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
+        for (Map.Entry<String, String> entry : attrs.entrySet()) {
+          p.put(entry.getKey(), entry.getValue());
+        }
+        dataSources.put(p.get("name"), p);
+      }
+    }
+    if(dataSources.get(null) == null){
+      for (Map<String,String> properties : dataSources.values()) {
+        dataSources.put(null,properties);
+        break;        
+      } 
+    }
+    PropertyWriter pw = null;
+    List<Element> propertyWriterTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.PROPERTY_WRITER);
+    if (propertyWriterTags.isEmpty()) {
+      boolean zookeeper = false;
+      if (this.core != null
+          && this.core.getCoreContainer().isZooKeeperAware()) {
+        zookeeper = true;
+      }
+      pw = new PropertyWriter(zookeeper ? "ZKPropertiesWriter"
+          : "SimplePropertiesWriter", Collections.<String,String> emptyMap());
+    } else if (propertyWriterTags.size() > 1) {
+      throw new DataImportHandlerException(SEVERE, "Only one "
+          + ConfigNameConstants.PROPERTY_WRITER + " can be configured.");
+    } else {
+      Element pwElement = propertyWriterTags.get(0);
+      String type = null;
+      Map<String,String> params = new HashMap<>();
+      for (Map.Entry<String,String> entry : ConfigParseUtil.getAllAttributes(
+          pwElement).entrySet()) {
+        if (TYPE.equals(entry.getKey())) {
+          type = entry.getValue();
+        } else {
+          params.put(entry.getKey(), entry.getValue());
+        }
+      }
+      if (type == null) {
+        throw new DataImportHandlerException(SEVERE, "The "
+            + ConfigNameConstants.PROPERTY_WRITER + " element must specify "
+            + TYPE);
+      }
+      pw = new PropertyWriter(type, params);
+    }
+    return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources, pw);
+  }
+    
+  @SuppressWarnings("unchecked")
+  private DIHProperties createPropertyWriter() {
+    DIHProperties propWriter = null;
+    PropertyWriter configPw = config.getPropertyWriter();
+    try {
+      Class<DIHProperties> writerClass = DocBuilder.loadClass(configPw.getType(), this.core);
+      propWriter = writerClass.newInstance();
+      propWriter.init(this, configPw.getParameters());
+    } catch (Exception e) {
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load PropertyWriter implementation: " + configPw.getType(), e);
+    }
+    return propWriter;
+  }
+
+  public DIHConfiguration getConfig() {
+    return config;
+  }
+
+  Date getIndexStartTime() {
+    return indexStartTime;
+  }
+
+  void setIndexStartTime(Date indexStartTime) {
+    this.indexStartTime = indexStartTime;
+  }
+
+  void store(Object key, Object value) {
+    store.put(key, value);
+  }
+
+  Object retrieve(Object key) {
+    return store.get(key);
+  }
+
+  public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
+    Map<String,String> p = requestLevelDataSourceProps.get(name);
+    if (p == null)
+      p = config.getDataSources().get(name);
+    if (p == null)
+      p = requestLevelDataSourceProps.get(null);// for default data source
+    if (p == null)
+      p = config.getDataSources().get(null);
+    if (p == null)  
+      throw new DataImportHandlerException(SEVERE,
+              "No dataSource :" + name + " available for entity :" + key.getName());
+    String type = p.get(TYPE);
+    DataSource dataSrc = null;
+    if (type == null) {
+      dataSrc = new JdbcDataSource();
+    } else {
+      try {
+        dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
+      } catch (Exception e) {
+        wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
+      }
+    }
+    try {
+      Properties copyProps = new Properties();
+      copyProps.putAll(p);
+      Map<String, Object> map = ctx.getRequestParameters();
+      if (map.containsKey("rows")) {
+        int rows = Integer.parseInt((String) map.get("rows"));
+        if (map.containsKey("start")) {
+          rows += Integer.parseInt((String) map.get("start"));
+        }
+        copyProps.setProperty("maxRows", String.valueOf(rows));
+      }
+      dataSrc.init(ctx, copyProps);
+    } catch (Exception e) {
+      wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
+    }
+    return dataSrc;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  public boolean isBusy() {
+    return importLock.isLocked();
+  }
+
+  public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
+    log.info("Starting Full Import");
+    setStatus(Status.RUNNING_FULL_DUMP);
+    try {
+      DIHProperties dihPropWriter = createPropertyWriter();
+      setIndexStartTime(dihPropWriter.getCurrentTimestamp());
+      docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
+      checkWritablePersistFile(writer, dihPropWriter);
+      docBuilder.execute();
+      if (!requestParams.isDebug())
+        cumulativeStatistics.add(docBuilder.importStatistics);
+    } catch (Exception e) {
+      SolrException.log(log, "Full Import failed", e);
+      docBuilder.handleError("Full Import failed", e);
+    } finally {
+      setStatus(Status.IDLE);
+      DocBuilder.INSTANCE.set(null);
+    }
+
+  }
+
+  private void checkWritablePersistFile(DIHWriter writer, DIHProperties dihPropWriter) {
+   if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
+      throw new DataImportHandlerException(SEVERE,
+          "Properties is not writable. Delta imports are supported by data config but will not work.");
+    }
+  }
+
+  public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
+    log.info("Starting Delta Import");
+    setStatus(Status.RUNNING_DELTA_DUMP);
+    try {
+      DIHProperties dihPropWriter = createPropertyWriter();
+      setIndexStartTime(dihPropWriter.getCurrentTimestamp());
+      docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
+      checkWritablePersistFile(writer, dihPropWriter);
+      docBuilder.execute();
+      if (!requestParams.isDebug())
+        cumulativeStatistics.add(docBuilder.importStatistics);
+    } catch (Exception e) {
+      log.error("Delta Import Failed", e);
+      docBuilder.handleError("Delta Import Failed", e);
+    } finally {
+      setStatus(Status.IDLE);
+      DocBuilder.INSTANCE.set(null);
+    }
+
+  }
+
+  public void runAsync(final RequestInfo reqParams, final DIHWriter sw) {
+    new Thread(() -> runCmd(reqParams, sw)).start();
+  }
+
+  void runCmd(RequestInfo reqParams, DIHWriter sw) {
+    String command = reqParams.getCommand();
+    if (command.equals(ABORT_CMD)) {
+      if (docBuilder != null) {
+        docBuilder.abort();
+      }
+      return;
+    }
+    if (!importLock.tryLock()){
+      log.warn("Import command failed . another import is running");
+      return;
+    }
+    try {
+      if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
+        doFullImport(sw, reqParams);
+      } else if (command.equals(DELTA_IMPORT_CMD)) {
+        doDeltaImport(sw, reqParams);
+      }
+    } finally {
+      importLock.unlock();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  Map<String, String> getStatusMessages() {
+    //this map object is a Collections.synchronizedMap(new LinkedHashMap()). if we
+    // synchronize on the object it must be safe to iterate through the map
+    Map statusMessages = (Map) retrieve(STATUS_MSGS);
+    Map<String, String> result = new LinkedHashMap<>();
+    if (statusMessages != null) {
+      synchronized (statusMessages) {
+        for (Object o : statusMessages.entrySet()) {
+          Map.Entry e = (Map.Entry) o;
+          //the toString is taken because some of the Objects create the data lazily when toString() is called
+          result.put((String) e.getKey(), e.getValue().toString());
+        }
+      }
+    }
+    return result;
+
+  }
+
+  public DocBuilder getDocBuilder() {
+    return docBuilder;
+  }
+
+  public DocBuilder getDocBuilder(DIHWriter writer, RequestInfo requestParams) {
+    DIHProperties dihPropWriter = createPropertyWriter();
+    return new DocBuilder(this, writer, dihPropWriter, requestParams);
+  }
+
+  Map<String, Evaluator> getEvaluators() {
+    return getEvaluators(config.getFunctions());
+  }
+  
+  /**
+   * used by tests.
+   */
+  Map<String, Evaluator> getEvaluators(List<Map<String,String>> fn) {
+    Map<String, Evaluator> evaluators = new HashMap<>();
+    evaluators.put(Evaluator.DATE_FORMAT_EVALUATOR, new DateFormatEvaluator());
+    evaluators.put(Evaluator.SQL_ESCAPE_EVALUATOR, new SqlEscapingEvaluator());
+    evaluators.put(Evaluator.URL_ENCODE_EVALUATOR, new UrlEvaluator());
+    evaluators.put(Evaluator.ESCAPE_SOLR_QUERY_CHARS, new SolrQueryEscapingEvaluator());
+    SolrCore core = docBuilder == null ? null : docBuilder.dataImporter.getCore();
+    for (Map<String, String> map : fn) {
+      try {
+        evaluators.put(map.get(NAME), (Evaluator) loadClass(map.get(CLASS), core).newInstance());
+      } catch (Exception e) {
+        wrapAndThrow(SEVERE, e, "Unable to instantiate evaluator: " + map.get(CLASS));
+      }
+    }
+    return evaluators;    
+  }
+
+  static final ThreadLocal<AtomicLong> QUERY_COUNT = new ThreadLocal<AtomicLong>() {
+    @Override
+    protected AtomicLong initialValue() {
+      return new AtomicLong();
+    }
+  };
+
+  
+
+  static final class MSG {
+    public static final String NO_CONFIG_FOUND = "Configuration not found";
+
+    public static final String NO_INIT = "DataImportHandler started. Not Initialized. No commands can be run";
+
+    public static final String INVALID_CONFIG = "FATAL: Could not create importer. DataImporter config invalid";
+
+    public static final String LOAD_EXP = "Exception while loading DataImporter";
+
+    public static final String JMX_DESC = "Manage data import from databases to Solr";
+
+    public static final String CMD_RUNNING = "A command is still running...";
+
+    public static final String DEBUG_NOT_ENABLED = "Debug not enabled. Add a tag <str name=\"enableDebug\">true</str> in solrconfig.xml";
+
+    public static final String CONFIG_RELOADED = "Configuration Re-loaded successfully";
+    
+    public static final String CONFIG_NOT_RELOADED = "Configuration NOT Re-loaded...Data Importer is busy.";
+
+    public static final String TOTAL_DOC_PROCESSED = "Total Documents Processed";
+
+    public static final String TOTAL_FAILED_DOCS = "Total Documents Failed";
+
+    public static final String TOTAL_QUERIES_EXECUTED = "Total Requests made to DataSource";
+
+    public static final String TOTAL_ROWS_EXECUTED = "Total Rows Fetched";
+
+    public static final String TOTAL_DOCS_DELETED = "Total Documents Deleted";
+
+    public static final String TOTAL_DOCS_SKIPPED = "Total Documents Skipped";
+  }
+
+  public SolrCore getCore() {
+    return core;
+  }
+  
+  void putToCoreScopeSession(String key, Object val) {
+    coreScopeSession.put(key, val);
+  }
+  Object getFromCoreScopeSession(String key) {
+    return coreScopeSession.get(key);
+  }
+
+  public static final String COLUMN = "column";
+
+  public static final String TYPE = "type";
+
+  public static final String DATA_SRC = "dataSource";
+
+  public static final String MULTI_VALUED = "multiValued";
+
+  public static final String NAME = "name";
+
+  public static final String STATUS_MSGS = "status-messages";
+
+  public static final String FULL_IMPORT_CMD = "full-import";
+
+  public static final String IMPORT_CMD = "import";
+
+  public static final String DELTA_IMPORT_CMD = "delta-import";
+
+  public static final String ABORT_CMD = "abort";
+
+  public static final String DEBUG_MODE = "debug";
+
+  public static final String RELOAD_CONF_CMD = "reload-config";
+
+  public static final String SHOW_CONF_CMD = "show-config";
+  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
new file mode 100644
index 0000000..e217ddd
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * Provides data from a source with a given query.
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a default no-arg constructor
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class DataSource<T> {
+
+  /**
+   * Initializes the DataSource with the <code>Context</code> and
+   * initialization properties.
+   * <p>
+   * This is invoked by the <code>DataImporter</code> after creating an
+   * instance of this class.
+   */
+  public abstract void init(Context context, Properties initProps);
+
+  /**
+   * Get records for the given query. The return type depends on the
+   * implementation.
+   *
+   * @param query The query string. It can be a SQL query for JdbcDataSource, a URL
+   *              for HttpDataSource, a file location for FileDataSource, or a custom
+   *              format for your own custom DataSource.
+   * @return Depends on the implementation. For instance, JdbcDataSource returns
+   *         an Iterator&lt;Map&lt;String, Object&gt;&gt;
+   */
+  public abstract T getData(String query);
+
+  /**
+   * Cleans up resources of this DataSource after use.
+   */
+  public abstract void close();
+}
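
A minimal sketch of a custom DataSource, for readers following the API above. The class and package names are hypothetical (not part of this patch); the source simply returns the lines of an in-memory string and ignores the query.

    package com.example.dih; // hypothetical package, not part of this patch

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.solr.handler.dataimport.Context;
    import org.apache.solr.handler.dataimport.DataSource;

    // Minimal sketch: returns each line of a configured string, ignoring the query.
    public class InMemoryLinesDataSource extends DataSource<Iterator<String>> {

      private String payload;

      // The default no-arg constructor required by the Javadoc above.
      public InMemoryLinesDataSource() {}

      @Override
      public void init(Context context, Properties initProps) {
        // initProps carries the attributes of the <dataSource> element.
        payload = initProps.getProperty("payload", "");
      }

      @Override
      public Iterator<String> getData(String query) {
        return Arrays.asList(payload.split("\n")).iterator();
      }

      @Override
      public void close() {
        // Nothing to release for an in-memory source.
      }
    }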

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
new file mode 100644
index 0000000..f4df820
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IllformedLocaleException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.handler.dataimport.config.EntityField;
+import org.apache.solr.util.DateMathParser;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+/**
+ * <p>Formats values using a given date format. </p>
+ * <p>Pass two to four parameters:
+ * <ul>
+ *  <li>An {@link EntityField} or a date expression to be parsed with
+ *      the {@link DateMathParser} class. If the value is a String,
+ *      it is assumed to be a datemath expression; otherwise it is
+ *      resolved using a {@link VariableResolver} instance</li>
+ *  <li>A date format; see {@link SimpleDateFormat} for the syntax.</li>
+ *  <li>The {@link Locale} to parse with
+ *      (optional; defaults to {@link Locale#ENGLISH})</li>
+ *  <li>The {@link TimeZone} to format in
+ *      (optional; defaults to the JVM default time zone)</li>
+ * </ul>
+ */
+public class DateFormatEvaluator extends Evaluator {
+  
+  public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+  protected Map<String, Locale> availableLocales = new HashMap<>();
+  protected Set<String> availableTimezones = new HashSet<>();
+
+  @SuppressForbidden(reason = "Usage of outdated locale parsing with Locale#toString() because of backwards compatibility")
+  public DateFormatEvaluator() {  
+    for (Locale locale : Locale.getAvailableLocales()) {
+      availableLocales.put(locale.toString(), locale);
+    }
+    for (String tz : TimeZone.getAvailableIDs()) {
+      availableTimezones.add(tz);
+    }
+  }
+  
+  private SimpleDateFormat getDateFormat(String pattern, TimeZone timezone, Locale locale) {
+    final SimpleDateFormat sdf = new SimpleDateFormat(pattern, locale);
+    sdf.setTimeZone(timezone);
+    return sdf;
+  }
+  
+  @Override
+  public String evaluate(String expression, Context context) {
+    List<Object> l = parseParams(expression, context.getVariableResolver());
+    if (l.size() < 2 || l.size() > 4) {
+      throw new DataImportHandlerException(SEVERE, "'formatDate()' must have two, three or four parameters ");
+    }
+    Object o = l.get(0);
+    Object format = l.get(1);
+    if (format instanceof VariableWrapper) {
+      VariableWrapper wrapper = (VariableWrapper) format;
+      o = wrapper.resolve();
+      format = o.toString();
+    }
+    Locale locale = Locale.ENGLISH; // we default to ENGLISH for dates for full Java 9 compatibility
+    if(l.size()>2) {
+      Object localeObj = l.get(2);
+      String localeStr = null;
+      if (localeObj  instanceof VariableWrapper) {
+        localeStr = ((VariableWrapper) localeObj).resolve().toString();
+      } else {
+        localeStr = localeObj.toString();
+      }
+      locale = availableLocales.get(localeStr);
+      if (locale == null) try {
+        locale = new Locale.Builder().setLanguageTag(localeStr).build();
+      } catch (IllformedLocaleException ex) {
+        throw new DataImportHandlerException(SEVERE, "Malformed / non-existent locale: " + localeStr, ex);
+      }
+    }
+    TimeZone tz = TimeZone.getDefault(); // DWS TODO: is this the right default for us?  Deserves explanation if so.
+    if(l.size()==4) {
+      Object tzObj = l.get(3);
+      String tzStr = null;
+      if (tzObj  instanceof VariableWrapper) {
+        tzStr = ((VariableWrapper) tzObj).resolve().toString();
+      } else {
+        tzStr = tzObj.toString();
+      }
+      if(availableTimezones.contains(tzStr)) {
+        tz = TimeZone.getTimeZone(tzStr);
+      } else {
+        throw new DataImportHandlerException(SEVERE, "Unsupported Timezone: " + tzStr);
+      }
+    }
+    String dateFmt = format.toString();
+    SimpleDateFormat fmt = getDateFormat(dateFmt, tz, locale);
+    Date date = null;
+    if (o instanceof VariableWrapper) {
+      date = evaluateWrapper((VariableWrapper) o, locale, tz);
+    } else {
+      date = evaluateString(o.toString(), locale, tz);
+    }
+    return fmt.format(date);
+  }
+
+  /**
+   * NOTE: declared as a method to allow for extensibility
+   *
+   * @lucene.experimental this API is experimental and subject to change
+   * @return the result of evaluating the variable wrapper
+   */
+  protected Date evaluateWrapper(VariableWrapper variableWrapper, Locale locale, TimeZone tz) {
+    Date date = null;
+    Object variableval = resolveWrapper(variableWrapper,locale,tz);
+    if (variableval instanceof Date) {
+      date = (Date) variableval;
+    } else {
+      String s = variableval.toString();
+      try {
+        date = getDateFormat(DEFAULT_DATE_FORMAT, tz, locale).parse(s);
+      } catch (ParseException exp) {
+        wrapAndThrow(SEVERE, exp, "Invalid expression for date");
+      }
+    }
+    return date;
+  }
+
+  /**
+   * NOTE: declared as a method to allow for extensibility
+   * @lucene.experimental
+   * @return the result of evaluating a string
+   */
+  protected Date evaluateString(String datemathfmt, Locale locale, TimeZone tz) {
+    // note: DMP does not use the locale but perhaps a subclass might use it, for e.g. parsing a date in a custom
+    // string that doesn't necessarily have date math?
+    //TODO refactor DateMathParser.parseMath a bit to have a static method for this logic.
+    if (datemathfmt.startsWith("NOW")) {
+      datemathfmt = datemathfmt.substring("NOW".length());
+    }
+    try {
+      DateMathParser parser = new DateMathParser(tz);
+      parser.setNow(new Date());// thus do *not* use SolrRequestInfo
+      return parser.parseMath(datemathfmt);
+    } catch (ParseException e) {
+      throw wrapAndThrow(SEVERE, e, "Invalid expression for date");
+    }
+  }
+
+  /**
+   * NOTE: declared as a method to allow for extensibility
+   * @lucene.experimental
+   * @return the result of resolving the variable wrapper
+   */
+  protected Object resolveWrapper(VariableWrapper variableWrapper, Locale locale, TimeZone tz) {
+    return variableWrapper.resolve();
+  }
+
+}
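
Once the evaluator has resolved its parameters, the formatting itself is plain SimpleDateFormat work. The standalone sketch below (illustrative values, not from this patch) shows roughly the equivalent of formatDate(<value>, 'yyyy-MM-dd HH:mm', 'en', 'UTC').

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.TimeZone;

    // Standalone sketch of the pattern/locale/timezone combination the evaluator builds.
    public class FormatDateSketch {
      public static void main(String[] args) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm", Locale.ENGLISH);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(fmt.format(new Date()));
      }
    }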

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
new file mode 100644
index 0000000..6da9cc1
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * {@link Transformer} instance which creates {@link Date} instances out of {@link String}s.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DateFormatTransformer extends Transformer {
+  private Map<String, SimpleDateFormat> fmtCache = new HashMap<>();
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object transformRow(Map<String, Object> aRow, Context context) {
+
+    for (Map<String, String> map : context.getAllEntityFields()) {
+      Locale locale = Locale.ENGLISH; // we default to ENGLISH for dates for full Java 9 compatibility
+      String customLocale = map.get(LOCALE);
+      if (customLocale != null) {
+        try {
+          locale = new Locale.Builder().setLanguageTag(customLocale).build();
+        } catch (IllformedLocaleException e) {
+          throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Invalid Locale specified: " + customLocale, e);
+        }
+      }
+
+      String fmt = map.get(DATE_TIME_FMT);
+      if (fmt == null)
+        continue;
+      VariableResolver resolver = context.getVariableResolver();
+      fmt = resolver.replaceTokens(fmt);
+      String column = map.get(DataImporter.COLUMN);
+      String srcCol = map.get(RegexTransformer.SRC_COL_NAME);
+      if (srcCol == null)
+        srcCol = column;
+      try {
+        Object o = aRow.get(srcCol);
+        if (o instanceof List) {
+          List inputs = (List) o;
+          List<Date> results = new ArrayList<>();
+          for (Object input : inputs) {
+            results.add(process(input, fmt, locale));
+          }
+          aRow.put(column, results);
+        } else {
+          if (o != null) {
+            aRow.put(column, process(o, fmt, locale));
+          }
+        }
+      } catch (ParseException e) {
+        log.warn("Could not parse a Date field ", e);
+      }
+    }
+    return aRow;
+  }
+
+  private Date process(Object value, String format, Locale locale) throws ParseException {
+    if (value == null) return null;
+    String strVal = value.toString().trim();
+    if (strVal.length() == 0)
+      return null;
+    SimpleDateFormat fmt = fmtCache.get(format);
+    if (fmt == null) {
+      fmt = new SimpleDateFormat(format, locale);
+      fmtCache.put(format, fmt);
+    }
+    return fmt.parse(strVal);
+  }
+
+  public static final String DATE_TIME_FMT = "dateTimeFormat";
+  
+  public static final String LOCALE = "locale";
+}
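
The transformer's process() step boils down to a cached SimpleDateFormat parse. A minimal standalone sketch of that step, with an illustrative pattern and language tag, might look like this:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;

    // Standalone sketch of the parse step: dateTimeFormat pattern + locale -> java.util.Date.
    public class DateParseSketch {
      public static void main(String[] args) throws ParseException {
        Locale locale = new Locale.Builder().setLanguageTag("en-US").build();
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
        Date parsed = fmt.parse("2018-11-02 11:33:37");
        System.out.println(parsed);
      }
    }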

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
new file mode 100644
index 0000000..623832f
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+
+public class DebugInfo {
+
+  private static final class ChildRollupDocs extends AbstractList<SolrInputDocument> {
+
+    private List<SolrInputDocument> delegate = new ArrayList<>();
+
+    @Override
+    public SolrInputDocument get(int index) {
+      return delegate.get(index);
+    }
+
+    @Override
+    public int size() {
+      return delegate.size();
+    }
+
+    public boolean add(SolrInputDocument e) {
+      SolrInputDocument transformed = e.deepCopy();
+      if (transformed.hasChildDocuments()) {
+        ChildRollupDocs childList = new ChildRollupDocs();
+        childList.addAll(transformed.getChildDocuments());
+        transformed.addField("_childDocuments_", childList);
+        transformed.getChildDocuments().clear();
+      }
+      return delegate.add(transformed);
+    }
+  }
+
+  public List<SolrInputDocument> debugDocuments = new ChildRollupDocs();
+
+  public NamedList<String> debugVerboseOutput = null;
+  public boolean verbose;
+  
+  public DebugInfo(Map<String,Object> requestParams) {
+    verbose = StrUtils.parseBool((String) requestParams.get("verbose"), false);
+    debugVerboseOutput = new NamedList<>();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
new file mode 100644
index 0000000..2fd9303
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
+
+/**
+ * <p>
+ * Implements most of the interactive development functionality
+ * </p>
+ * <p/>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p/>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+class DebugLogger {
+  private Stack<DebugInfo> debugStack;
+
+  NamedList output;
+//  private final SolrWriter writer1;
+
+  private static final String LINE = "---------------------------------------------";
+
+  private MessageFormat fmt = new MessageFormat(
+          "----------- row #{0}-------------", Locale.ROOT);
+
+  boolean enabled = true;
+
+  public DebugLogger() {
+//    writer = solrWriter;
+    output = new NamedList();
+    debugStack = new Stack<DebugInfo>() {
+
+      @Override
+      public DebugInfo pop() {
+        if (size() == 1)
+          throw new DataImportHandlerException(
+                  DataImportHandlerException.SEVERE, "Stack is becoming empty");
+        return super.pop();
+      }
+    };
+    debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
+    output = debugStack.peek().lst;
+  }
+
+  private DebugInfo peekStack() {
+    return debugStack.isEmpty() ? null : debugStack.peek();
+  }
+
+  public void log(DIHLogLevels event, String name, Object row) {
+    if (event == DIHLogLevels.DISABLE_LOGGING) {
+      enabled = false;
+      return;
+    } else if (event == DIHLogLevels.ENABLE_LOGGING) {
+      enabled = true;
+      return;
+    }
+
+    if (!enabled && event != DIHLogLevels.START_ENTITY
+            && event != DIHLogLevels.END_ENTITY) {
+      return;
+    }
+
+    if (event == DIHLogLevels.START_DOC) {
+      debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
+    } else if (DIHLogLevels.START_ENTITY == event) {
+      debugStack
+              .push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
+    } else if (DIHLogLevels.ENTITY_OUT == event
+            || DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
+      if (debugStack.peek().type == DIHLogLevels.START_ENTITY
+              || debugStack.peek().type == DIHLogLevels.START_DOC) {
+        debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
+                .peek().rowCount}));
+        addToNamedList(debugStack.peek().lst, row);
+        debugStack.peek().lst.add(null, LINE);
+      }
+    } else if (event == DIHLogLevels.ROW_END) {
+      popAllTransformers();
+    } else if (DIHLogLevels.END_ENTITY == event) {
+      while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
+        ;
+    } else if (DIHLogLevels.END_DOC == event) {
+      while (debugStack.pop().type != DIHLogLevels.START_DOC)
+        ;
+    } else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
+      debugStack.push(new DebugInfo(name, event, peekStack()));
+      debugStack.peek().lst.add("EXCEPTION",
+              getStacktraceString((Exception) row));
+    } else if (DIHLogLevels.TRANSFORMED_ROW == event) {
+      debugStack.push(new DebugInfo(name, event, peekStack()));
+      debugStack.peek().lst.add(null, LINE);
+      addToNamedList(debugStack.peek().lst, row);
+      debugStack.peek().lst.add(null, LINE);
+      if (row instanceof DataImportHandlerException) {
+        DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
+        dataImportHandlerException.debugged = true;
+      }
+    } else if (DIHLogLevels.ENTITY_META == event) {
+      popAllTransformers();
+      debugStack.peek().lst.add(name, row);
+    } else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
+      if (row instanceof DataImportHandlerException) {
+        DataImportHandlerException dihe = (DataImportHandlerException) row;
+        if (dihe.debugged)
+          return;
+        dihe.debugged = true;
+      }
+
+      popAllTransformers();
+      debugStack.peek().lst.add("EXCEPTION",
+              getStacktraceString((Exception) row));
+    }
+  }
+
+  private void popAllTransformers() {
+    while (true) {
+      DIHLogLevels type = debugStack.peek().type;
+      if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
+        break;
+      debugStack.pop();
+    }
+  }
+
+  private void addToNamedList(NamedList nl, Object row) {
+    if (row instanceof List) {
+      List list = (List) row;
+      NamedList l = new NamedList();
+      nl.add(null, l);
+      for (Object o : list) {
+        Map<String, Object> map = (Map<String, Object>) o;
+        for (Map.Entry<String, Object> entry : map.entrySet())
+          nl.add(entry.getKey(), entry.getValue());
+      }
+    } else if (row instanceof Map) {
+      Map<String, Object> map = (Map<String, Object>) row;
+      for (Map.Entry<String, Object> entry : map.entrySet())
+        nl.add(entry.getKey(), entry.getValue());
+    }
+  }
+
+  DataSource wrapDs(final DataSource ds) {
+    return new DataSource() {
+      @Override
+      public void init(Context context, Properties initProps) {
+        ds.init(context, initProps);
+      }
+
+      @Override
+      public void close() {
+        ds.close();
+      }
+
+      @Override
+      public Object getData(String query) {
+        log(DIHLogLevels.ENTITY_META, "query", query);
+        long start = System.nanoTime();
+        try {
+          return ds.getData(query);
+        } catch (DataImportHandlerException de) {
+          log(DIHLogLevels.ENTITY_EXCEPTION,
+                  null, de);
+          throw de;
+        } catch (Exception e) {
+          log(DIHLogLevels.ENTITY_EXCEPTION,
+                  null, e);
+          DataImportHandlerException de = new DataImportHandlerException(
+                  DataImportHandlerException.SEVERE, "", e);
+          de.debugged = true;
+          throw de;
+        } finally {
+          log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
+                  .getTimeElapsedSince(start));
+        }
+      }
+    };
+  }
+
+  Transformer wrapTransformer(final Transformer t) {
+    return new Transformer() {
+      @Override
+      public Object transformRow(Map<String, Object> row, Context context) {
+        log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
+        String tName = getTransformerName(t);
+        Object result = null;
+        try {
+          result = t.transformRow(row, context);
+          log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
+        } catch (DataImportHandlerException de) {
+          log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
+          de.debugged = true;
+          throw de;
+        } catch (Exception e) {
+          log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
+          DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
+          de.debugged = true;
+          throw de;
+        }
+        return result;
+      }
+    };
+  }
+
+  public static String getStacktraceString(Exception e) {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    return sw.toString();
+  }
+
+  static String getTransformerName(Transformer t) {
+    Class transClass = t.getClass();
+    if (t instanceof EntityProcessorWrapper.ReflectionTransformer) {
+      return ((EntityProcessorWrapper.ReflectionTransformer) t).trans;
+    }
+    if (t instanceof ScriptTransformer) {
+      ScriptTransformer scriptTransformer = (ScriptTransformer) t;
+      return "script:" + scriptTransformer.getFunctionName();
+    }
+    if (transClass.getPackage().equals(DebugLogger.class.getPackage())) {
+      return transClass.getSimpleName();
+    } else {
+      return transClass.getName();
+    }
+  }
+
+  private static class DebugInfo {
+    String name;
+
+    int tCount, rowCount;
+
+    NamedList lst;
+
+    DIHLogLevels type;
+
+    DebugInfo parent;
+
+    public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
+      this.name = name;
+      this.type = type;
+      this.parent = parent;
+      lst = new NamedList();
+      if (parent != null) {
+        String displayName = null;
+        if (type == DIHLogLevels.START_ENTITY) {
+          displayName = "entity:" + name;
+        } else if (type == DIHLogLevels.TRANSFORMED_ROW
+                || type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
+          displayName = "transformer:" + name;
+        } else if (type == DIHLogLevels.START_DOC) {
+          this.name = displayName = "document#" + SolrWriter.getDocCount();
+        }
+        parent.lst.add(displayName, lst);
+      }
+    }
+  }
+
+}
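
wrapDs() and wrapTransformer() above are decorators around the objects they log. A minimal sketch of the same idea outside DIH, simplified to timing only and using hypothetical names, could look like this:

    package com.example.dih; // hypothetical package, not part of this patch

    import java.util.Properties;

    import org.apache.solr.handler.dataimport.Context;
    import org.apache.solr.handler.dataimport.DataSource;

    // Sketch of a timing decorator around any DataSource, in the spirit of wrapDs() above.
    public class TimingDataSource<T> extends DataSource<T> {

      private final DataSource<T> delegate;

      public TimingDataSource(DataSource<T> delegate) {
        this.delegate = delegate;
      }

      @Override
      public void init(Context context, Properties initProps) {
        delegate.init(context, initProps);
      }

      @Override
      public T getData(String query) {
        long start = System.nanoTime();
        try {
          return delegate.getData(query);
        } finally {
          System.out.println("query took " + (System.nanoTime() - start) + " ns: " + query);
        }
      }

      @Override
      public void close() {
        delegate.close();
      }
    }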

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
new file mode 100644
index 0000000..164cf70
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
@@ -0,0 +1,988 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.DIHConfiguration;
+import org.apache.solr.handler.dataimport.config.Entity;
+import org.apache.solr.handler.dataimport.config.EntityField;
+
+import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p> {@link DocBuilder} is responsible for creating Solr documents out of the given configuration. It also maintains
+ * statistics information. It depends on the {@link EntityProcessor} implementations to fetch data. </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DocBuilder {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();
+
+  private static final Date EPOCH = new Date(0);
+  public static final String DELETE_DOC_BY_ID = "$deleteDocById";
+  public static final String DELETE_DOC_BY_QUERY = "$deleteDocByQuery";
+  public static final String DOC_BOOST = "$docBoost";
+  public static final String SKIP_DOC = "$skipDoc";
+  public static final String SKIP_ROW = "$skipRow";
+
+  DataImporter dataImporter;
+
+  private DIHConfiguration config;
+
+  private EntityProcessorWrapper currentEntityProcessorWrapper;
+
+  @SuppressWarnings("unchecked")
+  private Map statusMessages = Collections.synchronizedMap(new LinkedHashMap());
+
+  public Statistics importStatistics = new Statistics();
+
+  DIHWriter writer;
+
+  boolean verboseDebug = false;
+
+  Map<String, Object> session = new HashMap<>();
+
+  static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<>();
+  private Map<String, Object> persistedProperties;
+  
+  private DIHProperties propWriter;
+  private DebugLogger debugLogger;
+  private final RequestInfo reqParams;
+  
+  public DocBuilder(DataImporter dataImporter, DIHWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
+    INSTANCE.set(this);
+    this.dataImporter = dataImporter;
+    this.reqParams = reqParams;
+    this.propWriter = propWriter;
+    DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
+    verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
+    persistedProperties = propWriter.readIndexerProperties();
+     
+    writer = solrWriter;
+    ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
+    if (writer != null) {
+      writer.init(ctx);
+    }
+  }
+
+
+  DebugLogger getDebugLogger(){
+    if (debugLogger == null) {
+      debugLogger = new DebugLogger();
+    }
+    return debugLogger;
+  }
+
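+  /**
+   * Builds a {@link VariableResolver} seeded with the core properties (when a core is available) and the configured
+   * evaluators, and registers an importer namespace (under both the long and short namespace names) exposing the
+   * persisted last index time (overall and per entity, defaulting to the epoch), the index start time, the raw
+   * request parameters and the handler name.
+   */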
+  private VariableResolver getVariableResolver() {
+    try {
+      VariableResolver resolver = null;
+      String epoch = propWriter.convertDateToString(EPOCH);
+      if (dataImporter != null && dataImporter.getCore() != null
+          && dataImporter.getCore().getResourceLoader().getCoreProperties() != null) {
+        resolver = new VariableResolver(dataImporter.getCore().getResourceLoader().getCoreProperties());
+      } else {
+        resolver = new VariableResolver();
+      }
+      resolver.setEvaluators(dataImporter.getEvaluators());
+      Map<String, Object> indexerNamespace = new HashMap<>();
+      if (persistedProperties.get(LAST_INDEX_TIME) != null) {
+        indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
+      } else  {
+        // set epoch
+        indexerNamespace.put(LAST_INDEX_TIME, epoch);
+      }
+      indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
+      indexerNamespace.put("request", new HashMap<>(reqParams.getRawParams()));
+      indexerNamespace.put("handlerName", dataImporter.getHandlerName());
+      for (Entity entity : dataImporter.getConfig().getEntities()) {
+        Map<String, Object> entityNamespace = new HashMap<>();
+        String key = SolrWriter.LAST_INDEX_KEY;
+        Object lastIndex = persistedProperties.get(entity.getName() + "." + key);
+        if (lastIndex != null) {
+          entityNamespace.put(SolrWriter.LAST_INDEX_KEY, lastIndex);
+        } else  {
+          entityNamespace.put(SolrWriter.LAST_INDEX_KEY, epoch);
+        }
+        indexerNamespace.put(entity.getName(), entityNamespace);
+      }
+      resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
+      resolver.addNamespace(ConfigNameConstants.IMPORTER_NS, indexerNamespace);
+      return resolver;
+    } catch (Exception e) {
+      wrapAndThrow(SEVERE, e);
+      // unreachable statement
+      return null;
+    }
+  }
+
+  private void invokeEventListener(String className) {
+    invokeEventListener(className, null);
+  }
+
+
+  private void invokeEventListener(String className, Exception lastException) {
+    try {
+      EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
+      notifyListener(listener, lastException);
+    } catch (Exception e) {
+      wrapAndThrow(SEVERE, e, "Unable to load class: " + className);
+    }
+  }
+
+  private void notifyListener(EventListener listener, Exception lastException) {
+    String currentProcess;
+    if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
+      currentProcess = Context.DELTA_DUMP;
+    } else {
+      currentProcess = Context.FULL_DUMP;
+    }
+    ContextImpl ctx = new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this);
+    ctx.setLastException(lastException);
+    listener.onEvent(ctx);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void execute() {
+    List<EntityProcessorWrapper> epwList = null;
+    try {
+      dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
+      config = dataImporter.getConfig();
+      final AtomicLong startTime = new AtomicLong(System.nanoTime());
+      statusMessages.put(TIME_ELAPSED, new Object() {
+        @Override
+        public String toString() {
+          return getTimeElapsedSince(startTime.get());
+        }
+      });
+
+      statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
+              importStatistics.queryCount);
+      statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
+              importStatistics.rowsCount);
+      statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
+              importStatistics.docCount);
+      statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
+              importStatistics.skipDocCount);
+
+      List<String> entities = reqParams.getEntitiesToRun();
+
+      // Trigger onImportStart
+      if (config.getOnImportStart() != null) {
+        invokeEventListener(config.getOnImportStart());
+      }
+      AtomicBoolean fullCleanDone = new AtomicBoolean(false);
+      //we must not do a delete of *:* multiple times if there are multiple root entities to be run
+      Map<String,Object> lastIndexTimeProps = new HashMap<>();
+      lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
+
+      epwList = new ArrayList<>(config.getEntities().size());
+      for (Entity e : config.getEntities()) {
+        epwList.add(getEntityProcessorWrapper(e));
+      }
+      for (EntityProcessorWrapper epw : epwList) {
+        if (entities != null && !entities.contains(epw.getEntity().getName()))
+          continue;
+        lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
+        currentEntityProcessorWrapper = epw;
+        String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
+        if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
+          cleanByQuery(delQuery, fullCleanDone);
+          doDelta();
+          delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
+          if (delQuery != null) {
+            fullCleanDone.set(false);
+            cleanByQuery(delQuery, fullCleanDone);
+          }
+        } else {
+          cleanByQuery(delQuery, fullCleanDone);
+          doFullDump();
+          delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
+          if (delQuery != null) {
+            fullCleanDone.set(false);
+            cleanByQuery(delQuery, fullCleanDone);
+          }
+        }
+      }
+
+      if (stop.get()) {
+        // Don't commit if aborted using command=abort
+        statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
+        handleError("Aborted", null);
+      } else {
+        // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
+        if (!reqParams.isClean()) {
+          if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
+            finish(lastIndexTimeProps);
+          }
+        } else {
+          // Finished operation normally, commit now
+          finish(lastIndexTimeProps);
+        }
+
+        if (config.getOnImportEnd() != null) {
+          invokeEventListener(config.getOnImportEnd());
+        }
+      }
+
+      statusMessages.remove(TIME_ELAPSED);
+      statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
+      if(importStatistics.failedDocCount.get() > 0)
+        statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
+
+      statusMessages.put("Time taken", getTimeElapsedSince(startTime.get()));
+      log.info("Time taken = " + getTimeElapsedSince(startTime.get()));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+      if (epwList != null) {
+        closeEntityProcessorWrappers(epwList);
+      }
+      if(reqParams.isDebug()) {
+        reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
+      }
+    }
+  }
+
+  private void closeEntityProcessorWrappers(List<EntityProcessorWrapper> epwList) {
+    for (EntityProcessorWrapper epw : epwList) {
+      epw.close();
+      if (epw.getDatasource() != null) {
+        epw.getDatasource().close();
+      }
+      closeEntityProcessorWrappers(epw.getChildren());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void finish(Map<String,Object> lastIndexTimeProps) {
+    log.info("Import completed successfully");
+    statusMessages.put("", "Indexing completed. Added/Updated: "
+            + importStatistics.docCount + " documents. Deleted "
+            + importStatistics.deletedDocCount + " documents.");
+    if(reqParams.isCommit()) {
+      writer.commit(reqParams.isOptimize());
+      addStatusMessage("Committed");
+      if (reqParams.isOptimize())
+        addStatusMessage("Optimized");
+    }
+    try {
+      propWriter.persist(lastIndexTimeProps);
+    } catch (Exception e) {
+      log.error("Could not write property file", e);
+      statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
+          "Make sure your conf directory is writable");
+    }
+  }
+
+  void handleError(String message, Exception e) {
+    if (!dataImporter.getCore().getCoreContainer().isZooKeeperAware()) {
+      writer.rollback();
+    }
+
+    statusMessages.put(message, "Indexing error");
+    addStatusMessage(message);
+    if ((config != null) && (config.getOnError() != null)) {
+      invokeEventListener(config.getOnError(), e);
+    }
+  }
+
+  private void doFullDump() {
+    addStatusMessage("Full Dump Started");    
+    buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
+  }
+
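+  /**
+   * Performs a delta import for the current root entity: applies the configured delete query, collects the modified
+   * and deleted keys via {@link #collectDelta}, removes stale documents, and rebuilds each changed document with its
+   * key row exposed in the importer delta namespace.
+   */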
+  @SuppressWarnings("unchecked")
+  private void doDelta() {
+    addStatusMessage("Delta Dump started");
+    VariableResolver resolver = getVariableResolver();
+
+    if (config.getDeleteQuery() != null) {
+      writer.deleteByQuery(config.getDeleteQuery());
+    }
+
+    addStatusMessage("Identifying Delta");
+    log.info("Starting delta collection.");
+    Set<Map<String, Object>> deletedKeys = new HashSet<>();
+    Set<Map<String, Object>> allPks = collectDelta(currentEntityProcessorWrapper, resolver, deletedKeys);
+    if (stop.get())
+      return;
+    addStatusMessage("Deltas Obtained");
+    addStatusMessage("Building documents");
+    if (!deletedKeys.isEmpty()) {
+      allPks.removeAll(deletedKeys);
+      deleteAll(deletedKeys);
+      // Make sure that documents are not re-created
+    }
+    deletedKeys = null;
+    writer.setDeltaKeys(allPks);
+
+    statusMessages.put("Total Changed Documents", allPks.size());
+    VariableResolver vri = getVariableResolver();
+    Iterator<Map<String, Object>> pkIter = allPks.iterator();
+    while (pkIter.hasNext()) {
+      Map<String, Object> map = pkIter.next();
+      vri.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT + ".delta", map);
+      buildDocument(vri, null, map, currentEntityProcessorWrapper, true, null);
+      pkIter.remove();
+      // check for abort
+      if (stop.get())
+        break;
+    }
+
+    if (!stop.get()) {
+      log.info("Delta Import completed successfully");
+    }
+  }
+
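+  /**
+   * Deletes the documents identified by the collected deleted-key rows, resolving the key column per row and
+   * incrementing the deleted-document counter for each delete issued.
+   */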
+  private void deleteAll(Set<Map<String, Object>> deletedKeys) {
+    log.info("Deleting stale documents ");
+    Iterator<Map<String, Object>> iter = deletedKeys.iterator();
+    while (iter.hasNext()) {
+      Map<String, Object> map = iter.next();
+      String keyName = currentEntityProcessorWrapper.getEntity().isDocRoot() ? currentEntityProcessorWrapper.getEntity().getPk() : currentEntityProcessorWrapper.getEntity().getSchemaPk();
+      Object key = map.get(keyName);
+      if(key == null) {
+        keyName = findMatchingPkColumn(keyName, map);
+        key = map.get(keyName);
+      }
+      if(key == null) {
+        log.warn("no key was available for deleted pk query. keyName = " + keyName);
+        continue;
+      }
+      writer.deleteDoc(key);
+      importStatistics.deletedDocCount.incrementAndGet();
+      iter.remove();
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void addStatusMessage(String msg) {
+    statusMessages.put(msg, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
+  }
+
+  private void resetEntity(EntityProcessorWrapper epw) {
+    epw.setInitialized(false);
+    for (EntityProcessorWrapper child : epw.getChildren()) {
+      resetEntity(child);
+    }
+  }
+  
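+  /**
+   * Entry point for the recursive document build: delegates to the full variant and guarantees that every entity
+   * processor initialized during the build is destroyed and the entity tree is reset, even if an exception escapes.
+   */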
+  private void buildDocument(VariableResolver vr, DocWrapper doc,
+      Map<String,Object> pk, EntityProcessorWrapper epw, boolean isRoot,
+      ContextImpl parentCtx) {
+    List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<>();
+    try {
+      buildDocument(vr, doc, pk, epw, isRoot, parentCtx, entitiesToDestroy);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
+        entityWrapper.destroy();
+      }
+      resetEntity(epw);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void buildDocument(VariableResolver vr, DocWrapper doc,
+                             Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
+                             ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
+
+    ContextImpl ctx = new ContextImpl(epw, vr, null,
+            pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
+            session, parentCtx, this);
+    epw.init(ctx);
+    if (!epw.isInitialized()) {
+      entitiesToDestroy.add(epw);
+      epw.setInitialized(true);
+    }
+    
+    if (reqParams.getStart() > 0) {
+      getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
+    }
+
+    if (verboseDebug) {
+      getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
+    }
+
+    int seenDocCount = 0;
+
+    try {
+      while (true) {
+        if (stop.get())
+          return;
+        if(importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
+        try {
+          seenDocCount++;
+
+          if (seenDocCount > reqParams.getStart()) {
+            getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
+          }
+
+          if (verboseDebug && epw.getEntity().isDocRoot()) {
+            getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
+          }
+          if (doc == null && epw.getEntity().isDocRoot()) {
+            doc = new DocWrapper();
+            ctx.setDoc(doc);
+            Entity e = epw.getEntity();
+            while (e.getParentEntity() != null) {
+              addFields(e.getParentEntity(), doc, (Map<String, Object>) vr
+                      .resolve(e.getParentEntity().getName()), vr);
+              e = e.getParentEntity();
+            }
+          }
+
+          Map<String, Object> arow = epw.nextRow();
+          if (arow == null) {
+            break;
+          }
+
+          // Support for start parameter in debug mode
+          if (epw.getEntity().isDocRoot()) {
+            if (seenDocCount <= reqParams.getStart())
+              continue;
+            if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
+              log.info("Indexing stopped at docCount = " + importStatistics.docCount);
+              break;
+            }
+          }
+
+          if (verboseDebug) {
+            getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
+          }
+          importStatistics.rowsCount.incrementAndGet();
+          
+          DocWrapper childDoc = null;
+          if (doc != null) {
+            if (epw.getEntity().isChild()) {
+              childDoc = new DocWrapper();
+              handleSpecialCommands(arow, childDoc);
+              addFields(epw.getEntity(), childDoc, arow, vr);
+              doc.addChildDocument(childDoc);
+            } else {
+              handleSpecialCommands(arow, doc);
+              vr.addNamespace(epw.getEntity().getName(), arow);
+              addFields(epw.getEntity(), doc, arow, vr);
+              vr.removeNamespace(epw.getEntity().getName());
+            }
+          }
+          if (epw.getEntity().getChildren() != null) {
+            vr.addNamespace(epw.getEntity().getName(), arow);
+            for (EntityProcessorWrapper child : epw.getChildren()) {
+              if (childDoc != null) {
+                buildDocument(vr, childDoc,
+                    child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
+              } else {
+                buildDocument(vr, doc,
+                    child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
+              }
+            }
+            vr.removeNamespace(epw.getEntity().getName());
+          }
+          if (epw.getEntity().isDocRoot()) {
+            if (stop.get())
+              return;
+            if (!doc.isEmpty()) {
+              boolean result = writer.upload(doc);
+              if(reqParams.isDebug()) {
+                reqParams.getDebugInfo().debugDocuments.add(doc);
+              }
+              doc = null;
+              if (result){
+                importStatistics.docCount.incrementAndGet();
+              } else {
+                importStatistics.failedDocCount.incrementAndGet();
+              }
+            }
+          }
+        } catch (DataImportHandlerException e) {
+          if (verboseDebug) {
+            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
+          }
+          if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
+            continue;
+          }
+          if (isRoot) {
+            if (e.getErrCode() == DataImportHandlerException.SKIP) {
+              importStatistics.skipDocCount.getAndIncrement();
+              doc = null;
+            } else {
+              SolrException.log(log, "Exception while processing: "
+                      + epw.getEntity().getName() + " document : " + doc, e);
+            }
+            if (e.getErrCode() == DataImportHandlerException.SEVERE)
+              throw e;
+          } else
+            throw e;
+        } catch (Exception t) {
+          if (verboseDebug) {
+            getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
+          }
+          throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
+        } finally {
+          if (verboseDebug) {
+            getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
+            if (epw.getEntity().isDocRoot())
+              getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
+          }
+        }
+      }
+    } finally {
+      if (verboseDebug) {
+        getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
+      }
+    }
+  }
+
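+  /**
+   * A {@link SolrInputDocument} carrying a per-document session map so that entity processors can share state while
+   * a single document is being built.
+   */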
+  static class DocWrapper extends SolrInputDocument {
+    Map<String ,Object> session;
+
+    public void setSessionAttribute(String key, Object val){
+      if(session == null) session = new HashMap<>();
+      session.put(key, val);
+    }
+
+    public Object getSessionAttribute(String key) {
+      return session == null ? null : session.get(key);
+    }
+  }
+
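+  /**
+   * Handles the pseudo-columns a row may carry: {@link #DELETE_DOC_BY_ID} and {@link #DELETE_DOC_BY_QUERY} issue
+   * deletes, {@link #DOC_BOOST} is ignored (index-time boosts are no longer supported), and {@link #SKIP_DOC} /
+   * {@link #SKIP_ROW} abort the current document or row respectively.
+   */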
+  private void handleSpecialCommands(Map<String, Object> arow, DocWrapper doc) {
+    Object value = arow.get(DELETE_DOC_BY_ID);
+    if (value != null) {
+      if (value instanceof Collection) {
+        Collection collection = (Collection) value;
+        for (Object o : collection) {
+          writer.deleteDoc(o.toString());
+          importStatistics.deletedDocCount.incrementAndGet();
+        }
+      } else {
+        writer.deleteDoc(value);
+        importStatistics.deletedDocCount.incrementAndGet();
+      }
+    }    
+    value = arow.get(DELETE_DOC_BY_QUERY);
+    if (value != null) {
+      if (value instanceof Collection) {
+        Collection collection = (Collection) value;
+        for (Object o : collection) {
+          writer.deleteByQuery(o.toString());
+          importStatistics.deletedDocCount.incrementAndGet();
+        }
+      } else {
+        writer.deleteByQuery(value.toString());
+        importStatistics.deletedDocCount.incrementAndGet();
+      }
+    }
+    value = arow.get(DOC_BOOST);
+    if (value != null) {
+      String message = "Ignoring document boost: " + value + " as index-time boosts are not supported anymore";
+      if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
+        log.warn(message);
+      } else {
+        log.debug(message);
+      }
+    }
+
+    value = arow.get(SKIP_DOC);
+    if (value != null) {
+      if (Boolean.parseBoolean(value.toString())) {
+        throw new DataImportHandlerException(DataImportHandlerException.SKIP,
+                "Document skipped: " + arow);
+      }
+    }
+
+    value = arow.get(SKIP_ROW);
+    if (value != null) {
+      if (Boolean.parseBoolean(value.toString())) {
+        throw new DataImportHandlerException(DataImportHandlerException.SKIP_ROW);
+      }
+    }
+  }
+
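+  /**
+   * Copies the row's columns into the document: keys starting with '$' are ignored, columns declared in data-config
+   * take precedence (with dynamic names resolved through the variable resolver), and undeclared columns are added
+   * only when a matching schema field exists.
+   */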
+  @SuppressWarnings("unchecked")
+  private void addFields(Entity entity, DocWrapper doc,
+                         Map<String, Object> arow, VariableResolver vr) {
+    for (Map.Entry<String, Object> entry : arow.entrySet()) {
+      String key = entry.getKey();
+      Object value = entry.getValue();
+      if (value == null)  continue;
+      if (key.startsWith("$")) continue;
+      Set<EntityField> field = entity.getColNameVsField().get(key);
+      IndexSchema schema = null == reqParams.getRequest() ? null : reqParams.getRequest().getSchema();
+      if (field == null && schema != null) {
+        // This can be a dynamic field or a field which does not have an entry in data-config (an implicit field)
+        SchemaField sf = schema.getFieldOrNull(key);
+        if (sf == null) {
+          sf = config.getSchemaField(key);
+        }
+        if (sf != null) {
+          addFieldToDoc(entry.getValue(), sf.getName(), sf.multiValued(), doc);
+        }
+        // else do nothing; if we add it, it may fail
+      } else {
+        if (field != null) {
+          for (EntityField f : field) {
+            String name = f.getName();
+            boolean multiValued = f.isMultiValued();
+            boolean toWrite = f.isToWrite();
+            if(f.isDynamicName()){
+              name =  vr.replaceTokens(name);
+              SchemaField schemaField = config.getSchemaField(name);
+              if(schemaField == null) {
+                toWrite = false;
+              } else {
+                multiValued = schemaField.multiValued();
+                toWrite = true;
+              }
+            }
+            if (toWrite) {
+              addFieldToDoc(entry.getValue(), name, multiValued, doc);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void addFieldToDoc(Object value, String name, boolean multiValued, DocWrapper doc) {
+    if (value instanceof Collection) {
+      Collection collection = (Collection) value;
+      if (multiValued) {
+        for (Object o : collection) {
+          if (o != null)
+            doc.addField(name, o);
+        }
+      } else {
+        if (doc.getField(name) == null)
+          for (Object o : collection) {
+            if (o != null)  {
+              doc.addField(name, o);
+              break;
+            }
+          }
+      }
+    } else if (multiValued) {
+      if (value != null)  {
+        doc.addField(name, value);
+      }
+    } else {
+      if (doc.getField(name) == null && value != null)
+        doc.addField(name, value);
+    }
+  }
+
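+  /**
+   * Instantiates the entity's configured {@link EntityProcessor} (defaulting to {@link SqlEntityProcessor} when none
+   * is declared), wraps it, and recursively wraps all child entities.
+   */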
+  public EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
+    EntityProcessor entityProcessor = null;
+    if (entity.getProcessorName() == null) {
+      entityProcessor = new SqlEntityProcessor();
+    } else {
+      try {
+        entityProcessor = (EntityProcessor) loadClass(entity.getProcessorName(), dataImporter.getCore())
+                .newInstance();
+      } catch (Exception e) {
+        wrapAndThrow(SEVERE, e,
+                "Unable to load EntityProcessor implementation for entity: " + entity.getName());
+      }
+    }
+    EntityProcessorWrapper epw = new EntityProcessorWrapper(entityProcessor, entity, this);
+    for(Entity e1 : entity.getChildren()) {
+      epw.getChildren().add(getEntityProcessorWrapper(e1));
+    }
+      
+    return epw;
+  }
+
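+  /**
+   * Called when a delta row carries no value under the declared primary key: tries to resolve the key against
+   * prefixed or suffixed column names in the row and fails fast when the match is missing or ambiguous.
+   */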
+  private String findMatchingPkColumn(String pk, Map<String, Object> row) {
+    if (row.containsKey(pk)) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "deltaQuery returned a row with null for primary key %s", pk));
+    }
+    String resolvedPk = null;
+    for (String columnName : row.keySet()) {
+      if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) {
+        if (resolvedPk != null)
+          throw new IllegalArgumentException(
+            String.format(Locale.ROOT, 
+              "deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'",
+              resolvedPk, columnName, pk));
+        resolvedPk = columnName;
+      }
+    }
+    if (resolvedPk == null) {
+      throw new IllegalArgumentException(
+          String
+              .format(
+                  Locale.ROOT,
+                  "deltaQuery has no column to resolve to declared primary key pk='%s'",
+                  pk));
+    }
+    log.info(String.format(Locale.ROOT,
+        "Resolving deltaQuery column '%s' to match entity's declared pk '%s'",
+        resolvedPk, pk));
+    return resolvedPk;
+  }
+
+  /**
+   * <p> Collects the unique keys of all Solr documents for which one or more source tables have changed since the
+   * last indexed time. </p> <p> Note: the unique key of a Solr document is the primary key of the top-level entity
+   * (unless it is skipped with rootEntity="false") declared for that document in data-config.xml. </p>
+   *
+   * @return the set of primary-key rows for which Solr documents should be re-indexed.
+   */
+  @SuppressWarnings("unchecked")
+  public Set<Map<String, Object>> collectDelta(EntityProcessorWrapper epw, VariableResolver resolver,
+                                               Set<Map<String, Object>> deletedRows) {
+    //someone called abort
+    if (stop.get())
+      return new HashSet<>();
+
+    ContextImpl context1 = new ContextImpl(epw, resolver, null, Context.FIND_DELTA, session, null, this);
+    epw.init(context1);
+
+    Set<Map<String, Object>> myModifiedPks = new HashSet<>();
+
+    for (EntityProcessorWrapper childEpw : epw.getChildren()) {
+      //this ensures that we start from the leaf nodes
+      myModifiedPks.addAll(collectDelta(childEpw, resolver, deletedRows));
+      //someone called abort
+      if (stop.get())
+        return new HashSet<>();
+    }
+    
+    // identifying the modified rows for this entity
+    Map<String, Map<String, Object>> deltaSet = new HashMap<>();
+    log.info("Running ModifiedRowKey() for Entity: " + epw.getEntity().getName());
+    //get the modified rows in this entity
+    String pk = epw.getEntity().getPk();
+    while (true) {
+      Map<String, Object> row = epw.nextModifiedRowKey();
+
+      if (row == null)
+        break;
+
+      Object pkValue = row.get(pk);
+      if (pkValue == null) {
+        pk = findMatchingPkColumn(pk, row);
+        pkValue = row.get(pk);
+      }
+
+      deltaSet.put(pkValue.toString(), row);
+      importStatistics.rowsCount.incrementAndGet();
+      // check for abort
+      if (stop.get())
+        return new HashSet<>();
+    }
+    //get the deleted rows for this entity
+    Set<Map<String, Object>> deletedSet = new HashSet<>();
+    while (true) {
+      Map<String, Object> row = epw.nextDeletedRowKey();
+      if (row == null)
+        break;
+
+      deletedSet.add(row);
+      
+      Object pkValue = row.get(pk);
+      if (pkValue == null) {
+        pk = findMatchingPkColumn(pk, row);
+        pkValue = row.get(pk);
+      }
+
+      // Remove deleted rows from the delta rows
+      String deletedRowPk = pkValue.toString();
+      if (deltaSet.containsKey(deletedRowPk)) {
+        deltaSet.remove(deletedRowPk);
+      }
+
+      importStatistics.rowsCount.incrementAndGet();
+      // check for abort
+      if (stop.get())
+        return new HashSet<>();
+    }
+
+    log.info("Completed ModifiedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deltaSet.size());
+    log.info("Completed DeletedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deletedSet.size());
+
+    myModifiedPks.addAll(deltaSet.values());
+    Set<Map<String, Object>> parentKeyList = new HashSet<>();
+    // all that we have captured is useless (in a sub-entity) if no rows in the parent are modified because of these;
+    // propagate the changes up the chain
+    if (epw.getEntity().getParentEntity() != null) {
+      // identifying deleted rows with deltas
+
+      for (Map<String, Object> row : myModifiedPks) {
+        resolver.addNamespace(epw.getEntity().getName(), row);
+        getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
+        // check for abort
+        if (stop.get())
+          return new HashSet<>();
+      }
+      // running the same for deleted rows
+      for (Map<String, Object> row : deletedSet) {
+        resolver.addNamespace(epw.getEntity().getName(), row);
+        getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
+        // check for abort
+        if (stop.get())
+          return new HashSet<>();
+      }
+    }
+    log.info("Completed parentDeltaQuery for Entity: " + epw.getEntity().getName());
+    if (epw.getEntity().isDocRoot())
+      deletedRows.addAll(deletedSet);
+
+    // Do not use entity.isDocRoot here because one of descendant entities may set rootEntity="true"
+    return epw.getEntity().getParentEntity() == null ?
+        myModifiedPks : new HashSet<>(parentKeyList);
+  }
+
+  private void getModifiedParentRows(VariableResolver resolver,
+                                     String entity, EntityProcessor entityProcessor,
+                                     Set<Map<String, Object>> parentKeyList) {
+    try {
+      while (true) {
+        Map<String, Object> parentRow = entityProcessor
+                .nextModifiedParentRowKey();
+        if (parentRow == null)
+          break;
+
+        parentKeyList.add(parentRow);
+        importStatistics.rowsCount.incrementAndGet();
+        // check for abort
+        if (stop.get())
+          return;
+      }
+
+    } finally {
+      resolver.removeNamespace(entity);
+    }
+  }
+
+  public void abort() {
+    stop.set(true);
+  }
+
+  private AtomicBoolean stop = new AtomicBoolean(false);
+
+  public static final String TIME_ELAPSED = "Time Elapsed";
+
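+  /** Formats the time elapsed since the given {@code System.nanoTime()} value as hours:minutes:seconds.millis. */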
+  static String getTimeElapsedSince(long l) {
+    l = TimeUnit.MILLISECONDS.convert(System.nanoTime() - l, TimeUnit.NANOSECONDS);
+    return (l / (60000 * 60)) + ":" + (l / 60000) % 60 + ":" + (l / 1000)
+            % 60 + "." + l % 1000;
+  }
+
+  public RequestInfo getReqParams() {
+    return reqParams;
+  }
+
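+  /**
+   * Loads a class by name through the core's resource loader (or {@code Class.forName} when no core is available),
+   * falling back to this package when the plain name cannot be resolved.
+   */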
+  @SuppressWarnings("unchecked")
+  static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
+    try {
+      return core != null ?
+              core.getResourceLoader().findClass(name, Object.class) :
+              Class.forName(name);
+    } catch (Exception e) {
+      try {
+        String n = DocBuilder.class.getPackage().getName() + "." + name;
+        return core != null ?
+                core.getResourceLoader().findClass(n, Object.class) :
+                Class.forName(n);
+      } catch (Exception e1) {
+        throw new ClassNotFoundException("Unable to load " + name + " or " + DocBuilder.class.getPackage().getName() + "." + name, e);
+      }
+    }
+  }
+
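+  /**
+   * Thread-safe counters for the current import; {@link #add} merges the document, deleted-document, row and query
+   * counts of another instance into this one.
+   */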
+  public static class Statistics {
+    public AtomicLong docCount = new AtomicLong();
+
+    public AtomicLong deletedDocCount = new AtomicLong();
+
+    public AtomicLong failedDocCount = new AtomicLong();
+
+    public AtomicLong rowsCount = new AtomicLong();
+
+    public AtomicLong queryCount = new AtomicLong();
+
+    public AtomicLong skipDocCount = new AtomicLong();
+
+    public Statistics add(Statistics stats) {
+      this.docCount.addAndGet(stats.docCount.get());
+      this.deletedDocCount.addAndGet(stats.deletedDocCount.get());
+      this.rowsCount.addAndGet(stats.rowsCount.get());
+      this.queryCount.addAndGet(stats.queryCount.get());
+
+      return this;
+    }
+
+    public Map<String, Object> getStatsSnapshot() {
+      Map<String, Object> result = new HashMap<>();
+      result.put("docCount", docCount.get());
+      result.put("deletedDocCount", deletedDocCount.get());
+      result.put("rowCount", rowsCount.get());
+      result.put("queryCount", rowsCount.get());
+      result.put("skipDocCount", skipDocCount.get());
+      return result;
+    }
+
+  }
+
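+  /**
+   * Applies the clean step when the request asked for a clean import: without a per-entity delete query it deletes
+   * all documents exactly once per run (guarded by {@code completeCleanDone}); otherwise it runs the resolved
+   * delete query.
+   */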
+  private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
+    delQuery = getVariableResolver().replaceTokens(delQuery);
+    if (reqParams.isClean()) {
+      if (delQuery == null && !completeCleanDone.get()) {
+        writer.doDeleteAll();
+        completeCleanDone.set(true);
+      } else if (delQuery != null) {
+        writer.deleteByQuery(delQuery);
+      }
+    }
+  }
+
+  public static final String LAST_INDEX_TIME = "last_index_time";
+  public static final String INDEX_START_TIME = "index_start_time";
+}