You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:36 UTC
[15/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras
module
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
new file mode 100644
index 0000000..8cfbed9
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * An instance of entity processor serves an entity. It is reused throughout the
+ * import process.
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a public no-args constructor.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class EntityProcessor {
+
+  /**
+   * Called when processing of this entity starts, and called again every time processing
+   * comes back to this entity, so an implementation can reset any per-run state here.
+   * For a root entity this is called only once per import. For a sub-entity it is called
+   * once for each row produced by its parent entity.
+   *
+   * @param context the current context
+   */
+  public abstract void init(Context context);
+
+  /**
+   * Streams the data one row at a time. An implementation may fetch as many rows as it
+   * needs internally, but hands back a single row per call. Only this method is used
+   * during a full import.
+   *
+   * @return a row whose keys are column names and whose values are the values of those
+   *         columns, or {@code null} when there are no more rows to return
+   */
+  public abstract Map<String, Object> nextRow();
+
+  /**
+   * Used during delta-import. Gives the primary keys of the rows of this entity that have
+   * changed.
+   *
+   * @return the primary-key name/value pairs identifying a changed row
+   */
+  public abstract Map<String, Object> nextModifiedRowKey();
+
+  /**
+   * Used during delta-import. Gives the primary keys of the rows that were deleted from
+   * this entity. If this entity is the root entity, the corresponding Solr document is
+   * deleted. If it is a sub-entity, the parent Solr document is considered 'changed' and
+   * will be recreated.
+   *
+   * @return the primary-key name/value pairs identifying a deleted row
+   */
+  public abstract Map<String, Object> nextDeletedRowKey();
+
+  /**
+   * Used during delta-import. Gives the primary keys and their values of the rows in the
+   * parent entity that changed because of changes in this entity.
+   *
+   * @return the primary-key name/value pairs identifying a changed row of the parent entity
+   */
+  public abstract Map<String, Object> nextModifiedParentRowKey();
+
+  /**
+   * Invoked for each entity at the very end of the import to do any needed cleanup tasks.
+   */
+  public abstract void destroy();
+
+  /**
+   * Invoked after the transformers have been applied to a row. EntityProcessors can add,
+   * remove or modify the values produced by the transformers in this method. The default
+   * implementation does nothing.
+   *
+   * @param r the transformed row
+   * @since solr 1.4
+   */
+  public void postTransform(Map<String, Object> r) {
+  }
+
+  /**
+   * Invoked when the entity processor is destroyed towards the end of the import. The
+   * default implementation does nothing.
+   *
+   * @since solr 1.4
+   */
+  public void close() {
+    //no-op
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
new file mode 100644
index 0000000..8311f36
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+/**
+ * Convenience superclass for {@link EntityProcessor} implementations. Most processors extend
+ * it to inherit the shared handling of the {@code onError} attribute, zipper-based joins and
+ * {@link DIHCacheSupport} caching.
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class EntityProcessorBase extends EntityProcessor {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Entity attribute listing the transformer(s) for the entity. */
+  public static final String TRANSFORMER = "transformer";
+
+  /** Name of the row-transforming method. */
+  public static final String TRANSFORM_ROW = "transformRow";
+
+  /** Entity attribute selecting the error policy. */
+  public static final String ON_ERROR = "onError";
+
+  /** Error policy value: abort. This is the default for {@link #onError}. */
+  public static final String ABORT = "abort";
+
+  /** Error policy value: continue. */
+  public static final String CONTINUE = "continue";
+
+  /** Error policy value: skip. */
+  public static final String SKIP = "skip";
+
+  /** True until {@link #firstInit(Context)} has completed. */
+  protected boolean isFirstInit = true;
+
+  protected String entityName;
+
+  protected Context context;
+
+  /** Rows served by {@link #getNext()}; reset to null once exhausted or after a failure. */
+  protected Iterator<Map<String, Object>> rowIterator;
+
+  protected String query;
+
+  /** Current error policy; replaced by the entity's {@code onError} attribute when present. */
+  protected String onError = ABORT;
+
+  /** Only created when no zipper is used and the entity names a cache implementation. */
+  protected DIHCacheSupport cacheSupport = null;
+
+  private Zipper zipper;
+
+  /**
+   * Remembers the context, performs the one-time setup on the very first call, and then
+   * notifies either the zipper or (when present) the cache that a new parent row started.
+   */
+  @Override
+  public void init(Context context) {
+    this.context = context;
+    if (isFirstInit) {
+      firstInit(context);
+    }
+    if (zipper != null) {
+      zipper.onNewParent(context);
+    } else if (cacheSupport != null) {
+      cacheSupport.initNewParent(context);
+    }
+  }
+
+  /**
+   * One-time initialisation, run from the first {@link #init(Context)} call.
+   * Subclasses that override it must call it, otherwise accessing the zipper from
+   * nextRow() throws a NullPointerException.
+   */
+  protected void firstInit(Context context) {
+    entityName = context.getEntityAttribute("name");
+    String configuredOnError = context.getEntityAttribute(ON_ERROR);
+    if (configuredOnError != null) {
+      onError = configuredOnError;
+    }
+
+    zipper = Zipper.createOrNull(context);
+    if (zipper == null) {
+      // caching is only considered when the entity is not zipped
+      initCache(context);
+    }
+    isFirstInit = false;
+  }
+
+  /** Creates {@link #cacheSupport} when the entity's resolved cache-implementation attribute is set. */
+  protected void initCache(Context context) {
+    String cacheImplName = context.getResolvedEntityAttribute(DIHCacheSupport.CACHE_IMPL);
+    if (cacheImplName == null) {
+      return;
+    }
+    cacheSupport = new DIHCacheSupport(context, cacheImplName);
+  }
+
+  /** Always returns null: this base class reports no modified rows. */
+  @Override
+  public Map<String, Object> nextModifiedRowKey() {
+    return null;
+  }
+
+  /** Always returns null: this base class reports no deleted rows. */
+  @Override
+  public Map<String, Object> nextDeletedRowKey() {
+    return null;
+  }
+
+  /** Always returns null: this base class reports no modified parent rows. */
+  @Override
+  public Map<String, Object> nextModifiedParentRowKey() {
+    return null;
+  }
+
+  /**
+   * For a simple implementation this is the only method a subclass needs to implement. It
+   * streams rows one by one.
+   *
+   * @return a row mapping field names to values (any Object, or a Collection of objects), or
+   *         null to signal that there are no more rows; this base implementation always
+   *         returns null
+   */
+  @Override
+  public Map<String, Object> nextRow() {
+    return null; // nothing to produce here
+  }
+
+  /**
+   * Returns the next row, sourced from the zipper, the cache, or {@link #rowIterator} (in
+   * that order of preference). When the iterator runs dry, or fails, both {@link #query}
+   * and {@link #rowIterator} are cleared; a failure is logged and rethrown with WARN
+   * severity.
+   */
+  protected Map<String, Object> getNext() {
+    if (zipper != null) {
+      return zipper.supplyNextChild(rowIterator);
+    }
+    if (cacheSupport != null) {
+      return cacheSupport.getCacheData(context, query, rowIterator);
+    }
+    try {
+      if (rowIterator == null) {
+        return null;
+      }
+      if (!rowIterator.hasNext()) {
+        query = null;
+        rowIterator = null;
+        return null;
+      }
+      return rowIterator.next();
+    } catch (Exception e) {
+      SolrException.log(log, "getNext() failed for query '" + query + "'", e);
+      query = null;
+      rowIterator = null;
+      wrapAndThrow(DataImportHandlerException.WARN, e);
+      return null;
+    }
+  }
+
+  /** Clears the query and tears down the cache, if any. */
+  @Override
+  public void destroy() {
+    query = null;
+    if (cacheSupport != null) {
+      cacheSupport.destroyAll();
+    }
+    cacheSupport = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
new file mode 100644
index 0000000..8a76e11
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.Entity;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.*;
+import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper over an {@link EntityProcessor} instance which applies the entity's transformers
+ * to every row produced by the delegate and correctly hands out the extra rows when a
+ * transformer turns one row into several.
+ *
+ * @since solr 1.4
+ */
+public class EntityProcessorWrapper extends EntityProcessor {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private EntityProcessor delegate;
+  private Entity entity;
+  private DataSource datasource;
+  private List<EntityProcessorWrapper> children = new ArrayList<>();
+  private DocBuilder docBuilder;
+  private boolean initialized;
+  private String onError;
+  private Context context;
+  private VariableResolver resolver;
+  private String entityName;
+
+  /** Transformers for this entity; loaded lazily by {@link #loadTransformers()}. */
+  protected List<Transformer> transformers;
+
+  /**
+   * Rows still pending after a transformer expanded one row into several; drained by
+   * {@link #getFromRowCache()} and {@code null} when nothing is pending.
+   */
+  protected List<Map<String, Object>> rowcache;
+
+  public EntityProcessorWrapper(EntityProcessor delegate, Entity entity, DocBuilder docBuilder) {
+    this.delegate = delegate;
+    this.entity = entity;
+    this.docBuilder = docBuilder;
+  }
+
+  /**
+   * Discards any pending multi-row output, captures the context and its resolver, and then
+   * initializes the delegate. The onError policy and the entity name are read while the
+   * entity name is still unknown (normally only on the first call).
+   */
+  @Override
+  public void init(Context context) {
+    rowcache = null;
+    this.context = context;
+    resolver = (VariableResolver) context.getVariableResolver();
+    if (entityName == null) {
+      onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
+      if (onError == null) onError = ABORT;
+      entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
+    }
+    delegate.init(context);
+  }
+
+  /**
+   * Instantiates the transformers listed, comma separated, in the entity's transformer
+   * attribute. Entries prefixed with {@code script:} become {@link ScriptTransformer}s (only
+   * for trusted configsets). Other entries are class names that either implement
+   * {@link Transformer} or have a public {@code transformRow(Map)} method. With verbose
+   * debugging enabled every transformer is wrapped by the debug logger. Any loading failure
+   * is rethrown with SEVERE severity.
+   */
+  @SuppressWarnings("unchecked")
+  void loadTransformers() {
+    String transClasses = context.getEntityAttribute(TRANSFORMER);
+
+    if (transClasses == null) {
+      transformers = Collections.emptyList();
+      return;
+    }
+
+    String[] transArr = transClasses.split(",");
+    transformers = new ArrayList<Transformer>() {
+      @Override
+      public boolean add(Transformer transformer) {
+        if (docBuilder != null && docBuilder.verboseDebug) {
+          transformer = docBuilder.getDebugLogger().wrapTransformer(transformer);
+        }
+        return super.add(transformer);
+      }
+    };
+    for (String aTransArr : transArr) {
+      String trans = aTransArr.trim();
+      if (trans.startsWith("script:")) {
+        // The script transformer is a potential vulnerability, esp. when the script is
+        // provided from an untrusted source. Check and don't proceed if source is untrusted.
+        checkIfTrusted(trans);
+        String functionName = trans.substring("script:".length());
+        ScriptTransformer scriptTransformer = new ScriptTransformer();
+        scriptTransformer.setFunctionName(functionName);
+        transformers.add(scriptTransformer);
+        continue;
+      }
+      try {
+        Class clazz = DocBuilder.loadClass(trans, context.getSolrCore());
+        if (Transformer.class.isAssignableFrom(clazz)) {
+          transformers.add((Transformer) clazz.newInstance());
+        } else {
+          Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class);
+          transformers.add(new ReflectionTransformer(meth, clazz, trans));
+        }
+      } catch (NoSuchMethodException nsme) {
+        String msg = "Transformer :"
+            + trans
+            + " does not implement Transformer interface or does not have a transformRow(Map<String, Object> m) method";
+        log.error(msg);
+        wrapAndThrow(SEVERE, nsme, msg);
+      } catch (Exception e) {
+        // report the same (trimmed) name in the log and in the thrown message
+        log.error("Unable to load Transformer: " + trans, e);
+        wrapAndThrow(SEVERE, e, "Unable to load Transformer: " + trans);
+      }
+    }
+  }
+
+  /**
+   * Rejects a script transformer when the core's configset is untrusted by logging and
+   * rethrowing an UNAUTHORIZED SolrException with SEVERE severity. It is a no-op when there
+   * is no DocBuilder, and a missing core counts as trusted.
+   */
+  private void checkIfTrusted(String trans) {
+    if (docBuilder != null) {
+      SolrCore core = docBuilder.dataImporter.getCore();
+      boolean trusted = (core != null) ? core.getCoreDescriptor().isConfigSetTrusted() : true;
+      if (!trusted) {
+        Exception ex = new SolrException(ErrorCode.UNAUTHORIZED, "The configset for this collection was uploaded "
+            + "without any authentication in place,"
+            + " and this transformer is not available for collections with untrusted configsets. To use this transformer,"
+            + " re-upload the configset after enabling authentication and authorization.");
+        String msg = "Transformer: "
+            + trans
+            + ". " + ex.getMessage();
+        log.error(msg);
+        wrapAndThrow(SEVERE, ex, msg);
+      }
+    }
+  }
+
+  /**
+   * Adapts a class that does not implement {@link Transformer} but has a public
+   * {@code transformRow(Map)} method. One instance of the class is created up front and the
+   * method is invoked reflectively; failures are logged and rethrown with WARN severity.
+   */
+  @SuppressWarnings("unchecked")
+  static class ReflectionTransformer extends Transformer {
+    final Method meth;
+
+    final Class clazz;
+
+    final String trans;
+
+    final Object o;
+
+    public ReflectionTransformer(Method meth, Class clazz, String trans)
+        throws Exception {
+      this.meth = meth;
+      this.clazz = clazz;
+      this.trans = trans;
+      o = clazz.newInstance();
+    }
+
+    @Override
+    public Object transformRow(Map<String, Object> aRow, Context context) {
+      try {
+        return meth.invoke(o, aRow);
+      } catch (Exception e) {
+        log.warn("method invocation failed on transformer : " + trans, e);
+        throw new DataImportHandlerException(WARN, e);
+      }
+    }
+  }
+
+  /**
+   * Removes and returns the first pending row; resets {@link #rowcache} to null once it is
+   * drained. Callers must only invoke this while rowcache is non-null and non-empty.
+   */
+  protected Map<String, Object> getFromRowCache() {
+    Map<String, Object> r = rowcache.remove(0);
+    if (rowcache.isEmpty())
+      rowcache = null;
+    return r;
+  }
+
+  /**
+   * Runs the row through every transformer, in order.
+   * <ul>
+   * <li>A transformer may return a Map (the replacement row), a List of Maps (the row
+   * expands into several rows, and the rest are queued in {@link #rowcache}), or null.</li>
+   * <li>For the single-row path, null drops the row. For the multi-row path, null drops
+   * that one row.</li>
+   * <li>A row whose {@code $stopTransform} value parses as true stops further
+   * transformers.</li>
+   * <li>Transformer exceptions are handled according to the onError policy: abort rethrows
+   * as SEVERE, skip rethrows as SKIP, continue ignores them.</li>
+   * </ul>
+   *
+   * @return the (first) transformed row, or null when the row was dropped
+   */
+  @SuppressWarnings("unchecked")
+  protected Map<String, Object> applyTransformer(Map<String, Object> row) {
+    if (row == null) return null;
+    if (transformers == null)
+      loadTransformers();
+    if (transformers.isEmpty())
+      return row;
+    Map<String, Object> transformedRow = row;
+    List<Map<String, Object>> rows = null;
+    boolean stopTransform = checkStopTransform(row);
+    VariableResolver rowResolver = (VariableResolver) context.getVariableResolver();
+    for (Transformer t : transformers) {
+      if (stopTransform) break;
+      try {
+        if (rows != null) {
+          List<Map<String, Object>> tmpRows = new ArrayList<>();
+          for (Map<String, Object> map : rows) {
+            rowResolver.addNamespace(entityName, map);
+            Object o = t.transformRow(map, context);
+            if (o == null)
+              continue;
+            if (o instanceof Map) {
+              Map oMap = (Map) o;
+              stopTransform = checkStopTransform(oMap);
+              tmpRows.add((Map) o);
+            } else if (o instanceof List) {
+              tmpRows.addAll((List) o);
+            } else {
+              log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
+            }
+          }
+          rows = tmpRows;
+        } else {
+          rowResolver.addNamespace(entityName, transformedRow);
+          Object o = t.transformRow(transformedRow, context);
+          if (o == null)
+            return null;
+          if (o instanceof Map) {
+            Map oMap = (Map) o;
+            stopTransform = checkStopTransform(oMap);
+            transformedRow = (Map) o;
+          } else if (o instanceof List) {
+            rows = (List) o;
+          } else {
+            log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
+          }
+        }
+      } catch (Exception e) {
+        log.warn("transformer threw error", e);
+        if (ABORT.equals(onError)) {
+          wrapAndThrow(SEVERE, e);
+        } else if (SKIP.equals(onError)) {
+          wrapAndThrow(DataImportHandlerException.SKIP, e);
+        }
+        // onError = continue
+      }
+    }
+    if (rows == null) {
+      return transformedRow;
+    }
+    if (rows.isEmpty()) {
+      // A transformer returned an empty list, or every expanded row was dropped. Treat this
+      // like a transformer returning null (skip the row) instead of failing inside
+      // getFromRowCache() with an IndexOutOfBoundsException.
+      return null;
+    }
+    rowcache = rows;
+    return getFromRowCache();
+  }
+
+  /** True when the row carries a {@code $stopTransform} value that parses as boolean true. */
+  private boolean checkStopTransform(Map oMap) {
+    return oMap.get("$stopTransform") != null
+        && Boolean.parseBoolean(oMap.get("$stopTransform").toString());
+  }
+
+  /**
+   * Returns the next transformed row. Pending rows from an earlier multi-row transform are
+   * served first. Rows dropped by the transformers are skipped. Delegate failures abort the
+   * import under the abort policy; otherwise they are logged and end the stream (null).
+   */
+  @Override
+  public Map<String, Object> nextRow() {
+    if (rowcache != null) {
+      return getFromRowCache();
+    }
+    while (true) {
+      Map<String, Object> arow = null;
+      try {
+        arow = delegate.nextRow();
+      } catch (Exception e) {
+        if (ABORT.equals(onError)) {
+          wrapAndThrow(SEVERE, e);
+        } else {
+          //SKIP is not really possible. If this calls the nextRow() again the EntityProcessor would be in an inconsistent state
+          SolrException.log(log, "Exception in entity : " + entityName, e);
+          return null;
+        }
+      }
+      if (arow == null) {
+        return null;
+      } else {
+        arow = applyTransformer(arow);
+        if (arow != null) {
+          delegate.postTransform(arow);
+          return arow;
+        }
+      }
+    }
+  }
+
+  /** Delegates, then transforms the key row; any extra rows from a multi-row transform are discarded. */
+  @Override
+  public Map<String, Object> nextModifiedRowKey() {
+    Map<String, Object> row = delegate.nextModifiedRowKey();
+    row = applyTransformer(row);
+    rowcache = null;
+    return row;
+  }
+
+  /** Delegates, then transforms the key row; any extra rows from a multi-row transform are discarded. */
+  @Override
+  public Map<String, Object> nextDeletedRowKey() {
+    Map<String, Object> row = delegate.nextDeletedRowKey();
+    row = applyTransformer(row);
+    rowcache = null;
+    return row;
+  }
+
+  /** Delegates without applying transformers. */
+  @Override
+  public Map<String, Object> nextModifiedParentRowKey() {
+    return delegate.nextModifiedParentRowKey();
+  }
+
+  @Override
+  public void destroy() {
+    delegate.destroy();
+  }
+
+  public VariableResolver getVariableResolver() {
+    return (VariableResolver) context.getVariableResolver();
+  }
+
+  public Context getContext() {
+    return context;
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  public Entity getEntity() {
+    return entity;
+  }
+
+  public List<EntityProcessorWrapper> getChildren() {
+    return children;
+  }
+
+  public DataSource getDatasource() {
+    return datasource;
+  }
+
+  public void setDatasource(DataSource datasource) {
+    this.datasource = datasource;
+  }
+
+  public boolean isInitialized() {
+    return initialized;
+  }
+
+  public void setInitialized(boolean initialized) {
+    this.initialized = initialized;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
new file mode 100644
index 0000000..22282b9
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/Evaluator.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * Pluggable functions for resolving variables
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a public no-arg constructor.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class Evaluator {
+
+  /**
+   * Return a String after processing an expression and a {@link VariableResolver}
+   *
+   * @see VariableResolver
+   * @param expression string to be evaluated
+   * @param context instance
+   * @return the value of the given expression evaluated using the resolver
+   */
+  public abstract String evaluate(String expression, Context context);
+
+  /**
+   * Parses a string of expression into separate params. The values are separated by commas.
+   * Each value is translated into one of the following:
+   * <ol>
+   * <li>If it is in single quotes, it is translated to a String. Commas inside the quotes
+   * are kept, and {@code \'} is unescaped to {@code '}.</li>
+   * <li>If it is not in quotes and is a number, it is translated into a Double.</li>
+   * <li>Otherwise it is a variable which can be resolved, and it is put in as an instance of
+   * VariableWrapper. This includes a value that starts with a digit but is not a number, as
+   * long as the resolver can resolve it.</li>
+   * </ol>
+   *
+   * @param expression the expression to be parsed
+   * @param vr the VariableResolver instance for resolving variables
+   *
+   * @return a List of objects which can either be a string, number or a variable wrapper
+   * @throws DataImportHandlerException (SEVERE) for an empty parameter, an unterminated
+   *         string, or a digit-led value that is neither a number nor a resolvable variable
+   */
+  protected List<Object> parseParams(String expression, VariableResolver vr) {
+    List<Object> result = new ArrayList<>();
+    expression = expression.trim();
+    String[] ss = expression.split(",");
+    for (int i = 0; i < ss.length; i++) {
+      ss[i] = ss[i].trim();
+      if (ss[i].isEmpty()) {
+        // previously failed with StringIndexOutOfBoundsException on charAt(0)
+        throw new DataImportHandlerException(SEVERE, "empty parameter in function params: " + expression);
+      }
+      if (ss[i].startsWith("'")) {//a string param has started
+        StringBuilder sb = new StringBuilder();
+        while (true) {
+          sb.append(ss[i]);
+          // a lone opening quote (e.g. the first half of ',') does not close the string
+          if (sb.length() > 1 && ss[i].endsWith("'")) break;
+          i++;
+          if (i >= ss.length)
+            throw new DataImportHandlerException(SEVERE, "invalid string at " + ss[i - 1] + " in function params: " + expression);
+          sb.append(",");
+        }
+        String s = sb.substring(1, sb.length() - 1);
+        s = s.replaceAll("\\\\'", "'");
+        result.add(s);
+      } else if (Character.isDigit(ss[i].charAt(0))) {
+        try {
+          Double doub = Double.parseDouble(ss[i]);
+          result.add(doub);
+        } catch (NumberFormatException e) {
+          if (vr.resolve(ss[i]) == null) {
+            wrapAndThrow(
+                SEVERE, e, "Invalid number :" + ss[i] +
+                    " in parameters " + expression);
+          } else {
+            // not a number but a resolvable variable: keep it so later params keep their positions
+            result.add(getVariableWrapper(ss[i], vr));
+          }
+        }
+      } else {
+        result.add(getVariableWrapper(ss[i], vr));
+      }
+    }
+    return result;
+  }
+
+  protected VariableWrapper getVariableWrapper(String s, VariableResolver vr) {
+    return new VariableWrapper(s, vr);
+  }
+
+  /** A variable name paired with the resolver used to look up its value on demand. */
+  static protected class VariableWrapper {
+    public final String varName;
+    public final VariableResolver vr;
+
+    public VariableWrapper(String s, VariableResolver vr) {
+      this.varName = s;
+      this.vr = vr;
+    }
+
+    public Object resolve() {
+      return vr.resolve(varName);
+    }
+
+    /** The resolved value's string form, or null when the variable does not resolve. */
+    @Override
+    public String toString() {
+      Object o = vr.resolve(varName);
+      return o == null ? null : o.toString();
+    }
+  }
+
+  static final Pattern IN_SINGLE_QUOTES = Pattern.compile("^'(.*?)'$");
+
+  public static final String DATE_FORMAT_EVALUATOR = "formatDate";
+
+  public static final String URL_ENCODE_EVALUATOR = "encodeUrl";
+
+  public static final String ESCAPE_SOLR_QUERY_CHARS = "escapeQueryChars";
+
+  public static final String SQL_ESCAPE_EVALUATOR = "escapeSql";
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
new file mode 100644
index 0000000..0c43a0b
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/EventListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+/**
+ * Callback interface through which DataImportHandler notifies interested code of an event.
+ *
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.4
+ */
+public interface EventListener {
+
+  /**
+   * Receives an event notification.
+   *
+   * @param ctx the {@link Context} in which the event was raised
+   */
+  void onEvent(Context ctx);
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
new file mode 100644
index 0000000..571c280
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * This can be useful for users who have a DB field containing xml and wish to use a nested {@link XPathEntityProcessor}
+ * <p>
+ * The datasouce may be configured as follows
+ * <p>
+ * <datasource name="f1" type="FieldReaderDataSource" />
+ * <p>
+ * The entity which uses this datasource must keep the url value as the variable name url="field-name"
+ * <p>
+ * The fieldname must be resolvable from {@link VariableResolver}
+ * <p>
+ * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}<{@link Reader}> eg: {@link XPathEntityProcessor}
+ * <p>
+ * Supports String, BLOB, CLOB data types and there is an extra field (in the entity) 'encoding' for BLOB types
+ *
+ * @since 1.4
+ */
+public class FieldReaderDataSource extends DataSource<Reader> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected VariableResolver vr;
+ protected String dataField;
+ private String encoding;
+ private EntityProcessorWrapper entityProcessor;
+
+ @Override
+ public void init(Context context, Properties initProps) {
+ dataField = context.getEntityAttribute("dataField");
+ encoding = context.getEntityAttribute("encoding");
+ entityProcessor = (EntityProcessorWrapper) context.getEntityProcessor();
+ /*no op*/
+ }
+
+ @Override
+ public Reader getData(String query) {
+ Object o = entityProcessor.getVariableResolver().resolve(dataField);
+ if (o == null) {
+ throw new DataImportHandlerException (SEVERE, "No field available for name : " +dataField);
+ }
+ if (o instanceof String) {
+ return new StringReader((String) o);
+ } else if (o instanceof Clob) {
+ Clob clob = (Clob) o;
+ try {
+ //Most of the JDBC drivers have getCharacterStream defined as public
+ // so let us just check it
+ return readCharStream(clob);
+ } catch (Exception e) {
+ log.info("Unable to get data from CLOB");
+ return null;
+
+ }
+
+ } else if (o instanceof Blob) {
+ Blob blob = (Blob) o;
+ try {
+ return getReader(blob);
+ } catch (Exception e) {
+ log.info("Unable to get data from BLOB");
+ return null;
+
+ }
+ } else {
+ return new StringReader(o.toString());
+ }
+
+ }
+
+ static Reader readCharStream(Clob clob) {
+ try {
+ return clob.getCharacterStream();
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e,"Unable to get reader from clob");
+ return null;//unreachable
+ }
+ }
+
+ private Reader getReader(Blob blob)
+ throws SQLException, UnsupportedEncodingException {
+ if (encoding == null) {
+ return (new InputStreamReader(blob.getBinaryStream(), StandardCharsets.UTF_8));
+ } else {
+ return (new InputStreamReader(blob.getBinaryStream(), encoding));
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
new file mode 100644
index 0000000..ba7ca5d
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.sql.Blob;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This can be useful for users who have a DB field containing BLOBs which may be Rich documents
+ * <p>
+ * The datasource may be configured as follows
+ * <p>
+ * <dataSource name="f1" type="FieldStreamDataSource" />
+ * <p>
+ * The entity which uses this datasource must keep and attribute dataField
+ * <p>
+ * The fieldname must be resolvable from {@link VariableResolver}
+ * <p>
+ * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}<{@link InputStream}> eg: TikaEntityProcessor
+ *
+ * @since 3.1
+ */
+public class FieldStreamDataSource extends DataSource<InputStream> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected VariableResolver vr;
+ protected String dataField;
+ private EntityProcessorWrapper wrapper;
+
+ @Override
+ public void init(Context context, Properties initProps) {
+ dataField = context.getEntityAttribute("dataField");
+ wrapper = (EntityProcessorWrapper) context.getEntityProcessor();
+ /*no op*/
+ }
+
+ @Override
+ public InputStream getData(String query) {
+ Object o = wrapper.getVariableResolver().resolve(dataField);
+ if (o == null) {
+ throw new DataImportHandlerException(SEVERE, "No field available for name : " + dataField);
+ } else if (o instanceof Blob) {
+ Blob blob = (Blob) o;
+ try {
+ return blob.getBinaryStream();
+ } catch (SQLException sqle) {
+ log.info("Unable to get data from BLOB");
+ return null;
+ }
+ } else if (o instanceof byte[]) {
+ byte[] bytes = (byte[]) o;
+ return new ByteArrayInputStream(bytes);
+ } else {
+ throw new RuntimeException("unsupported type : " + o.getClass());
+ }
+
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
new file mode 100644
index 0000000..920472e
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileDataSource.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.*;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+/**
+ * <p>
+ * A {@link DataSource} which reads from local files
+ * </p>
+ * <p>
+ * The file is read with the default platform encoding. It can be overriden by
+ * specifying the encoding in solrconfig.xml
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public class FileDataSource extends DataSource<Reader> {
+ public static final String BASE_PATH = "basePath";
+
+ /**
+ * The basePath for this data source
+ */
+ protected String basePath;
+
+ /**
+ * The encoding using which the given file should be read
+ */
+ protected String encoding = null;
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ public void init(Context context, Properties initProps) {
+ basePath = initProps.getProperty(BASE_PATH);
+ if (initProps.get(URLDataSource.ENCODING) != null)
+ encoding = initProps.getProperty(URLDataSource.ENCODING);
+ }
+
+ /**
+ * <p>
+ * Returns a reader for the given file.
+ * </p>
+ * <p>
+ * If the given file is not absolute, we try to construct an absolute path
+ * using basePath configuration. If that fails, then the relative path is
+ * tried. If file is not found a RuntimeException is thrown.
+ * </p>
+ * <p>
+ * <b>It is the responsibility of the calling method to properly close the
+ * returned Reader</b>
+ * </p>
+ */
+ @Override
+ public Reader getData(String query) {
+ File f = getFile(basePath,query);
+ try {
+ return openStream(f);
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE,e,"Unable to open File : "+f.getAbsolutePath());
+ return null;
+ }
+ }
+
+ static File getFile(String basePath, String query) {
+ try {
+ File file = new File(query);
+
+ // If it's not an absolute path, try relative from basePath.
+ if (!file.isAbsolute()) {
+ // Resolve and correct basePath.
+ File basePathFile;
+ if (basePath == null) {
+ basePathFile = new File(".").getAbsoluteFile();
+ log.warn("FileDataSource.basePath is empty. " +
+ "Resolving to: " + basePathFile.getAbsolutePath());
+ } else {
+ basePathFile = new File(basePath);
+ if (!basePathFile.isAbsolute()) {
+ basePathFile = basePathFile.getAbsoluteFile();
+ log.warn("FileDataSource.basePath is not absolute. Resolving to: "
+ + basePathFile.getAbsolutePath());
+ }
+ }
+
+ file = new File(basePathFile, query).getAbsoluteFile();
+ }
+
+ if (file.isFile() && file.canRead()) {
+ log.debug("Accessing File: " + file.getAbsolutePath());
+ return file;
+ } else {
+ throw new FileNotFoundException("Could not find file: " + query +
+ " (resolved to: " + file.getAbsolutePath());
+ }
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Open a {@link java.io.Reader} for the given file name
+ *
+ * @param file a {@link java.io.File} instance
+ * @return a Reader on the given file
+ * @throws FileNotFoundException if the File does not exist
+ * @throws UnsupportedEncodingException if the encoding is unsupported
+ * @since solr 1.4
+ */
+ protected Reader openStream(File file) throws FileNotFoundException,
+ UnsupportedEncodingException {
+ if (encoding == null) {
+ return new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8);
+ } else {
+ return new InputStreamReader(new FileInputStream(file), encoding);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
new file mode 100644
index 0000000..a03354f
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/FileListEntityProcessor.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.solr.util.DateMathParser;
+
+/**
+ * <p>
+ * An {@link EntityProcessor} instance which can stream file names found in a given base
+ * directory matching patterns and returning rows containing file information.
+ * </p>
+ * <p>
+ * It supports querying a give base directory by matching:
+ * <ul>
+ * <li>regular expressions to file names</li>
+ * <li>excluding certain files based on regular expression</li>
+ * <li>last modification date (newer or older than a given date or time)</li>
+ * <li>size (bigger or smaller than size given in bytes)</li>
+ * <li>recursively iterating through sub-directories</li>
+ * </ul>
+ * Its output can be used along with {@link FileDataSource} to read from files in file
+ * systems.
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ * @see Pattern
+ */
+public class FileListEntityProcessor extends EntityProcessorBase {
+ /**
+ * A regex pattern to identify files given in data-config.xml after resolving any variables
+ */
+ protected String fileName;
+
+ /**
+ * The baseDir given in data-config.xml after resolving any variables
+ */
+ protected String baseDir;
+
+ /**
+ * A Regex pattern of excluded file names as given in data-config.xml after resolving any variables
+ */
+ protected String excludes;
+
+ /**
+ * The newerThan given in data-config as a {@link java.util.Date}
+ * <p>
+ * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+ * </p>
+ */
+ protected Date newerThan;
+
+ /**
+ * The newerThan given in data-config as a {@link java.util.Date}
+ */
+ protected Date olderThan;
+
+ /**
+ * The biggerThan given in data-config as a long value
+ * <p>
+ * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+ * </p>
+ */
+ protected long biggerThan = -1;
+
+ /**
+ * The smallerThan given in data-config as a long value
+ * <p>
+ * <b>Note: </b> This variable is resolved just-in-time in the {@link #nextRow()} method.
+ * </p>
+ */
+ protected long smallerThan = -1;
+
+ /**
+ * The recursive given in data-config. Default value is false.
+ */
+ protected boolean recursive = false;
+
+ private Pattern fileNamePattern, excludesPattern;
+
+ @Override
+ public void init(Context context) {
+ super.init(context);
+ fileName = context.getEntityAttribute(FILE_NAME);
+ if (fileName != null) {
+ fileName = context.replaceTokens(fileName);
+ fileNamePattern = Pattern.compile(fileName);
+ }
+ baseDir = context.getEntityAttribute(BASE_DIR);
+ if (baseDir == null)
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "'baseDir' is a required attribute");
+ baseDir = context.replaceTokens(baseDir);
+ File dir = new File(baseDir);
+ if (!dir.isDirectory())
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "'baseDir' value: " + baseDir + " is not a directory");
+
+ String r = context.getEntityAttribute(RECURSIVE);
+ if (r != null)
+ recursive = Boolean.parseBoolean(r);
+ excludes = context.getEntityAttribute(EXCLUDES);
+ if (excludes != null) {
+ excludes = context.replaceTokens(excludes);
+ excludesPattern = Pattern.compile(excludes);
+ }
+ }
+
+ /**
+ * Get the Date object corresponding to the given string.
+ *
+ * @param dateStr the date string. It can be a DateMath string or it may have a evaluator function
+ * @return a Date instance corresponding to the input string
+ */
+ private Date getDate(String dateStr) {
+ if (dateStr == null)
+ return null;
+
+ Matcher m = PLACE_HOLDER_PATTERN.matcher(dateStr);
+ if (m.find()) {
+ Object o = context.resolve(m.group(1));
+ if (o instanceof Date) return (Date)o;
+ dateStr = (String) o;
+ } else {
+ dateStr = context.replaceTokens(dateStr);
+ }
+ m = Evaluator.IN_SINGLE_QUOTES.matcher(dateStr);
+ if (m.find()) {
+ String expr = m.group(1);
+ //TODO refactor DateMathParser.parseMath a bit to have a static method for this logic.
+ if (expr.startsWith("NOW")) {
+ expr = expr.substring("NOW".length());
+ }
+ try {
+ // DWS TODO: is this TimeZone the right default for us? Deserves explanation if so.
+ return new DateMathParser(TimeZone.getDefault()).parseMath(expr);
+ } catch (ParseException exp) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "Invalid expression for date", exp);
+ }
+ }
+ try {
+ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).parse(dateStr);
+ } catch (ParseException exp) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "Invalid expression for date", exp);
+ }
+ }
+
+ /**
+ * Get the Long value for the given string after resolving any evaluator or variable.
+ *
+ * @param sizeStr the size as a string
+ * @return the Long value corresponding to the given string
+ */
+ private Long getSize(String sizeStr) {
+ if (sizeStr == null)
+ return null;
+
+ Matcher m = PLACE_HOLDER_PATTERN.matcher(sizeStr);
+ if (m.find()) {
+ Object o = context.resolve(m.group(1));
+ if (o instanceof Number) {
+ Number number = (Number) o;
+ return number.longValue();
+ }
+ sizeStr = (String) o;
+ } else {
+ sizeStr = context.replaceTokens(sizeStr);
+ }
+
+ return Long.parseLong(sizeStr);
+ }
+
+ @Override
+ public Map<String, Object> nextRow() {
+ if (rowIterator != null)
+ return getNext();
+ List<Map<String, Object>> fileDetails = new ArrayList<>();
+ File dir = new File(baseDir);
+
+ String dateStr = context.getEntityAttribute(NEWER_THAN);
+ newerThan = getDate(dateStr);
+ dateStr = context.getEntityAttribute(OLDER_THAN);
+ olderThan = getDate(dateStr);
+ String biggerThanStr = context.getEntityAttribute(BIGGER_THAN);
+ if (biggerThanStr != null)
+ biggerThan = getSize(biggerThanStr);
+ String smallerThanStr = context.getEntityAttribute(SMALLER_THAN);
+ if (smallerThanStr != null)
+ smallerThan = getSize(smallerThanStr);
+
+ getFolderFiles(dir, fileDetails);
+ rowIterator = fileDetails.iterator();
+ return getNext();
+ }
+
+ private void getFolderFiles(File dir, final List<Map<String, Object>> fileDetails) {
+ // Fetch an array of file objects that pass the filter, however the
+ // returned array is never populated; accept() always returns false.
+ // Rather we make use of the fileDetails array which is populated as
+ // a side affect of the accept method.
+ dir.list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ File fileObj = new File(dir, name);
+ if (fileObj.isDirectory()) {
+ if (recursive) getFolderFiles(fileObj, fileDetails);
+ } else if (fileNamePattern == null) {
+ addDetails(fileDetails, dir, name);
+ } else if (fileNamePattern.matcher(name).find()) {
+ if (excludesPattern != null && excludesPattern.matcher(name).find())
+ return false;
+ addDetails(fileDetails, dir, name);
+ }
+ return false;
+ }
+ });
+ }
+
+ private void addDetails(List<Map<String, Object>> files, File dir, String name) {
+ Map<String, Object> details = new HashMap<>();
+ File aFile = new File(dir, name);
+ if (aFile.isDirectory()) return;
+ long sz = aFile.length();
+ Date lastModified = new Date(aFile.lastModified());
+ if (biggerThan != -1 && sz <= biggerThan)
+ return;
+ if (smallerThan != -1 && sz >= smallerThan)
+ return;
+ if (olderThan != null && lastModified.after(olderThan))
+ return;
+ if (newerThan != null && lastModified.before(newerThan))
+ return;
+ details.put(DIR, dir.getAbsolutePath());
+ details.put(FILE, name);
+ details.put(ABSOLUTE_FILE, aFile.getAbsolutePath());
+ details.put(SIZE, sz);
+ details.put(LAST_MODIFIED, lastModified);
+ files.add(details);
+ }
+
+ public static final Pattern PLACE_HOLDER_PATTERN = Pattern
+ .compile("\\$\\{(.*?)\\}");
+
+ public static final String DIR = "fileDir";
+
+ public static final String FILE = "file";
+
+ public static final String ABSOLUTE_FILE = "fileAbsolutePath";
+
+ public static final String SIZE = "fileSize";
+
+ public static final String LAST_MODIFIED = "fileLastModified";
+
+ public static final String FILE_NAME = "fileName";
+
+ public static final String BASE_DIR = "baseDir";
+
+ public static final String EXCLUDES = "excludes";
+
+ public static final String NEWER_THAN = "newerThan";
+
+ public static final String OLDER_THAN = "olderThan";
+
+ public static final String BIGGER_THAN = "biggerThan";
+
+ public static final String SMALLER_THAN = "smallerThan";
+
+ public static final String RECURSIVE = "recursive";
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
new file mode 100644
index 0000000..e62c329
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/HTMLStripTransformer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.lucene.analysis.charfilter.HTMLStripCharFilter;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.BufferedReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link Transformer} implementation which strip off HTML tags using {@link HTMLStripCharFilter} This is useful
+ * in case you don't need this HTML anyway.
+ *
+ * @see HTMLStripCharFilter
+ * @since solr 1.4
+ */
+public class HTMLStripTransformer extends Transformer {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object transformRow(Map<String, Object> row, Context context) {
+ List<Map<String, String>> fields = context.getAllEntityFields();
+ for (Map<String, String> field : fields) {
+ String col = field.get(DataImporter.COLUMN);
+ String splitHTML = context.replaceTokens(field.get(STRIP_HTML));
+ if (!TRUE.equals(splitHTML))
+ continue;
+ Object tmpVal = row.get(col);
+ if (tmpVal == null)
+ continue;
+
+ if (tmpVal instanceof List) {
+ List<String> inputs = (List<String>) tmpVal;
+ List results = new ArrayList();
+ for (String input : inputs) {
+ if (input == null)
+ continue;
+ Object o = stripHTML(input, col);
+ if (o != null)
+ results.add(o);
+ }
+ row.put(col, results);
+ } else {
+ String value = tmpVal.toString();
+ Object o = stripHTML(value, col);
+ if (o != null)
+ row.put(col, o);
+ }
+ }
+ return row;
+ }
+
+ private Object stripHTML(String value, String column) {
+ StringBuilder out = new StringBuilder();
+ StringReader strReader = new StringReader(value);
+ try {
+ HTMLStripCharFilter html = new HTMLStripCharFilter(strReader.markSupported() ? strReader : new BufferedReader(strReader));
+ char[] cbuf = new char[1024 * 10];
+ while (true) {
+ int count = html.read(cbuf);
+ if (count == -1)
+ break; // end of stream mark is -1
+ if (count > 0)
+ out.append(cbuf, 0, count);
+ }
+ html.close();
+ } catch (IOException e) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "Failed stripping HTML for column: " + column, e);
+ }
+ return out.toString();
+ }
+
+ public static final String STRIP_HTML = "stripHTML";
+
+ public static final String TRUE = "true";
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
new file mode 100644
index 0000000..a8eed55
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/JdbcDataSource.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.util.CryptoKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.lang.invoke.MethodHandles;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p> A DataSource implementation which can fetch data using JDBC. </p> <p> Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
+ * details. </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public class JdbcDataSource extends
+ DataSource<Iterator<Map<String, Object>>> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected Callable<Connection> factory;
+
+ private long connLastUsed = 0;
+
+ private Connection conn;
+
+ private ResultSetIterator resultSetIterator;
+
+ private Map<String, Integer> fieldNameVsType = new HashMap<>();
+
+ private boolean convertType = false;
+
+ private int batchSize = FETCH_SIZE;
+
+ private int maxRows = 0;
+
+ @Override
+ public void init(Context context, Properties initProps) {
+ resolveVariables(context, initProps);
+ initProps = decryptPwd(context, initProps);
+ Object o = initProps.get(CONVERT_TYPE);
+ if (o != null)
+ convertType = Boolean.parseBoolean(o.toString());
+
+ factory = createConnectionFactory(context, initProps);
+
+ String bsz = initProps.getProperty("batchSize");
+ if (bsz != null) {
+ bsz = context.replaceTokens(bsz);
+ try {
+ batchSize = Integer.parseInt(bsz);
+ if (batchSize == -1)
+ batchSize = Integer.MIN_VALUE;
+ } catch (NumberFormatException e) {
+ log.warn("Invalid batch size: " + bsz);
+ }
+ }
+
+ for (Map<String, String> map : context.getAllEntityFields()) {
+ String n = map.get(DataImporter.COLUMN);
+ String t = map.get(DataImporter.TYPE);
+ if ("sint".equals(t) || "integer".equals(t))
+ fieldNameVsType.put(n, Types.INTEGER);
+ else if ("slong".equals(t) || "long".equals(t))
+ fieldNameVsType.put(n, Types.BIGINT);
+ else if ("float".equals(t) || "sfloat".equals(t))
+ fieldNameVsType.put(n, Types.FLOAT);
+ else if ("double".equals(t) || "sdouble".equals(t))
+ fieldNameVsType.put(n, Types.DOUBLE);
+ else if ("date".equals(t))
+ fieldNameVsType.put(n, Types.DATE);
+ else if ("boolean".equals(t))
+ fieldNameVsType.put(n, Types.BOOLEAN);
+ else if ("binary".equals(t))
+ fieldNameVsType.put(n, Types.BLOB);
+ else
+ fieldNameVsType.put(n, Types.VARCHAR);
+ }
+ }
+
+ private Properties decryptPwd(Context context, Properties initProps) {
+ String encryptionKey = initProps.getProperty("encryptKeyFile");
+ if (initProps.getProperty("password") != null && encryptionKey != null) {
+ // this means the password is encrypted and use the file to decode it
+ try {
+ try (Reader fr = new InputStreamReader(new FileInputStream(encryptionKey), UTF_8)) {
+ char[] chars = new char[100];//max 100 char password
+ int len = fr.read(chars);
+ if (len < 6)
+ throw new DataImportHandlerException(SEVERE, "There should be a password of length 6 atleast " + encryptionKey);
+ Properties props = new Properties();
+ props.putAll(initProps);
+ String password = null;
+ try {
+ password = CryptoKeys.decodeAES(initProps.getProperty("password"), new String(chars, 0, len)).trim();
+ } catch (SolrException se) {
+ throw new DataImportHandlerException(SEVERE, "Error decoding password", se.getCause());
+ }
+ props.put("password", password);
+ initProps = props;
+ }
+ } catch (IOException e) {
+ throw new DataImportHandlerException(SEVERE, "Could not load encryptKeyFile " + encryptionKey);
+ }
+ }
+ return initProps;
+ }
+
+ protected Callable<Connection> createConnectionFactory(final Context context,
+ final Properties initProps) {
+// final VariableResolver resolver = context.getVariableResolver();
+ final String jndiName = initProps.getProperty(JNDI_NAME);
+ final String url = initProps.getProperty(URL);
+ final String driver = initProps.getProperty(DRIVER);
+
+ if (url == null && jndiName == null)
+ throw new DataImportHandlerException(SEVERE,
+ "JDBC URL or JNDI name has to be specified");
+
+ if (driver != null) {
+ try {
+ DocBuilder.loadClass(driver, context.getSolrCore());
+ } catch (ClassNotFoundException e) {
+ wrapAndThrow(SEVERE, e, "Could not load driver: " + driver);
+ }
+ } else {
+ if(jndiName == null){
+ throw new DataImportHandlerException(SEVERE, "One of driver or jndiName must be specified in the data source");
+ }
+ }
+
+ String s = initProps.getProperty("maxRows");
+ if (s != null) {
+ maxRows = Integer.parseInt(s);
+ }
+
+ return factory = new Callable<Connection>() {
+ @Override
+ public Connection call() throws Exception {
+ log.info("Creating a connection for entity "
+ + context.getEntityAttribute(DataImporter.NAME) + " with URL: "
+ + url);
+ long start = System.nanoTime();
+ Connection c = null;
+
+ if (jndiName != null) {
+ c = getFromJndi(initProps, jndiName);
+ } else if (url != null) {
+ try {
+ c = DriverManager.getConnection(url, initProps);
+ } catch (SQLException e) {
+ // DriverManager does not allow you to use a driver which is not loaded through
+ // the class loader of the class which is trying to make the connection.
+ // This is a workaround for cases where the user puts the driver jar in the
+ // solr.home/lib or solr.home/core/lib directories.
+ Driver d = (Driver) DocBuilder.loadClass(driver, context.getSolrCore()).newInstance();
+ c = d.connect(url, initProps);
+ }
+ }
+ if (c != null) {
+ try {
+ initializeConnection(c, initProps);
+ } catch (SQLException e) {
+ try {
+ c.close();
+ } catch (SQLException e2) {
+ log.warn("Exception closing connection during cleanup", e2);
+ }
+
+ throw new DataImportHandlerException(SEVERE, "Exception initializing SQL connection", e);
+ }
+ }
+ log.info("Time taken for getConnection(): "
+ + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+ return c;
+ }
+
+ private void initializeConnection(Connection c, final Properties initProps)
+ throws SQLException {
+ if (Boolean.parseBoolean(initProps.getProperty("readOnly"))) {
+ c.setReadOnly(true);
+ // Add other sane defaults
+ c.setAutoCommit(true);
+ c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+ c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
+ }
+ if (!Boolean.parseBoolean(initProps.getProperty("autoCommit"))) {
+ c.setAutoCommit(false);
+ }
+ String transactionIsolation = initProps.getProperty("transactionIsolation");
+ if ("TRANSACTION_READ_UNCOMMITTED".equals(transactionIsolation)) {
+ c.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+ } else if ("TRANSACTION_READ_COMMITTED".equals(transactionIsolation)) {
+ c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ } else if ("TRANSACTION_REPEATABLE_READ".equals(transactionIsolation)) {
+ c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+ } else if ("TRANSACTION_SERIALIZABLE".equals(transactionIsolation)) {
+ c.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+ } else if ("TRANSACTION_NONE".equals(transactionIsolation)) {
+ c.setTransactionIsolation(Connection.TRANSACTION_NONE);
+ }
+ String holdability = initProps.getProperty("holdability");
+ if ("CLOSE_CURSORS_AT_COMMIT".equals(holdability)) {
+ c.setHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT);
+ } else if ("HOLD_CURSORS_OVER_COMMIT".equals(holdability)) {
+ c.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
+ }
+ }
+
+ private Connection getFromJndi(final Properties initProps, final String jndiName) throws NamingException,
+ SQLException {
+
+ Connection c = null;
+ InitialContext ctx = new InitialContext();
+ Object jndival = ctx.lookup(jndiName);
+ if (jndival instanceof javax.sql.DataSource) {
+ javax.sql.DataSource dataSource = (javax.sql.DataSource) jndival;
+ String user = (String) initProps.get("user");
+ String pass = (String) initProps.get("password");
+ if(user == null || user.trim().equals("")){
+ c = dataSource.getConnection();
+ } else {
+ c = dataSource.getConnection(user, pass);
+ }
+ } else {
+ throw new DataImportHandlerException(SEVERE,
+ "the jndi name : '"+jndiName +"' is not a valid javax.sql.DataSource");
+ }
+ return c;
+ }
+ };
+ }
+
+ private void resolveVariables(Context ctx, Properties initProps) {
+ for (Map.Entry<Object, Object> entry : initProps.entrySet()) {
+ if (entry.getValue() != null) {
+ entry.setValue(ctx.replaceTokens((String) entry.getValue()));
+ }
+ }
+ }
+
+ @Override
+ public Iterator<Map<String, Object>> getData(String query) {
+ if (resultSetIterator != null) {
+ resultSetIterator.close();
+ resultSetIterator = null;
+ }
+ resultSetIterator = createResultSetIterator(query);
+ return resultSetIterator.getIterator();
+ }
+
+ protected ResultSetIterator createResultSetIterator(String query) {
+ return new ResultSetIterator(query);
+ }
+
+ private void logError(String msg, Exception e) {
+ log.warn(msg, e);
+ }
+
+ protected List<String> readFieldNames(ResultSetMetaData metaData)
+ throws SQLException {
+ List<String> colNames = new ArrayList<>();
+ int count = metaData.getColumnCount();
+ for (int i = 0; i < count; i++) {
+ colNames.add(metaData.getColumnLabel(i + 1));
+ }
+ return colNames;
+ }
+
+ protected class ResultSetIterator {
+ private ResultSet resultSet;
+
+ private Statement stmt = null;
+
+ private List<String> colNames;
+
+ private Iterator<Map<String, Object>> rSetIterator;
+
+ public ResultSetIterator(String query) {
+
+ try {
+ Connection c = getConnection();
+ stmt = createStatement(c, batchSize, maxRows);
+ log.debug("Executing SQL: " + query);
+ long start = System.nanoTime();
+ resultSet = executeStatement(stmt, query);
+ log.trace("Time taken for sql :"
+ + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
+ setColNames(resultSet);
+ } catch (Exception e) {
+ close();
+ wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
+ return;
+ }
+ if (resultSet == null) {
+ close();
+ rSetIterator = new ArrayList<Map<String, Object>>().iterator();
+ return;
+ }
+
+ rSetIterator = createIterator(convertType, fieldNameVsType);
+ }
+
+
+ protected Statement createStatement(final Connection c, final int batchSize, final int maxRows)
+ throws SQLException {
+ Statement statement = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(batchSize);
+ statement.setMaxRows(maxRows);
+ return statement;
+ }
+
+ protected ResultSet executeStatement(Statement statement, String query) throws SQLException {
+ boolean resultSetReturned = statement.execute(query);
+ return getNextResultSet(resultSetReturned, statement);
+ }
+
+ protected ResultSet getNextResultSet(final boolean initialResultSetAvailable, final Statement statement) throws SQLException {
+ boolean resultSetAvailable = initialResultSetAvailable;
+ while (!resultSetAvailable && statement.getUpdateCount() != -1) {
+ resultSetAvailable = statement.getMoreResults();
+ }
+ if (resultSetAvailable) {
+ return statement.getResultSet();
+ }
+ return null;
+ }
+
+ protected void setColNames(final ResultSet resultSet) throws SQLException {
+ if (resultSet != null) {
+ colNames = readFieldNames(resultSet.getMetaData());
+ } else {
+ colNames = Collections.emptyList();
+ }
+ }
+
+ protected Iterator<Map<String,Object>> createIterator(final boolean convertType,
+ final Map<String,Integer> fieldNameVsType) {
+ return new Iterator<Map<String,Object>>() {
+ @Override
+ public boolean hasNext() {
+ return hasnext();
+ }
+
+ @Override
+ public Map<String,Object> next() {
+ return getARow(convertType, fieldNameVsType);
+ }
+
+ @Override
+ public void remove() {/* do nothing */
+ }
+ };
+ }
+
+
+
+ protected Map<String,Object> getARow(boolean convertType, Map<String,Integer> fieldNameVsType) {
+ if (getResultSet() == null)
+ return null;
+ Map<String, Object> result = new HashMap<>();
+ for (String colName : getColNames()) {
+ try {
+ if (!convertType) {
+ // Use underlying database's type information except for BigDecimal and BigInteger
+ // which cannot be serialized by JavaBin/XML. See SOLR-6165
+ Object value = getResultSet().getObject(colName);
+ if (value instanceof BigDecimal || value instanceof BigInteger) {
+ result.put(colName, value.toString());
+ } else {
+ result.put(colName, value);
+ }
+ continue;
+ }
+
+ Integer type = fieldNameVsType.get(colName);
+ if (type == null)
+ type = Types.VARCHAR;
+ switch (type) {
+ case Types.INTEGER:
+ result.put(colName, getResultSet().getInt(colName));
+ break;
+ case Types.FLOAT:
+ result.put(colName, getResultSet().getFloat(colName));
+ break;
+ case Types.BIGINT:
+ result.put(colName, getResultSet().getLong(colName));
+ break;
+ case Types.DOUBLE:
+ result.put(colName, getResultSet().getDouble(colName));
+ break;
+ case Types.DATE:
+ result.put(colName, getResultSet().getTimestamp(colName));
+ break;
+ case Types.BOOLEAN:
+ result.put(colName, getResultSet().getBoolean(colName));
+ break;
+ case Types.BLOB:
+ result.put(colName, getResultSet().getBytes(colName));
+ break;
+ default:
+ result.put(colName, getResultSet().getString(colName));
+ break;
+ }
+ } catch (SQLException e) {
+ logError("Error reading data ", e);
+ wrapAndThrow(SEVERE, e, "Error reading data from database");
+ }
+ }
+ return result;
+ }
+
+ protected boolean hasnext() {
+ if (getResultSet() == null) {
+ close();
+ return false;
+ }
+ try {
+ if (getResultSet().next()) {
+ return true;
+ } else {
+ closeResultSet();
+ setResultSet(getNextResultSet(getStatement().getMoreResults(), getStatement()));
+ setColNames(getResultSet());
+ return hasnext();
+ }
+ } catch (SQLException e) {
+ close();
+ wrapAndThrow(SEVERE,e);
+ return false;
+ }
+ }
+
+ protected void close() {
+ closeResultSet();
+ try {
+ if (getStatement() != null)
+ getStatement().close();
+ } catch (Exception e) {
+ logError("Exception while closing statement", e);
+ } finally {
+ setStatement(null);
+ }
+ }
+
+ protected void closeResultSet() {
+ try {
+ if (getResultSet() != null) {
+ getResultSet().close();
+ }
+ } catch (Exception e) {
+ logError("Exception while closing result set", e);
+ } finally {
+ setResultSet(null);
+ }
+ }
+
+ protected final Iterator<Map<String,Object>> getIterator() {
+ return rSetIterator;
+ }
+
+
+ protected final Statement getStatement() {
+ return stmt;
+ }
+
+ protected final void setStatement(Statement stmt) {
+ this.stmt = stmt;
+ }
+
+ protected final ResultSet getResultSet() {
+ return resultSet;
+ }
+
+ protected final void setResultSet(ResultSet resultSet) {
+ this.resultSet = resultSet;
+ }
+
+ protected final List<String> getColNames() {
+ return colNames;
+ }
+
+ protected final void setColNames(List<String> colNames) {
+ this.colNames = colNames;
+ }
+
+ }
+
+ protected Connection getConnection() throws Exception {
+ long currTime = System.nanoTime();
+ if (currTime - connLastUsed > CONN_TIME_OUT) {
+ synchronized (this) {
+ Connection tmpConn = factory.call();
+ closeConnection();
+ connLastUsed = System.nanoTime();
+ return conn = tmpConn;
+ }
+
+ } else {
+ connLastUsed = currTime;
+ return conn;
+ }
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ try {
+ if(!isClosed){
+ log.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
+ close();
+ }
+ } finally {
+ super.finalize();
+ }
+ }
+
+ private boolean isClosed = false;
+
+ @Override
+ public void close() {
+ if (resultSetIterator != null) {
+ resultSetIterator.close();
+ }
+ try {
+ closeConnection();
+ } finally {
+ isClosed = true;
+ }
+ }
+
+ private void closeConnection() {
+ try {
+ if (conn != null) {
+ try {
+ //SOLR-2045
+ conn.commit();
+ } catch(Exception ex) {
+ //ignore.
+ }
+ conn.close();
+ }
+ } catch (Exception e) {
+ log.error("Ignoring Error when closing connection", e);
+ }
+ }
+
+ private static final long CONN_TIME_OUT = TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+
+ private static final int FETCH_SIZE = 500;
+
+ public static final String URL = "url";
+
+ public static final String JNDI_NAME = "jndiName";
+
+ public static final String DRIVER = "driver";
+
+ public static final String CONVERT_TYPE = "convertType";
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
new file mode 100644
index 0000000..0940cbd
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LineEntityProcessor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.io.*;
+import java.util.*;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+
+
+/**
+ * <p>
+ * An {@link EntityProcessor} instance which can stream lines of text read from a
+ * datasource. Options allow lines to be explicitly skipped or included in the index.
+ * </p>
+ * <p>
+ * Attribute summary
+ * <ul>
+ * <li>url is the required location of the input file. If this value is
+ * relative, it assumed to be relative to baseLoc.</li>
+ * <li>acceptLineRegex is an optional attribute that if present discards any
+ * line which does not match the regExp.</li>
+ * <li>skipLineRegex is an optional attribute that is applied after any
+ * acceptLineRegex and discards any line which matches this regExp.</li>
+ * </ul>
+ * <p>
+ * Although envisioned for reading lines from a file or url, LineEntityProcessor may also be useful
+ * for dealing with change lists, where each line contains filenames which can be used by subsequent entities
+ * to parse content from those files.
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.4
+ * @see Pattern
+ */
+public class LineEntityProcessor extends EntityProcessorBase {
+ private Pattern acceptLineRegex, skipLineRegex;
+ private String url;
+ private BufferedReader reader;
+
+ /**
+ * Parses each of the entity attributes.
+ */
+ @Override
+ public void init(Context context) {
+ super.init(context);
+ String s;
+
+ // init a regex to locate files from the input we want to index
+ s = context.getResolvedEntityAttribute(ACCEPT_LINE_REGEX);
+ if (s != null) {
+ acceptLineRegex = Pattern.compile(s);
+ }
+
+ // init a regex to locate files from the input to be skipped
+ s = context.getResolvedEntityAttribute(SKIP_LINE_REGEX);
+ if (s != null) {
+ skipLineRegex = Pattern.compile(s);
+ }
+
+ // the FileName is required.
+ url = context.getResolvedEntityAttribute(URL);
+ if (url == null) throw
+ new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "'"+ URL +"' is a required attribute");
+ }
+
+
+ /**
+ * Reads lines from the url till it finds a lines that matches the
+ * optional acceptLineRegex and does not match the optional skipLineRegex.
+ *
+ * @return A row containing a minimum of one field "rawLine" or null to signal
+ * end of file. The rawLine is the as line as returned by readLine()
+ * from the url. However transformers can be used to create as
+ * many other fields as required.
+ */
+ @Override
+ public Map<String, Object> nextRow() {
+ if (reader == null) {
+ reader = new BufferedReader((Reader) context.getDataSource().getData(url));
+ }
+
+ String line;
+
+ while ( true ) {
+ // read a line from the input file
+ try {
+ line = reader.readLine();
+ }
+ catch (IOException exp) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
+ "Problem reading from input", exp);
+ }
+
+ // end of input
+ if (line == null) {
+ closeResources();
+ return null;
+ }
+
+ // First scan whole line to see if we want it
+ if (acceptLineRegex != null && ! acceptLineRegex.matcher(line).find()) continue;
+ if (skipLineRegex != null && skipLineRegex.matcher(line).find()) continue;
+ // Contruct the 'row' of fields
+ Map<String, Object> row = new HashMap<>();
+ row.put("rawLine", line);
+ return row;
+ }
+ }
+
+ public void closeResources() {
+ if (reader != null) {
+ IOUtils.closeQuietly(reader);
+ }
+ reader= null;
+ }
+
+ @Override
+ public void destroy() {
+ closeResources();
+ super.destroy();
+ }
+
+ /**
+ * Holds the name of entity attribute that will be parsed to obtain
+ * the filename containing the changelist.
+ */
+ public static final String URL = "url";
+
+ /**
+ * Holds the name of entity attribute that will be parsed to obtain
+ * the pattern to be used when checking to see if a line should
+ * be returned.
+ */
+ public static final String ACCEPT_LINE_REGEX = "acceptLineRegex";
+
+ /**
+ * Holds the name of entity attribute that will be parsed to obtain
+ * the pattern to be used when checking to see if a line should
+ * be ignored.
+ */
+ public static final String SKIP_LINE_REGEX = "skipLineRegex";
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
new file mode 100644
index 0000000..66c525e
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/LogTransformer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
+/**
+ * A {@link Transformer} implementation which logs messages in a given template format.
+ * <p>
+ * Refer to <a href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.4
+ */
+public class LogTransformer extends Transformer {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ public Object transformRow(Map<String, Object> row, Context ctx) {
+ String expr = ctx.getEntityAttribute(LOG_TEMPLATE);
+ String level = ctx.replaceTokens(ctx.getEntityAttribute(LOG_LEVEL));
+
+ if (expr == null || level == null) return row;
+
+ if ("info".equals(level)) {
+ if (log.isInfoEnabled())
+ log.info(ctx.replaceTokens(expr));
+ } else if ("trace".equals(level)) {
+ if (log.isTraceEnabled())
+ log.trace(ctx.replaceTokens(expr));
+ } else if ("warn".equals(level)) {
+ if (log.isWarnEnabled())
+ log.warn(ctx.replaceTokens(expr));
+ } else if ("error".equals(level)) {
+ if (log.isErrorEnabled())
+ log.error(ctx.replaceTokens(expr));
+ } else if ("debug".equals(level)) {
+ if (log.isDebugEnabled())
+ log.debug(ctx.replaceTokens(expr));
+ }
+
+ return row;
+ }
+
+ public static final String LOG_TEMPLATE = "logTemplate";
+ public static final String LOG_LEVEL = "logLevel";
+}