Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:36 UTC

[15/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras module

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
new file mode 100644
index 0000000..8cfbed9
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * An instance of an entity processor serves a single entity. It is reused throughout
+ * the import process.
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a public no-args constructor.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class EntityProcessor {
+
+  /**
+   * This method is called when the processor starts processing an entity. When the
+   * import returns to the entity, it is called again, so any state can be reset at
+   * that point. For a root entity this is called only once per import; for
+   * sub-entities, it is called once for each row emitted by the parent entity.
+   *
+   * @param context The current context
+   */
+  public abstract void init(Context context);
+
+  /**
+   * This method streams the data, one row at a time. The implementation should
+   * fetch as many rows as needed and return one 'row' per call. Only this method
+   * is used during a full import.
+   *
+   * @return A 'row'.  The 'key' for the map is the column name and the 'value'
+   *         is the value of that column. If there are no more rows to be
+   *         returned, return 'null'
+   */
+  public abstract Map<String, Object> nextRow();
+
+  /**
+   * This is used for delta-import. It returns the primary keys of the rows changed
+   * in this entity.
+   *
+   * @return the pk vs value of all changed rows
+   */
+  public abstract Map<String, Object> nextModifiedRowKey();
+
+  /**
+   * This is used during delta-import. It returns the primary keys of the rows that
+   * are deleted from this entity. If this entity is the root entity, the Solr
+   * document is deleted. If this is a sub-entity, the Solr document is considered
+   * 'changed' and will be recreated.
+   *
+   * @return the pk vs value of all deleted rows
+   */
+  public abstract Map<String, Object> nextDeletedRowKey();
+
+  /**
+   * This is used during delta-import. This gives the primary keys and their
+   * values of all the rows changed in a parent entity due to changes in this
+   * entity.
+   *
+   * @return the pk vs value of all changed rows in the parent entity
+   */
+  public abstract Map<String, Object> nextModifiedParentRowKey();
+
+  /**
+   * Invoked for each entity at the very end of the import to do any needed cleanup tasks.
+   * 
+   */
+  public abstract void destroy();
+
+  /**
+   * Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
+   * added by Transformers in this method.
+   *
+   * @param r The transformed row
+   * @since solr 1.4
+   */
+  public void postTransform(Map<String, Object> r) {
+  }
+
+  /**
+   * Invoked when the Entity processor is destroyed towards the end of import.
+   *
+   * @since solr 1.4
+   */
+  public void close() {
+    //no-op
+  }
+}
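
For readers new to this API, a minimal subclass only needs to implement the
abstract methods above; the delta-import hooks may simply return null when
unused. The sketch below is illustrative only and not part of this commit;
the SingleRowProcessor name and the 'status' column are made up.

    package org.apache.solr.handler.dataimport;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical example: emits exactly one row each time its parent asks.
    public class SingleRowProcessor extends EntityProcessor {
      private boolean done;

      @Override
      public void init(Context context) {
        done = false; // re-invoked for every row of the parent entity
      }

      @Override
      public Map<String, Object> nextRow() {
        if (done) return null;          // null signals end of rows
        done = true;
        Map<String, Object> row = new HashMap<>();
        row.put("status", "ok");        // key = column name, value = column value
        return row;
      }

      // Delta-import is not supported by this processor.
      @Override public Map<String, Object> nextModifiedRowKey() { return null; }
      @Override public Map<String, Object> nextDeletedRowKey() { return null; }
      @Override public Map<String, Object> nextModifiedParentRowKey() { return null; }

      @Override public void destroy() {}
    }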

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
new file mode 100644
index 0000000..8311f36
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+/**
+ * <p> Base class for all implementations of {@link EntityProcessor} </p> <p> Most implementations of {@link EntityProcessor}
+ * extend this base class which provides common functionality. </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class EntityProcessorBase extends EntityProcessor {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected boolean isFirstInit = true;
+
+  protected String entityName;
+
+  protected Context context;
+
+  protected Iterator<Map<String, Object>> rowIterator;
+
+  protected String query;  
+  
+  protected String onError = ABORT;  
+  
+  protected DIHCacheSupport cacheSupport = null;
+  
+  private Zipper zipper;
+
+
+  @Override
+  public void init(Context context) {
+    this.context = context;
+    if (isFirstInit) {
+      firstInit(context);
+    }
+    if (zipper != null) {
+      zipper.onNewParent(context);
+    } else {
+      if (cacheSupport != null) {
+        cacheSupport.initNewParent(context);
+      }
+    }
+  }
+
+  /**
+   * First-time init call; do one-time operations here. It is necessary to call
+   * this from any overriding method, otherwise an NPE is thrown when nextRow()
+   * accesses zipper.
+   */
+  protected void firstInit(Context context) {
+    entityName = context.getEntityAttribute("name");
+    String s = context.getEntityAttribute(ON_ERROR);
+    if (s != null) onError = s;
+    
+    zipper = Zipper.createOrNull(context);
+    
+    if (zipper == null) {
+      initCache(context);
+    }
+    isFirstInit = false;
+  }
+
+  protected void initCache(Context context) {
+    String cacheImplName = context
+        .getResolvedEntityAttribute(DIHCacheSupport.CACHE_IMPL);
+
+    if (cacheImplName != null) {
+      cacheSupport = new DIHCacheSupport(context, cacheImplName);
+    }
+  }
+
+  @Override
+  public Map<String, Object> nextModifiedRowKey() {
+    return null;
+  }
+
+  @Override
+  public Map<String, Object> nextDeletedRowKey() {
+    return null;
+  }
+
+  @Override
+  public Map<String, Object> nextModifiedParentRowKey() {
+    return null;
+  }
+
+  /**
+   * For a simple implementation, this is the only method that the sub-class should implement. This is intended to
+   * stream rows one-by-one. Return null to signal end of rows
+   *
+   * @return a row where the key is the name of the field and value can be any Object or a Collection of objects. Return
+   *         null to signal end of rows
+   */
+  @Override
+  public Map<String, Object> nextRow() {
+    return null;// do not do anything
+  }
+  
+  protected Map<String, Object> getNext() {
+    if (zipper != null) {
+      return zipper.supplyNextChild(rowIterator);
+    } else {
+      if (cacheSupport == null) {
+        try {
+          if (rowIterator == null)
+            return null;
+          if (rowIterator.hasNext())
+            return rowIterator.next();
+          query = null;
+          rowIterator = null;
+          return null;
+        } catch (Exception e) {
+          SolrException.log(log, "getNext() failed for query '" + query + "'", e);
+          query = null;
+          rowIterator = null;
+          wrapAndThrow(DataImportHandlerException.WARN, e);
+          return null;
+        }
+      } else {
+        return cacheSupport.getCacheData(context, query, rowIterator);
+      }
+    }
+  }
+
+
+  @Override
+  public void destroy() {
+    query = null;
+    if (cacheSupport != null) {
+      cacheSupport.destroyAll();
+    }
+    cacheSupport = null;
+  }
+
+  
+
+  public static final String TRANSFORMER = "transformer";
+
+  public static final String TRANSFORM_ROW = "transformRow";
+
+  public static final String ON_ERROR = "onError";
+
+  public static final String ABORT = "abort";
+
+  public static final String CONTINUE = "continue";
+
+  public static final String SKIP = "skip";
+}
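
In practice a subclass usually populates the protected rowIterator and routes
everything through getNext(), which transparently defers to the Zipper or
DIHCacheSupport paths when either is configured. A minimal sketch of that
pattern (mirroring FileListEntityProcessor.nextRow() later in this commit;
the InMemoryEntityProcessor name and 'id' column are hypothetical):

    package org.apache.solr.handler.dataimport;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch: streams a hard-coded row list through getNext().
    public class InMemoryEntityProcessor extends EntityProcessorBase {

      @Override
      public Map<String, Object> nextRow() {
        if (rowIterator != null)
          return getNext();
        List<Map<String, Object>> rows = new ArrayList<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", "1");            // hypothetical column
        rows.add(row);
        rowIterator = rows.iterator();
        // getNext() iterates rowIterator, or defers to Zipper/DIHCacheSupport.
        return getNext();
      }
    }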

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
new file mode 100644
index 0000000..8a76e11
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.Entity;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.*;
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper over an {@link EntityProcessor} instance which performs transforms and handles multi-row outputs correctly.
+ *
+ * @since solr 1.4
+ */
+public class EntityProcessorWrapper extends EntityProcessor {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private EntityProcessor delegate;
+  private Entity entity;
+  private DataSource datasource;
+  private List<EntityProcessorWrapper> children = new ArrayList<>();
+  private DocBuilder docBuilder;
+  private boolean initialized;
+  private String onError;
+  private Context context;
+  private VariableResolver resolver;
+  private String entityName;
+
+  protected List<Transformer> transformers;
+
+  protected List<Map<String, Object>> rowcache;
+  
+  public EntityProcessorWrapper(EntityProcessor delegate, Entity entity, DocBuilder docBuilder) {
+    this.delegate = delegate;
+    this.entity = entity;
+    this.docBuilder = docBuilder;
+  }
+
+  @Override
+  public void init(Context context) {
+    rowcache = null;
+    this.context = context;
+    resolver = (VariableResolver) context.getVariableResolver();
+    if (entityName == null) {
+      onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
+      if (onError == null) onError = ABORT;
+      entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
+    }
+    delegate.init(context);
+
+  }
+
+  @SuppressWarnings("unchecked")
+  void loadTransformers() {
+    String transClasses = context.getEntityAttribute(TRANSFORMER);
+
+    if (transClasses == null) {
+      transformers = Collections.EMPTY_LIST;
+      return;
+    }
+
+    String[] transArr = transClasses.split(",");
+    transformers = new ArrayList<Transformer>() {
+      @Override
+      public boolean add(Transformer transformer) {
+        if (docBuilder != null && docBuilder.verboseDebug) {
+          transformer = docBuilder.getDebugLogger().wrapTransformer(transformer);
+        }
+        return super.add(transformer);
+      }
+    };
+    for (String aTransArr : transArr) {
+      String trans = aTransArr.trim();
+      if (trans.startsWith("script:")) {
+        // The script transformer is a potential vulnerability, esp. when the script is
+        // provided from an untrusted source. Check and don't proceed if source is untrusted.
+        checkIfTrusted(trans);
+        String functionName = trans.substring("script:".length());
+        ScriptTransformer scriptTransformer = new ScriptTransformer();
+        scriptTransformer.setFunctionName(functionName);
+        transformers.add(scriptTransformer);
+        continue;
+      }
+      try {
+        Class clazz = DocBuilder.loadClass(trans, context.getSolrCore());
+        if (Transformer.class.isAssignableFrom(clazz)) {
+          transformers.add((Transformer) clazz.newInstance());
+        } else {
+          Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class);
+          transformers.add(new ReflectionTransformer(meth, clazz, trans));
+        }
+      } catch (NoSuchMethodException nsme) {
+        String msg = "Transformer: "
+            + trans
+            + " does not implement the Transformer interface or does not have a transformRow(Map<String, Object> m) method";
+        log.error(msg);
+        wrapAndThrow(SEVERE, nsme, msg);
+      } catch (Exception e) {
+        log.error("Unable to load Transformer: " + aTransArr, e);
+        wrapAndThrow(SEVERE, e, "Unable to load Transformer: " + trans);
+      }
+    }
+
+  }
+
+  private void checkIfTrusted(String trans) {
+    if (docBuilder != null) {
+      SolrCore core = docBuilder.dataImporter.getCore();
+      boolean trusted = (core != null) ? core.getCoreDescriptor().isConfigSetTrusted() : true;
+      if (!trusted) {
+        Exception ex = new SolrException(ErrorCode.UNAUTHORIZED, "The configset for this collection was uploaded "
+            + "without any authentication in place,"
+            + " and this transformer is not available for collections with untrusted configsets. To use this transformer,"
+            + " re-upload the configset after enabling authentication and authorization.");
+        String msg = "Transformer: "
+            + trans
+            + ". " + ex.getMessage();
+        log.error(msg);
+        wrapAndThrow(SEVERE, ex, msg);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  static class ReflectionTransformer extends Transformer {
+    final Method meth;
+
+    final Class clazz;
+
+    final String trans;
+
+    final Object o;
+
+    public ReflectionTransformer(Method meth, Class clazz, String trans)
+            throws Exception {
+      this.meth = meth;
+      this.clazz = clazz;
+      this.trans = trans;
+      o = clazz.newInstance();
+    }
+
+    @Override
+    public Object transformRow(Map<String, Object> aRow, Context context) {
+      try {
+        return meth.invoke(o, aRow);
+      } catch (Exception e) {
+        log.warn("method invocation failed on transformer : " + trans, e);
+        throw new DataImportHandlerException(WARN, e);
+      }
+    }
+  }
+
+  protected Map<String, Object> getFromRowCache() {
+    Map<String, Object> r = rowcache.remove(0);
+    if (rowcache.isEmpty())
+      rowcache = null;
+    return r;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Map<String, Object> applyTransformer(Map<String, Object> row) {
+    if(row == null) return null;
+    if (transformers == null)
+      loadTransformers();
+    if (transformers == Collections.EMPTY_LIST)
+      return row;
+    Map<String, Object> transformedRow = row;
+    List<Map<String, Object>> rows = null;
+    boolean stopTransform = checkStopTransform(row);
+    VariableResolver resolver = (VariableResolver) context.getVariableResolver();
+    for (Transformer t : transformers) {
+      if (stopTransform) break;
+      try {
+        if (rows != null) {
+          List<Map<String, Object>> tmpRows = new ArrayList<>();
+          for (Map<String, Object> map : rows) {
+            resolver.addNamespace(entityName, map);
+            Object o = t.transformRow(map, context);
+            if (o == null)
+              continue;
+            if (o instanceof Map) {
+              Map oMap = (Map) o;
+              stopTransform = checkStopTransform(oMap);
+              tmpRows.add((Map) o);
+            } else if (o instanceof List) {
+              tmpRows.addAll((List) o);
+            } else {
+              log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
+            }
+          }
+          rows = tmpRows;
+        } else {
+          resolver.addNamespace(entityName, transformedRow);
+          Object o = t.transformRow(transformedRow, context);
+          if (o == null)
+            return null;
+          if (o instanceof Map) {
+            Map oMap = (Map) o;
+            stopTransform = checkStopTransform(oMap);
+            transformedRow = (Map) o;
+          } else if (o instanceof List) {
+            rows = (List) o;
+          } else {
+            log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
+          }
+        }
+      } catch (Exception e) {
+        log.warn("transformer threw error", e);
+        if (ABORT.equals(onError)) {
+          wrapAndThrow(SEVERE, e);
+        } else if (SKIP.equals(onError)) {
+          wrapAndThrow(DataImportHandlerException.SKIP, e);
+        }
+        // onError = continue
+      }
+    }
+    if (rows == null) {
+      return transformedRow;
+    } else {
+      rowcache = rows;
+      return getFromRowCache();
+    }
+
+  }
+
+  private boolean checkStopTransform(Map oMap) {
+    return oMap.get("$stopTransform") != null
+            && Boolean.parseBoolean(oMap.get("$stopTransform").toString());
+  }
+
+  @Override
+  public Map<String, Object> nextRow() {
+    if (rowcache != null) {
+      return getFromRowCache();
+    }
+    while (true) {
+      Map<String, Object> arow = null;
+      try {
+        arow = delegate.nextRow();
+      } catch (Exception e) {
+        if (ABORT.equals(onError)) {
+          wrapAndThrow(SEVERE, e);
+        } else {
+          // SKIP is not really possible: if this called nextRow() again, the EntityProcessor would be in an inconsistent state
+          SolrException.log(log, "Exception in entity : " + entityName, e);
+          return null;
+        }
+      }
+      if (arow == null) {
+        return null;
+      } else {
+        arow = applyTransformer(arow);
+        if (arow != null) {
+          delegate.postTransform(arow);
+          return arow;
+        }
+      }
+    }
+  }
+
+  @Override
+  public Map<String, Object> nextModifiedRowKey() {
+    Map<String, Object> row = delegate.nextModifiedRowKey();
+    row = applyTransformer(row);
+    rowcache = null;
+    return row;
+  }
+
+  @Override
+  public Map<String, Object> nextDeletedRowKey() {
+    Map<String, Object> row = delegate.nextDeletedRowKey();
+    row = applyTransformer(row);
+    rowcache = null;
+    return row;
+  }
+
+  @Override
+  public Map<String, Object> nextModifiedParentRowKey() {
+    return delegate.nextModifiedParentRowKey();
+  }
+
+  @Override
+  public void destroy() {
+    delegate.destroy();
+  }
+
+  public VariableResolver getVariableResolver() {
+    return (VariableResolver) context.getVariableResolver();
+  }
+
+  public Context getContext() {
+    return context;
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  public Entity getEntity() {
+    return entity;
+  }
+
+  public List<EntityProcessorWrapper> getChildren() {
+    return children;
+  }
+
+  public DataSource getDatasource() {
+    return datasource;
+  }
+
+  public void setDatasource(DataSource datasource) {
+    this.datasource = datasource;
+  }
+
+  public boolean isInitialized() {
+    return initialized;
+  }
+
+  public void setInitialized(boolean initialized) {
+    this.initialized = initialized;
+  }
+}
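
As loadTransformers() above shows, the transformer attribute accepts either a
Transformer subclass or, via reflection, any class exposing a public
transformRow(Map) method. A minimal sketch of the reflection flavor (the class
name and the 'name' column are hypothetical):

    package org.apache.solr.handler.dataimport;

    import java.util.Locale;
    import java.util.Map;

    // Hypothetical reflection-based transformer: no Transformer superclass,
    // just the public transformRow(Map) method that loadTransformers() looks
    // up via Class.getMethod(TRANSFORM_ROW, Map.class).
    public class LowercaseNameTransformer {
      public Map<String, Object> transformRow(Map<String, Object> row) {
        Object name = row.get("name");
        if (name != null) {
          row.put("name", name.toString().toLowerCase(Locale.ROOT));
        }
        return row; // the returned map replaces the input row
      }
    }

Note that a transformRow implementation may also return a List of maps to
split one input row into several; applyTransformer() buffers such output in
rowcache and getFromRowCache() drains it one row at a time.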

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
new file mode 100644
index 0000000..22282b9
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * Pluggable functions for resolving variables
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a public no-arg constructor.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class Evaluator {
+
+  /**
+   * Return a String after processing an expression and a {@link VariableResolver}
+   *
+   * @see VariableResolver
+   * @param expression string to be evaluated
+   * @param context instance
+   * @return the value of the given expression evaluated using the resolver
+   */
+  public abstract String evaluate(String expression, Context context);
+
+  /**
+   * Parses an expression string into separate params. The values are separated by
+   * commas. Each value is translated into one of the following:
+   * <ol>
+   * <li>If it is in single quotes, the value is translated to a String</li>
+   * <li>If it is not in quotes and is a number, it is translated into a Double</li>
+   * <li>Otherwise it is a resolvable variable and is put in as an instance of VariableWrapper</li>
+   * </ol>
+   *
+   * @param expression the expression to be parsed
+   * @param vr the VariableResolver instance for resolving variables
+   *
+   * @return a List of objects which can either be a string, number or a variable wrapper
+   */
+  protected List<Object> parseParams(String expression, VariableResolver vr) {
+    List<Object> result = new ArrayList<>();
+    expression = expression.trim();
+    String[] ss = expression.split(",");
+    for (int i = 0; i < ss.length; i++) {
+      ss[i] = ss[i].trim();
+      if (ss[i].startsWith("'")) {//a string param has started
+        StringBuilder sb = new StringBuilder();
+        while (true) {
+          sb.append(ss[i]);
+          if (ss[i].endsWith("'")) break;
+          i++;
+          if (i >= ss.length)
+            throw new DataImportHandlerException(SEVERE, "invalid string at " + ss[i - 1] + " in function params: " + expression);
+          sb.append(",");
+        }
+        String s = sb.substring(1, sb.length() - 1);
+        s = s.replaceAll("\\\\'", "'");
+        result.add(s);
+      } else {
+        if (Character.isDigit(ss[i].charAt(0))) {
+          try {
+            Double doub = Double.parseDouble(ss[i]);
+            result.add(doub);
+          } catch (NumberFormatException e) {
+            if (vr.resolve(ss[i]) == null) {
+              wrapAndThrow(
+                      SEVERE, e, "Invalid number: " + ss[i] +
+                              " in parameters: " + expression);
+            }
+          }
+        } else {
+          result.add(getVariableWrapper(ss[i], vr));
+        }
+      }
+    }
+    return result;
+  }
+
+  protected VariableWrapper getVariableWrapper(String s, VariableResolver vr) {
+    return new VariableWrapper(s,vr);
+  }
+
+  protected static class VariableWrapper {
+    public final String varName;
+    public final VariableResolver vr;
+
+    public VariableWrapper(String s, VariableResolver vr) {
+      this.varName = s;
+      this.vr = vr;
+    }
+
+    public Object resolve() {
+      return vr.resolve(varName);
+    }
+
+    @Override
+    public String toString() {
+      Object o = vr.resolve(varName);
+      return o == null ? null : o.toString();
+    }
+  }
+
+  static Pattern IN_SINGLE_QUOTES = Pattern.compile("^'(.*?)'$");
+  
+  public static final String DATE_FORMAT_EVALUATOR = "formatDate";
+
+  public static final String URL_ENCODE_EVALUATOR = "encodeUrl";
+
+  public static final String ESCAPE_SOLR_QUERY_CHARS = "escapeQueryChars";
+
+  public static final String SQL_ESCAPE_EVALUATOR = "escapeSql";
+}
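
A custom Evaluator plugs into the same machinery: evaluate() receives the raw
argument string and can use parseParams() above to split it into Strings,
Doubles, and VariableWrappers. A minimal sketch, assuming the class is
registered in data-config (the UpperCaseEvaluator name is made up):

    package org.apache.solr.handler.dataimport;

    import java.util.List;
    import java.util.Locale;

    // Hypothetical evaluator: upper-cases its single argument.
    public class UpperCaseEvaluator extends Evaluator {
      @Override
      public String evaluate(String expression, Context context) {
        List<Object> params = parseParams(expression,
            (VariableResolver) context.getVariableResolver());
        if (params.isEmpty() || params.get(0) == null)
          return null;
        // VariableWrapper.toString() resolves the variable, so toString()
        // works for quoted strings and variables alike.
        return params.get(0).toString().toUpperCase(Locale.ROOT);
      }
    }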

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
new file mode 100644
index 0000000..0c43a0b
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+/**
+ * Event listener for DataImportHandler
+ *
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.4
+ */
+public interface EventListener {
+
+  /**
+   * Event callback
+   *
+   * @param ctx the Context in which this event was called
+   */
+  void onEvent(Context ctx);
+
+}
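
EventListener is a single-method interface, so an implementation is tiny; DIH
typically wires one up through the onImportStart / onImportEnd attributes in
data-config. A hypothetical logging listener:

    package org.apache.solr.handler.dataimport;

    import java.lang.invoke.MethodHandles;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical listener that just logs the entity it was fired for.
    public class LoggingEventListener implements EventListener {
      private static final Logger log =
          LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

      @Override
      public void onEvent(Context ctx) {
        log.info("DIH event fired, entity: {}", ctx.getEntityAttribute("name"));
      }
    }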

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
new file mode 100644
index 0000000..571c280
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * This can be useful for users who have a DB field containing XML and wish to use a nested {@link XPathEntityProcessor}
+ * <p>
+ * The datasource may be configured as follows
+ * <p>
+ * &lt;datasource name="f1" type="FieldReaderDataSource" /&gt;
+ * <p>
+ * The entity which uses this datasource must name the source field in its 'dataField' attribute, e.g. dataField="field-name"
+ * <p>
+ * The field name must be resolvable from {@link VariableResolver}
+ * <p>
+ * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}&lt;{@link Reader}&gt; eg: {@link XPathEntityProcessor}
+ * <p>
+ * Supports String, BLOB, and CLOB data types; there is an extra entity attribute 'encoding' for BLOB types
+ *
+ * @since 1.4
+ */
+public class FieldReaderDataSource extends DataSource<Reader> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  protected VariableResolver vr;
+  protected String dataField;
+  private String encoding;
+  private EntityProcessorWrapper entityProcessor;
+
+  @Override
+  public void init(Context context, Properties initProps) {
+    dataField = context.getEntityAttribute("dataField");
+    encoding = context.getEntityAttribute("encoding");
+    entityProcessor = (EntityProcessorWrapper) context.getEntityProcessor();
+    /*no op*/
+  }
+
+  @Override
+  public Reader getData(String query) {
+    Object o = entityProcessor.getVariableResolver().resolve(dataField);
+    if (o == null) {
+      throw new DataImportHandlerException(SEVERE, "No field available for name: " + dataField);
+    }
+    if (o instanceof String) {
+      return new StringReader((String) o);
+    } else if (o instanceof Clob) {
+      Clob clob = (Clob) o;
+      try {
+        //Most of the JDBC drivers have getCharacterStream defined as public
+        // so let us just check it
+        return readCharStream(clob);
+      } catch (Exception e) {
+        log.info("Unable to get data from CLOB");
+        return null;
+
+      }
+
+    } else if (o instanceof Blob) {
+      Blob blob = (Blob) o;
+      try {
+        return getReader(blob);
+      } catch (Exception e) {
+        log.info("Unable to get data from BLOB");
+        return null;
+
+      }
+    } else {
+      return new StringReader(o.toString());
+    }
+
+  }
+
+  static Reader readCharStream(Clob clob) {
+    try {
+      return clob.getCharacterStream();
+    } catch (Exception e) {
+      wrapAndThrow(SEVERE, e,"Unable to get reader from clob");
+      return null;//unreachable
+    }
+  }
+
+  private Reader getReader(Blob blob)
+          throws SQLException, UnsupportedEncodingException {
+    if (encoding == null) {
+      return (new InputStreamReader(blob.getBinaryStream(), StandardCharsets.UTF_8));
+    } else {
+      return (new InputStreamReader(blob.getBinaryStream(), encoding));
+    }
+  }
+
+  @Override
+  public void close() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
new file mode 100644
index 0000000..ba7ca5d
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.sql.Blob;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This can be useful for users who have a DB field containing BLOBs which may be rich documents
+ * <p>
+ * The datasource may be configured as follows
+ * <p>
+ * &lt;dataSource name="f1" type="FieldStreamDataSource" /&gt;
+ * <p>
+ * The entity which uses this datasource must have a 'dataField' attribute naming the source field
+ * <p>
+ * The field name must be resolvable from {@link VariableResolver}
+ * <p>
+ * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}&lt;{@link InputStream}&gt; eg: TikaEntityProcessor
+ *
+ * @since 3.1
+ */
+public class FieldStreamDataSource extends DataSource<InputStream> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  protected VariableResolver vr;
+  protected String dataField;
+  private EntityProcessorWrapper wrapper;
+
+  @Override
+  public void init(Context context, Properties initProps) {
+    dataField = context.getEntityAttribute("dataField");
+    wrapper = (EntityProcessorWrapper) context.getEntityProcessor();
+    /*no op*/
+  }
+
+  @Override
+  public InputStream getData(String query) {
+    Object o = wrapper.getVariableResolver().resolve(dataField);
+    if (o == null) {
+      throw new DataImportHandlerException(SEVERE, "No field available for name : " + dataField);
+    } else if (o instanceof Blob) {
+      Blob blob = (Blob) o;
+      try {
+        return blob.getBinaryStream();
+      } catch (SQLException sqle) {
+        log.info("Unable to get data from BLOB");
+        return null;
+      }
+    } else if (o instanceof byte[]) {
+      byte[] bytes = (byte[]) o;
+      return new ByteArrayInputStream(bytes);
+    } else {
+      throw new RuntimeException("unsupported type : " + o.getClass());
+    } 
+
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
new file mode 100644
index 0000000..920472e
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.*;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+/**
+ * <p>
+ * A {@link DataSource} which reads from local files
+ * </p>
+ * <p>
+ * The file is read with UTF-8 encoding by default. This can be overridden by
+ * specifying the encoding in solrconfig.xml
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public class FileDataSource extends DataSource<Reader> {
+  public static final String BASE_PATH = "basePath";
+
+  /**
+   * The basePath for this data source
+   */
+  protected String basePath;
+
+  /**
+   * The encoding using which the given file should be read
+   */
+  protected String encoding = null;
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public void init(Context context, Properties initProps) {
+    basePath = initProps.getProperty(BASE_PATH);
+    if (initProps.get(URLDataSource.ENCODING) != null)
+      encoding = initProps.getProperty(URLDataSource.ENCODING);
+  }
+
+  /**
+   * <p>
+   * Returns a reader for the given file.
+   * </p>
+   * <p>
+   * If the given file is not absolute, we try to construct an absolute path
+   * using basePath configuration. If that fails, then the relative path is
+   * tried. If file is not found a RuntimeException is thrown.
+   * </p>
+   * <p>
+   * <b>It is the responsibility of the calling method to properly close the
+   * returned Reader</b>
+   * </p>
+   */
+  @Override
+  public Reader getData(String query) {
+    File f = getFile(basePath,query);
+    try {
+      return openStream(f);
+    } catch (Exception e) {
+      wrapAndThrow(SEVERE,e,"Unable to open File : "+f.getAbsolutePath());
+      return null;
+    }
+  }
+
+  static File getFile(String basePath, String query) {
+    try {
+      File file = new File(query);
+
+      // If it's not an absolute path, try relative from basePath. 
+      if (!file.isAbsolute()) {
+        // Resolve and correct basePath.
+        File basePathFile;
+        if (basePath == null) {
+          basePathFile = new File(".").getAbsoluteFile(); 
+          log.warn("FileDataSource.basePath is empty. " +
+              "Resolving to: " + basePathFile.getAbsolutePath());
+        } else {
+          basePathFile = new File(basePath);
+          if (!basePathFile.isAbsolute()) {
+            basePathFile = basePathFile.getAbsoluteFile();
+            log.warn("FileDataSource.basePath is not absolute. Resolving to: "
+                + basePathFile.getAbsolutePath());
+          }
+        }
+
+        file = new File(basePathFile, query).getAbsoluteFile();
+      }
+
+      if (file.isFile() && file.canRead()) {
+        log.debug("Accessing File: " + file.getAbsolutePath());
+        return file;
+      } else {
+        throw new FileNotFoundException("Could not find file: " + query +
+            " (resolved to: " + file.getAbsolutePath() + ")");
+      }
+    } catch (FileNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Open a {@link java.io.Reader} for the given file name
+   *
+   * @param file a {@link java.io.File} instance
+   * @return a Reader on the given file
+   * @throws FileNotFoundException if the File does not exist
+   * @throws UnsupportedEncodingException if the encoding is unsupported
+   * @since solr 1.4
+   */
+  protected Reader openStream(File file) throws FileNotFoundException,
+          UnsupportedEncodingException {
+    if (encoding == null) {
+      return new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8);
+    } else {
+      return new InputStreamReader(new FileInputStream(file), encoding);
+    }
+  }
+
+  @Override
+  public void close() {
+
+  }
+}
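
openStream() is protected (and documented @since 1.4) precisely so subclasses
can swap the decoding strategy. A hypothetical subclass that forces ISO-8859-1
regardless of configuration:

    package org.apache.solr.handler.dataimport;

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.nio.charset.StandardCharsets;

    // Hypothetical subclass of FileDataSource with a fixed charset.
    public class Latin1FileDataSource extends FileDataSource {
      @Override
      protected Reader openStream(File file) throws FileNotFoundException {
        // Ignores the configured 'encoding' and always decodes as Latin-1.
        return new InputStreamReader(new FileInputStream(file),
            StandardCharsets.ISO_8859_1);
      }
    }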

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
new file mode 100644
index 0000000..a03354f
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.solr.util.DateMathParser;
+
+/**
+ * <p>
+ * An {@link EntityProcessor} instance which streams file names found in a given base
+ * directory that match given patterns, returning rows containing file information.
+ * </p>
+ * <p>
+ * It supports querying a given base directory by matching:
+ * <ul>
+ * <li>file names against a regular expression</li>
+ * <li>excluding certain files based on a regular expression</li>
+ * <li>last modification date (newer or older than a given date or time)</li>
+ * <li>size (bigger or smaller than size given in bytes)</li>
+ * <li>recursively iterating through sub-directories</li>
+ * </ul>
+ * Its output can be used along with {@link FileDataSource} to read from files in file
+ * systems.
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ * @see Pattern
+ */
+public class FileListEntityProcessor extends EntityProcessorBase {
+  /**
+   * A regex pattern to identify files given in data-config.xml after resolving any variables 
+   */
+  protected String fileName;
+
+  /**
+   * The baseDir given in data-config.xml after resolving any variables
+   */
+  protected String baseDir;
+
+  /**
+   * A Regex pattern of excluded file names as given in data-config.xml after resolving any variables
+   */
+  protected String excludes;
+
+  /**
+   * The newerThan given in data-config as a {@link java.util.Date}
+   * <p>
+   * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+   * </p>
+   */
+  protected Date newerThan;
+
+  /**
+   * The olderThan given in data-config as a {@link java.util.Date}
+   */
+  protected Date olderThan;
+
+  /**
+   * The biggerThan given in data-config as a long value
+   * <p>
+   * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+   * </p>
+   */
+  protected long biggerThan = -1;
+
+  /**
+   * The smallerThan given in data-config as a long value
+   * <p>
+   * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+   * </p>
+   */
+  protected long smallerThan = -1;
+
+  /**
+   * The recursive given in data-config. Default value is false.
+   */
+  protected boolean recursive = false;
+
+  private Pattern fileNamePattern, excludesPattern;
+
+  @Override
+  public void init(Context context) {
+    super.init(context);
+    fileName = context.getEntityAttribute(FILE_NAME);
+    if (fileName != null) {
+      fileName = context.replaceTokens(fileName);
+      fileNamePattern = Pattern.compile(fileName);
+    }
+    baseDir = context.getEntityAttribute(BASE_DIR);
+    if (baseDir == null)
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+              "'baseDir' is a required attribute");
+    baseDir = context.replaceTokens(baseDir);
+    File dir = new File(baseDir);
+    if (!dir.isDirectory())
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+              "'baseDir' value: " + baseDir + " is not a directory");
+
+    String r = context.getEntityAttribute(RECURSIVE);
+    if (r != null)
+      recursive = Boolean.parseBoolean(r);
+    excludes = context.getEntityAttribute(EXCLUDES);
+    if (excludes != null) {
+      excludes = context.replaceTokens(excludes);
+      excludesPattern = Pattern.compile(excludes);
+    }
+  }
+
+  /**
+   * Get the Date object corresponding to the given string.
+   *
+   * @param dateStr the date string. It can be a DateMath string or it may have an evaluator function
+   * @return a Date instance corresponding to the input string
+   */
+  private Date getDate(String dateStr) {
+    if (dateStr == null)
+      return null;
+
+    Matcher m = PLACE_HOLDER_PATTERN.matcher(dateStr);
+    if (m.find()) {
+      Object o = context.resolve(m.group(1));
+      if (o instanceof Date)  return (Date)o;
+      dateStr = (String) o;
+    } else  {
+      dateStr = context.replaceTokens(dateStr);
+    }
+    m = Evaluator.IN_SINGLE_QUOTES.matcher(dateStr);
+    if (m.find()) {
+      String expr = m.group(1);
+      //TODO refactor DateMathParser.parseMath a bit to have a static method for this logic.
+      if (expr.startsWith("NOW")) {
+        expr = expr.substring("NOW".length());
+      }
+      try {
+        // DWS TODO: is this TimeZone the right default for us?  Deserves explanation if so.
+        return new DateMathParser(TimeZone.getDefault()).parseMath(expr);
+      } catch (ParseException exp) {
+        throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+                "Invalid expression for date", exp);
+      }
+    }
+    try {
+      return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).parse(dateStr);
+    } catch (ParseException exp) {
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+              "Invalid expression for date", exp);
+    }
+  }
+
+  /**
+   * Get the Long value for the given string after resolving any evaluator or variable.
+   *
+   * @param sizeStr the size as a string
+   * @return the Long value corresponding to the given string
+   */
+  private Long getSize(String sizeStr)  {
+    if (sizeStr == null)
+      return null;
+
+    Matcher m = PLACE_HOLDER_PATTERN.matcher(sizeStr);
+    if (m.find()) {
+      Object o = context.resolve(m.group(1));
+      if (o instanceof Number) {
+        Number number = (Number) o;
+        return number.longValue();
+      }
+      sizeStr = (String) o;
+    } else  {
+      sizeStr = context.replaceTokens(sizeStr);
+    }
+
+    return Long.parseLong(sizeStr);
+  }
+
+  @Override
+  public Map<String, Object> nextRow() {
+    if (rowIterator != null)
+      return getNext();
+    List<Map<String, Object>> fileDetails = new ArrayList<>();
+    File dir = new File(baseDir);
+
+    String dateStr = context.getEntityAttribute(NEWER_THAN);
+    newerThan = getDate(dateStr);
+    dateStr = context.getEntityAttribute(OLDER_THAN);
+    olderThan = getDate(dateStr);
+    String biggerThanStr = context.getEntityAttribute(BIGGER_THAN);
+    if (biggerThanStr != null)
+      biggerThan = getSize(biggerThanStr);
+    String smallerThanStr = context.getEntityAttribute(SMALLER_THAN);
+    if (smallerThanStr != null)
+      smallerThan = getSize(smallerThanStr);
+
+    getFolderFiles(dir, fileDetails);
+    rowIterator = fileDetails.iterator();
+    return getNext();
+  }
+
+  private void getFolderFiles(File dir, final List<Map<String, Object>> fileDetails) {
+    // Fetch an array of file objects that pass the filter; however, the
+    // returned array is never populated, because accept() always returns false.
+    // Instead we make use of the fileDetails list, which is populated as
+    // a side effect of the accept method.
+    dir.list(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        File fileObj = new File(dir, name);
+        if (fileObj.isDirectory()) {
+          if (recursive) getFolderFiles(fileObj, fileDetails);
+        } else if (fileNamePattern == null) {
+          addDetails(fileDetails, dir, name);
+        } else if (fileNamePattern.matcher(name).find()) {
+          if (excludesPattern != null && excludesPattern.matcher(name).find())
+            return false;
+          addDetails(fileDetails, dir, name);
+        }
+        return false;
+      }
+    });
+  }
+
+  private void addDetails(List<Map<String, Object>> files, File dir, String name) {
+    Map<String, Object> details = new HashMap<>();
+    File aFile = new File(dir, name);
+    if (aFile.isDirectory()) return;
+    long sz = aFile.length();
+    Date lastModified = new Date(aFile.lastModified());
+    if (biggerThan != -1 && sz <= biggerThan)
+      return;
+    if (smallerThan != -1 && sz >= smallerThan)
+      return;
+    if (olderThan != null && lastModified.after(olderThan))
+      return;
+    if (newerThan != null && lastModified.before(newerThan))
+      return;
+    details.put(DIR, dir.getAbsolutePath());
+    details.put(FILE, name);
+    details.put(ABSOLUTE_FILE, aFile.getAbsolutePath());
+    details.put(SIZE, sz);
+    details.put(LAST_MODIFIED, lastModified);
+    files.add(details);
+  }
+
+  public static final Pattern PLACE_HOLDER_PATTERN = Pattern
+          .compile("\\$\\{(.*?)\\}");
+
+  public static final String DIR = "fileDir";
+
+  public static final String FILE = "file";
+
+  public static final String ABSOLUTE_FILE = "fileAbsolutePath";
+
+  public static final String SIZE = "fileSize";
+
+  public static final String LAST_MODIFIED = "fileLastModified";
+
+  public static final String FILE_NAME = "fileName";
+
+  public static final String BASE_DIR = "baseDir";
+
+  public static final String EXCLUDES = "excludes";
+
+  public static final String NEWER_THAN = "newerThan";
+
+  public static final String OLDER_THAN = "olderThan";
+
+  public static final String BIGGER_THAN = "biggerThan";
+
+  public static final String SMALLER_THAN = "smallerThan";
+
+  public static final String RECURSIVE = "recursive";
+
+}
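
The ${...} syntax handled by getDate() and getSize() above comes down to
PLACE_HOLDER_PATTERN; a tiny standalone illustration of what group(1) captures
(the variable name shown is just an example):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PlaceholderDemo {
      public static void main(String[] args) {
        // Same regex as FileListEntityProcessor.PLACE_HOLDER_PATTERN.
        Pattern p = Pattern.compile("\\$\\{(.*?)\\}");
        Matcher m = p.matcher("${dataimporter.last_index_time}");
        if (m.find()) {
          // group(1) is what gets handed to context.resolve(...)
          System.out.println(m.group(1)); // dataimporter.last_index_time
        }
      }
    }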

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
new file mode 100644
index 0000000..e62c329
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.lucene.analysis.charfilter.HTMLStripCharFilter;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.BufferedReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link Transformer} implementation which strips off HTML tags using {@link HTMLStripCharFilter}.
+ * This is useful when the HTML markup should not be carried into the index.
+ *
+ * @see HTMLStripCharFilter
+ * @since solr 1.4
+ */
+public class HTMLStripTransformer extends Transformer {
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object transformRow(Map<String, Object> row, Context context) {
+    List<Map<String, String>> fields = context.getAllEntityFields();
+    for (Map<String, String> field : fields) {
+      String col = field.get(DataImporter.COLUMN);
+      String stripHTML = context.replaceTokens(field.get(STRIP_HTML));
+      if (!TRUE.equals(stripHTML))
+        continue;
+      Object tmpVal = row.get(col);
+      if (tmpVal == null)
+        continue;
+
+      if (tmpVal instanceof List) {
+        List<String> inputs = (List<String>) tmpVal;
+        List<Object> results = new ArrayList<>();
+        for (String input : inputs) {
+          if (input == null)
+            continue;
+          Object o = stripHTML(input, col);
+          if (o != null)
+            results.add(o);
+        }
+        row.put(col, results);
+      } else {
+        String value = tmpVal.toString();
+        Object o = stripHTML(value, col);
+        if (o != null)
+          row.put(col, o);
+      }
+    }
+    return row;
+  }
+
+  private Object stripHTML(String value, String column) {
+    StringBuilder out = new StringBuilder();
+    StringReader strReader = new StringReader(value);
+    try {
+      HTMLStripCharFilter html = new HTMLStripCharFilter(strReader.markSupported() ? strReader : new BufferedReader(strReader));
+      char[] cbuf = new char[1024 * 10];
+      while (true) {
+        int count = html.read(cbuf);
+        if (count == -1)
+          break; // end of stream mark is -1
+        if (count > 0)
+          out.append(cbuf, 0, count);
+      }
+      html.close();
+    } catch (IOException e) {
+      throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+              "Failed stripping HTML for column: " + column, e);
+    }
+    return out.toString();
+  }
+
+  public static final String STRIP_HTML = "stripHTML";
+
+  public static final String TRUE = "true";
+}
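
For reference, the read loop in stripHTML() can be exercised standalone. A minimal sketch, assuming lucene-analyzers-common is on the classpath; the class name and sample markup are illustrative, not part of the patch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import org.apache.lucene.analysis.charfilter.HTMLStripCharFilter;

public class StripDemo {
  // Same drain loop as stripHTML() above: read until the filter signals
  // end of stream with -1, appending each chunk to the output buffer.
  static String strip(String html) throws IOException {
    StringBuilder out = new StringBuilder();
    try (HTMLStripCharFilter filter =
             new HTMLStripCharFilter(new BufferedReader(new StringReader(html)))) {
      char[] buf = new char[8 * 1024];
      int n;
      while ((n = filter.read(buf)) != -1) {
        out.append(buf, 0, n);
      }
    }
    return out.toString();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(strip("<p>Hello <b>world</b></p>"));
  }
}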

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
new file mode 100644
index 0000000..a8eed55
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.util.CryptoKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.invoke.MethodHandles;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p> A DataSource implementation which can fetch data using JDBC. </p> <p> Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
+ * details. </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public class JdbcDataSource extends
+        DataSource<Iterator<Map<String, Object>>> {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected Callable<Connection> factory;
+
+  private long connLastUsed = 0;
+
+  private Connection conn;
+  
+  private ResultSetIterator resultSetIterator;  
+
+  private Map<String, Integer> fieldNameVsType = new HashMap<>();
+
+  private boolean convertType = false;
+
+  private int batchSize = FETCH_SIZE;
+
+  private int maxRows = 0;
+
+  @Override
+  public void init(Context context, Properties initProps) {
+    resolveVariables(context, initProps);
+    initProps = decryptPwd(context, initProps);
+    Object o = initProps.get(CONVERT_TYPE);
+    if (o != null)
+      convertType = Boolean.parseBoolean(o.toString());
+
+    factory = createConnectionFactory(context, initProps);
+
+    String bsz = initProps.getProperty("batchSize");
+    if (bsz != null) {
+      bsz = context.replaceTokens(bsz);
+      try {
+        batchSize = Integer.parseInt(bsz);
+        if (batchSize == -1)
+          batchSize = Integer.MIN_VALUE;
+      } catch (NumberFormatException e) {
+        log.warn("Invalid batch size: " + bsz);
+      }
+    }
+
+    for (Map<String, String> map : context.getAllEntityFields()) {
+      String n = map.get(DataImporter.COLUMN);
+      String t = map.get(DataImporter.TYPE);
+      if ("sint".equals(t) || "integer".equals(t))
+        fieldNameVsType.put(n, Types.INTEGER);
+      else if ("slong".equals(t) || "long".equals(t))
+        fieldNameVsType.put(n, Types.BIGINT);
+      else if ("float".equals(t) || "sfloat".equals(t))
+        fieldNameVsType.put(n, Types.FLOAT);
+      else if ("double".equals(t) || "sdouble".equals(t))
+        fieldNameVsType.put(n, Types.DOUBLE);
+      else if ("date".equals(t))
+        fieldNameVsType.put(n, Types.DATE);
+      else if ("boolean".equals(t))
+        fieldNameVsType.put(n, Types.BOOLEAN);
+      else if ("binary".equals(t))
+        fieldNameVsType.put(n, Types.BLOB);
+      else
+        fieldNameVsType.put(n, Types.VARCHAR);
+    }
+  }
+
+  private Properties decryptPwd(Context context, Properties initProps) {
+    String encryptionKey = initProps.getProperty("encryptKeyFile");
+    if (initProps.getProperty("password") != null && encryptionKey != null) {
+      // the password is encrypted; use the key file to decrypt it
+      try {
+        try (Reader fr = new InputStreamReader(new FileInputStream(encryptionKey), UTF_8)) {
+          char[] chars = new char[100]; // max 100 char password
+          int len = fr.read(chars);
+          if (len < 6)
+            throw new DataImportHandlerException(SEVERE, "The encryption key in " + encryptionKey + " should be at least 6 characters long");
+          Properties props = new Properties();
+          props.putAll(initProps);
+          String password = null;
+          try {
+            password = CryptoKeys.decodeAES(initProps.getProperty("password"), new String(chars, 0, len)).trim();
+          } catch (SolrException se) {
+            throw new DataImportHandlerException(SEVERE, "Error decoding password", se.getCause());
+          }
+          props.put("password", password);
+          initProps = props;
+        }
+      } catch (IOException e) {
+        throw new DataImportHandlerException(SEVERE, "Could not load encryptKeyFile  " + encryptionKey);
+      }
+    }
+    return initProps;
+  }
+
+  protected Callable<Connection> createConnectionFactory(final Context context,
+                                       final Properties initProps) {
+//    final VariableResolver resolver = context.getVariableResolver();
+    final String jndiName = initProps.getProperty(JNDI_NAME);
+    final String url = initProps.getProperty(URL);
+    final String driver = initProps.getProperty(DRIVER);
+
+    if (url == null && jndiName == null)
+      throw new DataImportHandlerException(SEVERE,
+              "JDBC URL or JNDI name has to be specified");
+
+    if (driver != null) {
+      try {
+        DocBuilder.loadClass(driver, context.getSolrCore());
+      } catch (ClassNotFoundException e) {
+        wrapAndThrow(SEVERE, e, "Could not load driver: " + driver);
+      }
+    } else {
+      if(jndiName == null){
+        throw new DataImportHandlerException(SEVERE, "One of driver or jndiName must be specified in the data source");
+      }
+    }
+
+    String s = initProps.getProperty("maxRows");
+    if (s != null) {
+      maxRows = Integer.parseInt(s);
+    }
+
+    return factory = new Callable<Connection>() {
+      @Override
+      public Connection call() throws Exception {
+        log.info("Creating a connection for entity "
+                + context.getEntityAttribute(DataImporter.NAME) + " with URL: "
+                + url);
+        long start = System.nanoTime();
+        Connection c = null;
+
+        if (jndiName != null) {
+          c = getFromJndi(initProps, jndiName);
+        } else if (url != null) {
+          try {
+            c = DriverManager.getConnection(url, initProps);
+          } catch (SQLException e) {
+            // DriverManager does not allow you to use a driver which is not loaded through
+            // the class loader of the class which is trying to make the connection.
+            // This is a workaround for cases where the user puts the driver jar in the
+            // solr.home/lib or solr.home/core/lib directories.
+            Driver d = (Driver) DocBuilder.loadClass(driver, context.getSolrCore()).newInstance();
+            c = d.connect(url, initProps);
+          }
+        }
+        if (c != null) {
+          try {
+            initializeConnection(c, initProps);
+          } catch (SQLException e) {
+            try {
+              c.close();
+            } catch (SQLException e2) {
+              log.warn("Exception closing connection during cleanup", e2);
+            }
+
+            throw new DataImportHandlerException(SEVERE, "Exception initializing SQL connection", e);
+          }
+        }
+        log.info("Time taken for getConnection(): "
+            + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+        return c;
+      }
+
+      private void initializeConnection(Connection c, final Properties initProps)
+          throws SQLException {
+        if (Boolean.parseBoolean(initProps.getProperty("readOnly"))) {
+          c.setReadOnly(true);
+          // Add other sane defaults
+          c.setAutoCommit(true);
+          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
+        }
+        if (!Boolean.parseBoolean(initProps.getProperty("autoCommit"))) {
+          c.setAutoCommit(false);
+        }
+        String transactionIsolation = initProps.getProperty("transactionIsolation");
+        if ("TRANSACTION_READ_UNCOMMITTED".equals(transactionIsolation)) {
+          c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+        } else if ("TRANSACTION_READ_COMMITTED".equals(transactionIsolation)) {
+          c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+        } else if ("TRANSACTION_REPEATABLE_READ".equals(transactionIsolation)) {
+          c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+        } else if ("TRANSACTION_SERIALIZABLE".equals(transactionIsolation)) {
+          c.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+        } else if ("TRANSACTION_NONE".equals(transactionIsolation)) {
+          c.setTransactionIsolation(Connection.TRANSACTION_NONE);
+        }
+        String holdability = initProps.getProperty("holdability");
+        if ("CLOSE_CURSORS_AT_COMMIT".equals(holdability)) {
+          c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
+        } else if ("HOLD_CURSORS_OVER_COMMIT".equals(holdability)) {
+          c.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
+        }
+      }
+
+      private Connection getFromJndi(final Properties initProps, final String jndiName) throws NamingException,
+          SQLException {
+
+        Connection c = null;
+        InitialContext ctx =  new InitialContext();
+        Object jndival =  ctx.lookup(jndiName);
+        if (jndival instanceof javax.sql.DataSource) {
+          javax.sql.DataSource dataSource = (javax.sql.DataSource) jndival;
+          String user = (String) initProps.get("user");
+          String pass = (String) initProps.get("password");
+          if(user == null || user.trim().equals("")){
+            c = dataSource.getConnection();
+          } else {
+            c = dataSource.getConnection(user, pass);
+          }
+        } else {
+          throw new DataImportHandlerException(SEVERE,
+                  "the jndi name : '"+jndiName +"' is not a valid javax.sql.DataSource");
+        }
+        return c;
+      }
+    };
+  }
+
+  private void resolveVariables(Context ctx, Properties initProps) {
+    for (Map.Entry<Object, Object> entry : initProps.entrySet()) {
+      if (entry.getValue() != null) {
+        entry.setValue(ctx.replaceTokens((String) entry.getValue()));
+      }
+    }
+  }
+
+  @Override
+  public Iterator<Map<String, Object>> getData(String query) {
+    if (resultSetIterator != null) {
+      resultSetIterator.close();
+      resultSetIterator = null;
+    }
+    resultSetIterator = createResultSetIterator(query);
+    return resultSetIterator.getIterator();
+  }
+
+  protected ResultSetIterator createResultSetIterator(String query) {
+    return new ResultSetIterator(query);
+  }
+
+  private void logError(String msg, Exception e) {
+    log.warn(msg, e);
+  }
+
+  protected List<String> readFieldNames(ResultSetMetaData metaData)
+          throws SQLException {
+    List<String> colNames = new ArrayList<>();
+    int count = metaData.getColumnCount();
+    for (int i = 0; i < count; i++) {
+      colNames.add(metaData.getColumnLabel(i + 1));
+    }
+    return colNames;
+  }
+
+  protected class ResultSetIterator {
+    private ResultSet resultSet;
+
+    private Statement stmt = null;
+
+    private List<String> colNames; 
+   
+    private Iterator<Map<String, Object>> rSetIterator;
+
+    public ResultSetIterator(String query) {
+
+      try {
+        Connection c = getConnection();
+        stmt = createStatement(c, batchSize, maxRows);
+        log.debug("Executing SQL: " + query);
+        long start = System.nanoTime();
+        resultSet = executeStatement(stmt, query);
+        log.trace("Time taken for sql :"
+                + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+        setColNames(resultSet);
+      } catch (Exception e) {
+        close();
+        wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
+        return;
+      }
+      if (resultSet == null) {
+        close();
+        rSetIterator = new ArrayList<Map<String, Object>>().iterator();
+        return;
+      }
+
+      rSetIterator = createIterator(convertType, fieldNameVsType);
+    }
+
+    
+    protected Statement createStatement(final Connection c, final int batchSize, final int maxRows)
+        throws SQLException {
+      Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      statement.setFetchSize(batchSize);
+      statement.setMaxRows(maxRows);
+      return statement;
+    }
+
+    protected ResultSet executeStatement(Statement statement, String query) throws SQLException {
+      boolean resultSetReturned = statement.execute(query);
+      return getNextResultSet(resultSetReturned, statement);
+    }
+
+    protected ResultSet getNextResultSet(final boolean initialResultSetAvailable, final Statement statement) throws SQLException {
+      boolean resultSetAvailable = initialResultSetAvailable;
+      while (!resultSetAvailable && statement.getUpdateCount() != -1) {
+        resultSetAvailable = statement.getMoreResults();
+      }
+      if (resultSetAvailable) {
+        return statement.getResultSet();
+      }
+      return null;
+    }
+    
+    protected void setColNames(final ResultSet resultSet) throws SQLException {
+      if (resultSet != null) {
+        colNames = readFieldNames(resultSet.getMetaData());
+      } else {
+        colNames = Collections.emptyList();
+      }
+    }
+
+    protected Iterator<Map<String,Object>> createIterator(final boolean convertType,
+        final Map<String,Integer> fieldNameVsType) {
+      return new Iterator<Map<String,Object>>() {
+        @Override
+        public boolean hasNext() {
+          return hasnext();
+        }
+
+        @Override
+        public Map<String,Object> next() {
+          return getARow(convertType, fieldNameVsType);
+        }
+
+        @Override
+        public void remove() {/* do nothing */
+        }
+      };
+    }
+    
+ 
+
+    protected Map<String,Object> getARow(boolean convertType, Map<String,Integer> fieldNameVsType) {
+      if (getResultSet() == null)
+        return null;
+      Map<String, Object> result = new HashMap<>();
+      for (String colName : getColNames()) {
+        try {
+          if (!convertType) {
+            // Use underlying database's type information except for BigDecimal and BigInteger
+            // which cannot be serialized by JavaBin/XML. See SOLR-6165
+            Object value = getResultSet().getObject(colName);
+            if (value instanceof BigDecimal || value instanceof BigInteger) {
+              result.put(colName, value.toString());
+            } else {
+              result.put(colName, value);
+            }
+            continue;
+          }
+
+          Integer type = fieldNameVsType.get(colName);
+          if (type == null)
+            type = Types.VARCHAR;
+          switch (type) {
+            case Types.INTEGER:
+              result.put(colName, getResultSet().getInt(colName));
+              break;
+            case Types.FLOAT:
+              result.put(colName, getResultSet().getFloat(colName));
+              break;
+            case Types.BIGINT:
+              result.put(colName, getResultSet().getLong(colName));
+              break;
+            case Types.DOUBLE:
+              result.put(colName, getResultSet().getDouble(colName));
+              break;
+            case Types.DATE:
+              result.put(colName, getResultSet().getTimestamp(colName));
+              break;
+            case Types.BOOLEAN:
+              result.put(colName, getResultSet().getBoolean(colName));
+              break;
+            case Types.BLOB:
+              result.put(colName, getResultSet().getBytes(colName));
+              break;
+            default:
+              result.put(colName, getResultSet().getString(colName));
+              break;
+          }
+        } catch (SQLException e) {
+          logError("Error reading data ", e);
+          wrapAndThrow(SEVERE, e, "Error reading data from database");
+        }
+      }
+      return result;
+    }
+
+    protected boolean hasnext() {
+      if (getResultSet() == null) {
+        close();
+        return false;
+      }
+      try {
+        if (getResultSet().next()) {
+          return true;
+        } else {
+          closeResultSet();
+          setResultSet(getNextResultSet(getStatement().getMoreResults(), getStatement()));
+          setColNames(getResultSet());
+          return hasnext();
+        }
+      } catch (SQLException e) {
+        close();
+        wrapAndThrow(SEVERE,e);
+        return false;
+      }
+    }
+
+    protected void close() {
+      closeResultSet();
+      try {
+        if (getStatement() != null)
+          getStatement().close();
+      } catch (Exception e) {
+        logError("Exception while closing statement", e);
+      } finally {
+        setStatement(null);
+      }
+    }
+
+    protected void closeResultSet() {
+      try {
+        if (getResultSet() != null) {
+          getResultSet().close();
+        }
+      } catch (Exception e) {
+        logError("Exception while closing result set", e);
+      } finally {
+        setResultSet(null);
+      }
+    }
+
+    protected final Iterator<Map<String,Object>> getIterator() {
+      return rSetIterator;
+    }
+    
+    
+    protected final Statement getStatement() {
+      return stmt;
+    }
+    
+    protected final void setStatement(Statement stmt) {
+      this.stmt = stmt;
+    }
+    
+    protected final ResultSet getResultSet() {
+      return resultSet;
+    }
+    
+    protected final void setResultSet(ResultSet resultSet) {
+      this.resultSet = resultSet;
+    }
+    
+    protected final List<String> getColNames() {
+      return colNames;
+    }
+
+    protected final void setColNames(List<String> colNames) {
+      this.colNames = colNames;
+    }
+    
+  }
+
+  protected Connection getConnection() throws Exception {
+    long currTime = System.nanoTime();
+    if (currTime - connLastUsed > CONN_TIME_OUT) {
+      synchronized (this) {
+        Connection tmpConn = factory.call();
+        closeConnection();
+        connLastUsed = System.nanoTime();
+        return conn = tmpConn;
+      }
+
+    } else {
+      connLastUsed = currTime;
+      return conn;
+    }
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    try {
+      if(!isClosed){
+        log.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
+        close();
+      }
+    } finally {
+      super.finalize();
+    }
+  }
+
+  private boolean isClosed = false;
+
+  @Override
+  public void close() {
+    if (resultSetIterator != null) {
+      resultSetIterator.close();
+    }
+    try {
+      closeConnection();
+    } finally {
+      isClosed = true;
+    }
+  }
+
+  private void closeConnection()  {
+    try {
+      if (conn != null) {
+        try {
+          //SOLR-2045
+          conn.commit();
+        } catch(Exception ex) {
+          //ignore.
+        }
+        conn.close();
+      }
+    } catch (Exception e) {
+      log.error("Ignoring Error when closing connection", e);
+    }
+  }
+
+  private static final long CONN_TIME_OUT = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+
+  private static final int FETCH_SIZE = 500;
+
+  public static final String URL = "url";
+
+  public static final String JNDI_NAME = "jndiName";
+
+  public static final String DRIVER = "driver";
+
+  public static final String CONVERT_TYPE = "convertType";
+}
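
The connection handling in getConnection() above (reuse the cached connection while it is fresh, recreate it after CONN_TIME_OUT of inactivity) distills to a small sketch. ConnectionReuseDemo and its constructor are illustrative; the committed class additionally builds its factory from the init properties and commits before closing.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ConnectionReuseDemo {
  private static final long CONN_TIME_OUT = TimeUnit.SECONDS.toNanos(10);

  private final Callable<Connection> factory;
  private Connection conn;
  private long connLastUsed;

  ConnectionReuseDemo(String jdbcUrl) {
    // hypothetical factory; the real one is created by createConnectionFactory()
    this.factory = () -> DriverManager.getConnection(jdbcUrl);
  }

  // Reuse the cached connection if it was used within the last ten seconds;
  // otherwise obtain a fresh one and close the stale one.
  synchronized Connection getConnection() throws Exception {
    long now = System.nanoTime();
    if (conn == null || now - connLastUsed > CONN_TIME_OUT) {
      Connection fresh = factory.call();
      if (conn != null) {
        conn.close();
      }
      conn = fresh;
    }
    connLastUsed = System.nanoTime();
    return conn;
  }
}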

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
new file mode 100644
index 0000000..0940cbd
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.*;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+
+
+/**
+ * <p>
+ * An {@link EntityProcessor} instance which can stream lines of text read from a 
+ * datasource. Options allow lines to be explicitly skipped or included in the index.
+ * </p>
+ * <p>
+ * Attribute summary 
+ * <ul>
+ * <li>url is the required location of the input file. If this value is
+ *     relative, it is assumed to be relative to baseLoc.</li>
+ * <li>acceptLineRegex is an optional attribute; if present, any line which
+ *     does not match the regex is discarded.</li>
+ * <li>skipLineRegex is an optional attribute applied after any acceptLineRegex;
+ *     any line which matches this regex is discarded.</li>
+ * </ul>
+ * <p>
+ * Although envisioned for reading lines from a file or url, LineEntityProcessor may also be useful
+ * for dealing with change lists, where each line contains filenames which can be used by subsequent entities
+ * to parse content from those files.
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.4
+ * @see Pattern
+ */
+public class LineEntityProcessor extends EntityProcessorBase {
+  private Pattern acceptLineRegex, skipLineRegex;
+  private String url;
+  private BufferedReader reader;
+
+  /**
+   * Parses each of the entity attributes.
+   */
+  @Override
+  public void init(Context context) {
+    super.init(context);
+    String s;
+
+    // init a regex to locate files from the input we want to index
+    s = context.getResolvedEntityAttribute(ACCEPT_LINE_REGEX);
+    if (s != null) {
+      acceptLineRegex = Pattern.compile(s);
+    }
+
+    // init a regex to locate files from the input to be skipped
+    s = context.getResolvedEntityAttribute(SKIP_LINE_REGEX);
+    if (s != null) {
+      skipLineRegex = Pattern.compile(s);
+    }
+
+    // the FileName is required.
+    url = context.getResolvedEntityAttribute(URL);
+    if (url == null) throw
+      new DataImportHandlerException(DataImportHandlerException.SEVERE,
+           "'"+ URL +"' is a required attribute");
+  }
+
+
+  /**
+   * Reads lines from the url until it finds a line that matches the
+   * optional acceptLineRegex and does not match the optional skipLineRegex.
+   *
+   * @return A row containing at least the field "rawLine", or null to signal
+   * end of file. rawLine holds the line exactly as returned by readLine()
+   * from the url; transformers can be used to create as
+   * many other fields as required.
+   */
+  @Override
+  public Map<String, Object> nextRow() {
+    if (reader == null) {
+      reader = new BufferedReader((Reader) context.getDataSource().getData(url));
+    }
+
+    String line;
+    
+    while ( true ) { 
+      // read a line from the input file
+      try {
+        line = reader.readLine();
+      }
+      catch (IOException exp) {
+        throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+             "Problem reading from input", exp);
+      }
+  
+      // end of input
+      if (line == null) {
+        closeResources();
+        return null;
+      }
+
+      // First scan whole line to see if we want it
+      if (acceptLineRegex != null && ! acceptLineRegex.matcher(line).find()) continue;
+      if (skipLineRegex != null &&   skipLineRegex.matcher(line).find()) continue;
+      // Construct the 'row' of fields
+      Map<String, Object> row = new HashMap<>();
+      row.put("rawLine", line);
+      return row;
+    }
+  }
+  
+  public void closeResources() {
+    if (reader != null) {
+      IOUtils.closeQuietly(reader);
+    }
+    reader = null;
+  }
+
+  @Override
+  public void destroy() {
+    closeResources();
+    super.destroy();
+  }
+
+  /**
+   * Holds the name of the entity attribute that will be parsed to obtain
+   * the filename containing the changelist.
+   */
+  public static final String URL = "url";
+
+  /**
+   * Holds the name of the entity attribute that will be parsed to obtain
+   * the pattern to be used when checking to see if a line should
+   * be returned.
+   */
+  public static final String ACCEPT_LINE_REGEX = "acceptLineRegex";
+
+  /**
+   * Holds the name of the entity attribute that will be parsed to obtain
+   * the pattern to be used when checking to see if a line should
+   * be ignored.
+   */
+  public static final String SKIP_LINE_REGEX = "skipLineRegex";
+}
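
The accept/skip filtering in nextRow() reduces to two regex probes per line. A standalone sketch with illustrative patterns and input, not part of the patch:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.regex.Pattern;

public class LineFilterDemo {
  public static void main(String[] args) throws IOException {
    Pattern accept = Pattern.compile("\\.xml$"); // keep only .xml paths
    Pattern skip = Pattern.compile("^#");        // drop comment lines
    BufferedReader reader = new BufferedReader(
        new StringReader("# changelist\na.xml\nb.txt\nc.xml\n"));
    String line;
    while ((line = reader.readLine()) != null) {
      // same order as nextRow(): acceptLineRegex first, then skipLineRegex
      if (accept != null && !accept.matcher(line).find()) continue;
      if (skip != null && skip.matcher(line).find()) continue;
      System.out.println(line); // prints a.xml and c.xml
    }
  }
}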

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
new file mode 100644
index 0000000..66c525e
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
+/**
+ * A {@link Transformer} implementation which logs messages in a given template format.
+ * <p>
+ * Refer to <a href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.4
+ */
+public class LogTransformer extends Transformer {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public Object transformRow(Map<String, Object> row, Context ctx) {
+    String expr = ctx.getEntityAttribute(LOG_TEMPLATE);
+    String level = ctx.replaceTokens(ctx.getEntityAttribute(LOG_LEVEL));
+
+    if (expr == null || level == null) return row;
+
+    if ("info".equals(level)) {
+      if (log.isInfoEnabled())
+        log.info(ctx.replaceTokens(expr));
+    } else if ("trace".equals(level)) {
+      if (log.isTraceEnabled())
+        log.trace(ctx.replaceTokens(expr));
+    } else if ("warn".equals(level)) {
+      if (log.isWarnEnabled())
+        log.warn(ctx.replaceTokens(expr));
+    } else if ("error".equals(level)) {
+      if (log.isErrorEnabled())
+        log.error(ctx.replaceTokens(expr));
+    } else if ("debug".equals(level)) {
+      if (log.isDebugEnabled())
+        log.debug(ctx.replaceTokens(expr));
+    }
+
+    return row;
+  }
+
+  public static final String LOG_TEMPLATE = "logTemplate";
+  public static final String LOG_LEVEL = "logLevel";
+}
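
The level dispatch in transformRow() could equally be table-driven. A sketch of that alternative (Java 9+ for Map.of; the class name is illustrative), not the committed implementation:

import java.lang.invoke.MethodHandles;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LevelDispatchDemo {
  private static final Logger log =
      LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // Map each level name to the matching logger method; unknown levels are
  // silently ignored, matching the fall-through of the if/else chain above.
  private static final Map<String, Consumer<String>> LEVELS = Map.of(
      "trace", log::trace,
      "debug", log::debug,
      "info", log::info,
      "warn", log::warn,
      "error", log::error);

  static void logAt(String level, String message) {
    Consumer<String> sink = LEVELS.get(level.toLowerCase(Locale.ROOT));
    if (sink != null) sink.accept(message);
  }

  public static void main(String[] args) {
    logAt("info", "document batch committed");
  }
}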