Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:37 UTC
[16/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras module
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
new file mode 100644
index 0000000..d610d66
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataImporter.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.EmptyEntityResolver;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.util.SystemIdResolver;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.XMLErrorLogger;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.ConfigParseUtil;
+import org.apache.solr.handler.dataimport.config.DIHConfiguration;
+import org.apache.solr.handler.dataimport.config.Entity;
+import org.apache.solr.handler.dataimport.config.PropertyWriter;
+import org.apache.solr.handler.dataimport.config.Script;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DocBuilder.loadClass;
+import static org.apache.solr.handler.dataimport.config.ConfigNameConstants.CLASS;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.apache.commons.io.IOUtils;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <p> Stores all configuration information for pulling and indexing data. </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DataImporter {
+
+ public enum Status {
+ IDLE, RUNNING_FULL_DUMP, RUNNING_DELTA_DUMP, JOB_FAILED
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final XMLErrorLogger XMLLOG = new XMLErrorLogger(log);
+
+ private Status status = Status.IDLE;
+ private DIHConfiguration config;
+ private Date indexStartTime;
+ private Properties store = new Properties();
+ private Map<String, Map<String,String>> requestLevelDataSourceProps = new HashMap<>();
+ private IndexSchema schema;
+ public DocBuilder docBuilder;
+ public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
+ private SolrCore core;
+ private Map<String, Object> coreScopeSession = new ConcurrentHashMap<>();
+ private ReentrantLock importLock = new ReentrantLock();
+ private boolean isDeltaImportSupported = false;
+ private final String handlerName;
+
+ /**
+ * Only for testing purposes
+ */
+ DataImporter() {
+ this.handlerName = "dataimport";
+ }
+
+ DataImporter(SolrCore core, String handlerName) {
+ this.handlerName = handlerName;
+ this.core = core;
+ this.schema = core.getLatestSchema();
+ }
+
+
+
+
+ boolean maybeReloadConfiguration(RequestInfo params,
+ NamedList<?> defaultParams) throws IOException {
+ if (importLock.tryLock()) {
+ boolean success = false;
+ try {
+ if (null != params.getRequest()) {
+ if (schema != params.getRequest().getSchema()) {
+ schema = params.getRequest().getSchema();
+ }
+ }
+ String dataConfigText = params.getDataConfig();
+ String dataconfigFile = params.getConfigFile();
+ InputSource is = null;
+ if(dataConfigText!=null && dataConfigText.length()>0) {
+ is = new InputSource(new StringReader(dataConfigText));
+ } else if(dataconfigFile!=null) {
+ is = new InputSource(core.getResourceLoader().openResource(dataconfigFile));
+ is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(dataconfigFile));
+ log.info("Loading DIH Configuration: " + dataconfigFile);
+ }
+ if(is!=null) {
+ config = loadDataConfig(is);
+ success = true;
+ }
+
+ Map<String,Map<String,String>> dsProps = new HashMap<>();
+ if(defaultParams!=null) {
+ int position = 0;
+ while (position < defaultParams.size()) {
+ if (defaultParams.getName(position) == null) {
+ break;
+ }
+ String name = defaultParams.getName(position);
+ if (name.equals("datasource")) {
+ success = true;
+ NamedList dsConfig = (NamedList) defaultParams.getVal(position);
+ log.info("Getting configuration for Global Datasource...");
+ Map<String,String> props = new HashMap<>();
+ for (int i = 0; i < dsConfig.size(); i++) {
+ props.put(dsConfig.getName(i), dsConfig.getVal(i).toString());
+ }
+ log.info("Adding properties to datasource: " + props);
+ dsProps.put((String) dsConfig.get("name"), props);
+ }
+ position++;
+ }
+ }
+ requestLevelDataSourceProps = Collections.unmodifiableMap(dsProps);
+ } catch(IOException ioe) {
+ throw ioe;
+ } finally {
+ importLock.unlock();
+ }
+ return success;
+ } else {
+ return false;
+ }
+ }
+
+
+
+ public String getHandlerName() {
+ return handlerName;
+ }
+
+ public IndexSchema getSchema() {
+ return schema;
+ }
+
+ /**
+ * Used by tests
+ */
+ void loadAndInit(String configStr) {
+ config = loadDataConfig(new InputSource(new StringReader(configStr)));
+ }
+
+ void loadAndInit(InputSource configFile) {
+ config = loadDataConfig(configFile);
+ }
+
+ public DIHConfiguration loadDataConfig(InputSource configFile) {
+
+ DIHConfiguration dihcfg = null;
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ dbf.setValidating(false);
+
+ // only enable XInclude if the XML is coming from a safe source (local file)
+ // and a SolrCore and SystemId are present (makes no sense otherwise):
+ if (core != null && configFile.getSystemId() != null) {
+ try {
+ dbf.setXIncludeAware(true);
+ dbf.setNamespaceAware(true);
+ } catch( UnsupportedOperationException e ) {
+ log.warn( "XML parser doesn't support XInclude option" );
+ }
+ }
+
+ DocumentBuilder builder = dbf.newDocumentBuilder();
+ // only enable XInclude / external entities if the XML is coming from a
+ // safe source (local file) and a SolrCore and SystemId are present:
+ if (core != null && configFile.getSystemId() != null) {
+ builder.setEntityResolver(new SystemIdResolver(core.getResourceLoader()));
+ } else {
+ // Don't allow external entities without having a system ID:
+ builder.setEntityResolver(EmptyEntityResolver.SAX_INSTANCE);
+ }
+ builder.setErrorHandler(XMLLOG);
+ Document document;
+ try {
+ document = builder.parse(configFile);
+ } finally {
+ // some XML parsers are broken and don't close the byte stream (but they should according to spec)
+ IOUtils.closeQuietly(configFile.getByteStream());
+ }
+
+ dihcfg = readFromXml(document);
+ log.info("Data Configuration loaded successfully");
+ } catch (Exception e) {
+ throw new DataImportHandlerException(SEVERE,
+ "Data Config problem: " + e.getMessage(), e);
+ }
+ for (Entity e : dihcfg.getEntities()) {
+ if (e.getAllAttributes().containsKey(SqlEntityProcessor.DELTA_QUERY)) {
+ isDeltaImportSupported = true;
+ break;
+ }
+ }
+ return dihcfg;
+ }
+
+ public DIHConfiguration readFromXml(Document xmlDocument) {
+ DIHConfiguration config;
+ List<Map<String, String >> functions = new ArrayList<>();
+ Script script = null;
+ Map<String, Map<String,String>> dataSources = new HashMap<>();
+
+ NodeList dataConfigTags = xmlDocument.getElementsByTagName("dataConfig");
+ if(dataConfigTags == null || dataConfigTags.getLength() == 0) {
+ throw new DataImportHandlerException(SEVERE, "the root node '<dataConfig>' is missing");
+ }
+ Element e = (Element) dataConfigTags.item(0);
+ List<Element> documentTags = ConfigParseUtil.getChildNodes(e, "document");
+ if (documentTags.isEmpty()) {
+ throw new DataImportHandlerException(SEVERE, "DataImportHandler " +
+ "configuration file must have one <document> node.");
+ }
+
+ List<Element> scriptTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.SCRIPT);
+ if (!scriptTags.isEmpty()) {
+ script = new Script(scriptTags.get(0));
+ }
+
+ // Add the provided evaluators
+ List<Element> functionTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.FUNCTION);
+ if (!functionTags.isEmpty()) {
+ for (Element element : functionTags) {
+ String func = ConfigParseUtil.getStringAttribute(element, NAME, null);
+ String clz = ConfigParseUtil.getStringAttribute(element, ConfigNameConstants.CLASS, null);
+ if (func == null || clz == null){
+ throw new DataImportHandlerException(
+ SEVERE,
+ "<function> must have a 'name' and 'class' attributes");
+ } else {
+ functions.add(ConfigParseUtil.getAllAttributes(element));
+ }
+ }
+ }
+ List<Element> dataSourceTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.DATA_SRC);
+ if (!dataSourceTags.isEmpty()) {
+ for (Element element : dataSourceTags) {
+ Map<String,String> p = new HashMap<>();
+ HashMap<String, String> attrs = ConfigParseUtil.getAllAttributes(element);
+ for (Map.Entry<String, String> entry : attrs.entrySet()) {
+ p.put(entry.getKey(), entry.getValue());
+ }
+ dataSources.put(p.get("name"), p);
+ }
+ }
+ if(dataSources.get(null) == null){
+ for (Map<String,String> properties : dataSources.values()) {
+ dataSources.put(null,properties);
+ break;
+ }
+ }
+ PropertyWriter pw = null;
+ List<Element> propertyWriterTags = ConfigParseUtil.getChildNodes(e, ConfigNameConstants.PROPERTY_WRITER);
+ if (propertyWriterTags.isEmpty()) {
+ boolean zookeeper = false;
+ if (this.core != null
+ && this.core.getCoreContainer().isZooKeeperAware()) {
+ zookeeper = true;
+ }
+ pw = new PropertyWriter(zookeeper ? "ZKPropertiesWriter"
+ : "SimplePropertiesWriter", Collections.<String,String> emptyMap());
+ } else if (propertyWriterTags.size() > 1) {
+ throw new DataImportHandlerException(SEVERE, "Only one "
+ + ConfigNameConstants.PROPERTY_WRITER + " can be configured.");
+ } else {
+ Element pwElement = propertyWriterTags.get(0);
+ String type = null;
+ Map<String,String> params = new HashMap<>();
+ for (Map.Entry<String,String> entry : ConfigParseUtil.getAllAttributes(
+ pwElement).entrySet()) {
+ if (TYPE.equals(entry.getKey())) {
+ type = entry.getValue();
+ } else {
+ params.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (type == null) {
+ throw new DataImportHandlerException(SEVERE, "The "
+ + ConfigNameConstants.PROPERTY_WRITER + " element must specify "
+ + TYPE);
+ }
+ pw = new PropertyWriter(type, params);
+ }
+ return new DIHConfiguration(documentTags.get(0), this, functions, script, dataSources, pw);
+ }
+
+ @SuppressWarnings("unchecked")
+ private DIHProperties createPropertyWriter() {
+ DIHProperties propWriter = null;
+ PropertyWriter configPw = config.getPropertyWriter();
+ try {
+ Class<DIHProperties> writerClass = DocBuilder.loadClass(configPw.getType(), this.core);
+ propWriter = writerClass.newInstance();
+ propWriter.init(this, configPw.getParameters());
+ } catch (Exception e) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load PropertyWriter implementation: " + configPw.getType(), e);
+ }
+ return propWriter;
+ }
+
+ public DIHConfiguration getConfig() {
+ return config;
+ }
+
+ Date getIndexStartTime() {
+ return indexStartTime;
+ }
+
+ void setIndexStartTime(Date indexStartTime) {
+ this.indexStartTime = indexStartTime;
+ }
+
+ void store(Object key, Object value) {
+ store.put(key, value);
+ }
+
+ Object retrieve(Object key) {
+ return store.get(key);
+ }
+
+ public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
+ Map<String,String> p = requestLevelDataSourceProps.get(name);
+ if (p == null)
+ p = config.getDataSources().get(name);
+ if (p == null)
+ p = requestLevelDataSourceProps.get(null);// for default data source
+ if (p == null)
+ p = config.getDataSources().get(null);
+ if (p == null)
+ throw new DataImportHandlerException(SEVERE,
+ "No dataSource :" + name + " available for entity :" + key.getName());
+ String type = p.get(TYPE);
+ DataSource dataSrc = null;
+ if (type == null) {
+ dataSrc = new JdbcDataSource();
+ } else {
+ try {
+ dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
+ }
+ }
+ try {
+ Properties copyProps = new Properties();
+ copyProps.putAll(p);
+ Map<String, Object> map = ctx.getRequestParameters();
+ if (map.containsKey("rows")) {
+ int rows = Integer.parseInt((String) map.get("rows"));
+ if (map.containsKey("start")) {
+ rows += Integer.parseInt((String) map.get("start"));
+ }
+ copyProps.setProperty("maxRows", String.valueOf(rows));
+ }
+ dataSrc.init(ctx, copyProps);
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
+ }
+ return dataSrc;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ public boolean isBusy() {
+ return importLock.isLocked();
+ }
+
+ public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
+ log.info("Starting Full Import");
+ setStatus(Status.RUNNING_FULL_DUMP);
+ try {
+ DIHProperties dihPropWriter = createPropertyWriter();
+ setIndexStartTime(dihPropWriter.getCurrentTimestamp());
+ docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
+ checkWritablePersistFile(writer, dihPropWriter);
+ docBuilder.execute();
+ if (!requestParams.isDebug())
+ cumulativeStatistics.add(docBuilder.importStatistics);
+ } catch (Exception e) {
+ SolrException.log(log, "Full Import failed", e);
+ docBuilder.handleError("Full Import failed", e);
+ } finally {
+ setStatus(Status.IDLE);
+ DocBuilder.INSTANCE.set(null);
+ }
+
+ }
+
+ private void checkWritablePersistFile(DIHWriter writer, DIHProperties dihPropWriter) {
+ if (isDeltaImportSupported && !dihPropWriter.isWritable()) {
+ throw new DataImportHandlerException(SEVERE,
+ "Properties is not writable. Delta imports are supported by data config but will not work.");
+ }
+ }
+
+ public void doDeltaImport(DIHWriter writer, RequestInfo requestParams) {
+ log.info("Starting Delta Import");
+ setStatus(Status.RUNNING_DELTA_DUMP);
+ try {
+ DIHProperties dihPropWriter = createPropertyWriter();
+ setIndexStartTime(dihPropWriter.getCurrentTimestamp());
+ docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
+ checkWritablePersistFile(writer, dihPropWriter);
+ docBuilder.execute();
+ if (!requestParams.isDebug())
+ cumulativeStatistics.add(docBuilder.importStatistics);
+ } catch (Exception e) {
+ log.error("Delta Import Failed", e);
+ docBuilder.handleError("Delta Import Failed", e);
+ } finally {
+ setStatus(Status.IDLE);
+ DocBuilder.INSTANCE.set(null);
+ }
+
+ }
+
+ public void runAsync(final RequestInfo reqParams, final DIHWriter sw) {
+ new Thread(() -> runCmd(reqParams, sw)).start();
+ }
+
+ void runCmd(RequestInfo reqParams, DIHWriter sw) {
+ String command = reqParams.getCommand();
+ if (command.equals(ABORT_CMD)) {
+ if (docBuilder != null) {
+ docBuilder.abort();
+ }
+ return;
+ }
+ if (!importLock.tryLock()){
+ log.warn("Import command failed . another import is running");
+ return;
+ }
+ try {
+ if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
+ doFullImport(sw, reqParams);
+ } else if (command.equals(DELTA_IMPORT_CMD)) {
+ doDeltaImport(sw, reqParams);
+ }
+ } finally {
+ importLock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, String> getStatusMessages() {
+ // this map object is a Collections.synchronizedMap(new LinkedHashMap()); if we
+ // synchronize on the object, it is safe to iterate through the map
+ Map statusMessages = (Map) retrieve(STATUS_MSGS);
+ Map<String, String> result = new LinkedHashMap<>();
+ if (statusMessages != null) {
+ synchronized (statusMessages) {
+ for (Object o : statusMessages.entrySet()) {
+ Map.Entry e = (Map.Entry) o;
+ // toString() is used because some of the values create their data lazily when toString() is called
+ result.put((String) e.getKey(), e.getValue().toString());
+ }
+ }
+ }
+ return result;
+
+ }
+
+ public DocBuilder getDocBuilder() {
+ return docBuilder;
+ }
+
+ public DocBuilder getDocBuilder(DIHWriter writer, RequestInfo requestParams) {
+ DIHProperties dihPropWriter = createPropertyWriter();
+ return new DocBuilder(this, writer, dihPropWriter, requestParams);
+ }
+
+ Map<String, Evaluator> getEvaluators() {
+ return getEvaluators(config.getFunctions());
+ }
+
+ /**
+ * used by tests.
+ */
+ Map<String, Evaluator> getEvaluators(List<Map<String,String>> fn) {
+ Map<String, Evaluator> evaluators = new HashMap<>();
+ evaluators.put(Evaluator.DATE_FORMAT_EVALUATOR, new DateFormatEvaluator());
+ evaluators.put(Evaluator.SQL_ESCAPE_EVALUATOR, new SqlEscapingEvaluator());
+ evaluators.put(Evaluator.URL_ENCODE_EVALUATOR, new UrlEvaluator());
+ evaluators.put(Evaluator.ESCAPE_SOLR_QUERY_CHARS, new SolrQueryEscapingEvaluator());
+ SolrCore core = docBuilder == null ? null : docBuilder.dataImporter.getCore();
+ for (Map<String, String> map : fn) {
+ try {
+ evaluators.put(map.get(NAME), (Evaluator) loadClass(map.get(CLASS), core).newInstance());
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e, "Unable to instantiate evaluator: " + map.get(CLASS));
+ }
+ }
+ return evaluators;
+ }
+
+ static final ThreadLocal<AtomicLong> QUERY_COUNT = new ThreadLocal<AtomicLong>() {
+ @Override
+ protected AtomicLong initialValue() {
+ return new AtomicLong();
+ }
+ };
+
+
+
+ static final class MSG {
+ public static final String NO_CONFIG_FOUND = "Configuration not found";
+
+ public static final String NO_INIT = "DataImportHandler started. Not Initialized. No commands can be run";
+
+ public static final String INVALID_CONFIG = "FATAL: Could not create importer. DataImporter config invalid";
+
+ public static final String LOAD_EXP = "Exception while loading DataImporter";
+
+ public static final String JMX_DESC = "Manage data import from databases to Solr";
+
+ public static final String CMD_RUNNING = "A command is still running...";
+
+ public static final String DEBUG_NOT_ENABLED = "Debug not enabled. Add a tag <str name=\"enableDebug\">true</str> in solrconfig.xml";
+
+ public static final String CONFIG_RELOADED = "Configuration Re-loaded successfully";
+
+ public static final String CONFIG_NOT_RELOADED = "Configuration NOT Re-loaded... Data Importer is busy.";
+
+ public static final String TOTAL_DOC_PROCESSED = "Total Documents Processed";
+
+ public static final String TOTAL_FAILED_DOCS = "Total Documents Failed";
+
+ public static final String TOTAL_QUERIES_EXECUTED = "Total Requests made to DataSource";
+
+ public static final String TOTAL_ROWS_EXECUTED = "Total Rows Fetched";
+
+ public static final String TOTAL_DOCS_DELETED = "Total Documents Deleted";
+
+ public static final String TOTAL_DOCS_SKIPPED = "Total Documents Skipped";
+ }
+
+ public SolrCore getCore() {
+ return core;
+ }
+
+ void putToCoreScopeSession(String key, Object val) {
+ coreScopeSession.put(key, val);
+ }
+ Object getFromCoreScopeSession(String key) {
+ return coreScopeSession.get(key);
+ }
+
+ public static final String COLUMN = "column";
+
+ public static final String TYPE = "type";
+
+ public static final String DATA_SRC = "dataSource";
+
+ public static final String MULTI_VALUED = "multiValued";
+
+ public static final String NAME = "name";
+
+ public static final String STATUS_MSGS = "status-messages";
+
+ public static final String FULL_IMPORT_CMD = "full-import";
+
+ public static final String IMPORT_CMD = "import";
+
+ public static final String DELTA_IMPORT_CMD = "delta-import";
+
+ public static final String ABORT_CMD = "abort";
+
+ public static final String DEBUG_MODE = "debug";
+
+ public static final String RELOAD_CONF_CMD = "reload-config";
+
+ public static final String SHOW_CONF_CMD = "show-config";
+
+}
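A note on readFromXml() above: it requires a <dataConfig> root with at least one <document> child, while <dataSource>, <script>, <function>, and <propertyWriter> are optional siblings (and every <function> needs both 'name' and 'class'). A minimal sketch of feeding a config through the loader, assuming same-package access since DataImporter() and loadAndInit(String) are package-private test hooks; the driver, URL, entity, and table names are invented for illustration:

package org.apache.solr.handler.dataimport;

// Minimal sketch: all datasource attributes and names below are hypothetical.
public class DataImporterConfigSketch {
  public static void main(String[] args) {
    String configStr =
        "<dataConfig>"
      + "  <dataSource name='ds1' type='JdbcDataSource'"
      + "              driver='org.h2.Driver' url='jdbc:h2:mem:test'/>"
      + "  <document>"
      + "    <entity name='item' dataSource='ds1'"
      + "            query='SELECT id, name FROM item'/>"
      + "  </document>"
      + "</dataConfig>";
    DataImporter importer = new DataImporter(); // test-only constructor
    importer.loadAndInit(configStr);            // parses via loadDataConfig()
    System.out.println(importer.getConfig().getEntities().size() + " entity");
  }
}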
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
new file mode 100644
index 0000000..e217ddd
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DataSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * Provides data from a source with a given query.
+ * </p>
+ * <p>
+ * Implementations of this abstract class must provide a default no-arg constructor
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and may change in the future.</b>
+ *
+ * @since solr 1.3
+ */
+public abstract class DataSource<T> {
+
+ /**
+ * Initializes the DataSource with the <code>Context</code> and
+ * initialization properties.
+ * <p>
+ * This is invoked by the <code>DataImporter</code> after creating an
+ * instance of this class.
+ */
+ public abstract void init(Context context, Properties initProps);
+
+ /**
+ * Get records for the given query. The return type depends on the
+ * implementation.
+ *
+ * @param query The query string. It can be SQL for JdbcDataSource, a URL
+ * for HttpDataSource, a file location for FileDataSource, or a custom
+ * format for your own custom DataSource.
+ * @return Depends on the implementation. For instance, JdbcDataSource returns
+ * an Iterator<Map<String, Object>>
+ */
+ public abstract T getData(String query);
+
+ /**
+ * Cleans up resources of this DataSource after use.
+ */
+ public abstract void close();
+}
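The contract documented above is small: a public no-arg constructor (DataImporter instantiates the class reflectively), init() called once before use, getData() per query, and close() for cleanup. A minimal sketch of a hypothetical in-memory implementation, not part of this commit, using the same Iterator<Map<String,Object>> row shape that JdbcDataSource returns:

package org.apache.solr.handler.dataimport;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

// Hypothetical example only: a DataSource backed by an in-memory map.
public class InMemoryDataSource extends DataSource<Iterator<Map<String, Object>>> {

  private Map<String, Iterator<Map<String, Object>>> tables;

  public InMemoryDataSource() { } // required default no-arg constructor

  @Override
  public void init(Context context, Properties initProps) {
    // initProps carries the <dataSource> element's attributes
    tables = new HashMap<>();
  }

  @Override
  public Iterator<Map<String, Object>> getData(String query) {
    // 'query' is whatever the entity's query attribute resolved to
    Iterator<Map<String, Object>> rows = tables.get(query);
    return rows != null ? rows : Collections.emptyIterator();
  }

  @Override
  public void close() {
    tables = null; // release anything acquired in init()
  }
}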
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
new file mode 100644
index 0000000..f4df820
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatEvaluator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IllformedLocaleException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.handler.dataimport.config.EntityField;
+import org.apache.solr.util.DateMathParser;
+
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+/**
+ * <p>Formats values using a given date format. </p>
+ * <p>Pass two to four parameters:
+ * <ul>
+ * <li>An {@link EntityField} or a date expression to be parsed with
+ * the {@link DateMathParser} class. If the value is a String,
+ * it is assumed to be a datemath expression; otherwise it is
+ * resolved using a {@link VariableResolver} instance</li>
+ * <li>A date format; see {@link SimpleDateFormat} for the syntax.</li>
+ * <li>The {@link Locale} to parse with
+ * (optional; defaults to {@link Locale#ENGLISH})</li>
+ * <li>The {@link TimeZone} ID to format in
+ * (optional; defaults to the JVM default time zone)</li>
+ * </ul>
+ */
+public class DateFormatEvaluator extends Evaluator {
+
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ protected Map<String, Locale> availableLocales = new HashMap<>();
+ protected Set<String> availableTimezones = new HashSet<>();
+
+ @SuppressForbidden(reason = "Usage of outdated locale parsing with Locale#toString() because of backwards compatibility")
+ public DateFormatEvaluator() {
+ for (Locale locale : Locale.getAvailableLocales()) {
+ availableLocales.put(locale.toString(), locale);
+ }
+ for (String tz : TimeZone.getAvailableIDs()) {
+ availableTimezones.add(tz);
+ }
+ }
+
+ private SimpleDateFormat getDateFormat(String pattern, TimeZone timezone, Locale locale) {
+ final SimpleDateFormat sdf = new SimpleDateFormat(pattern, locale);
+ sdf.setTimeZone(timezone);
+ return sdf;
+ }
+
+ @Override
+ public String evaluate(String expression, Context context) {
+ List<Object> l = parseParams(expression, context.getVariableResolver());
+ if (l.size() < 2 || l.size() > 4) {
+ throw new DataImportHandlerException(SEVERE, "'formatDate()' must have two, three or four parameters ");
+ }
+ Object o = l.get(0);
+ Object format = l.get(1);
+ if (format instanceof VariableWrapper) {
+ VariableWrapper wrapper = (VariableWrapper) format;
+ o = wrapper.resolve();
+ format = o.toString();
+ }
+ Locale locale = Locale.ENGLISH; // we default to ENGLISH for dates for full Java 9 compatibility
+ if(l.size()>2) {
+ Object localeObj = l.get(2);
+ String localeStr = null;
+ if (localeObj instanceof VariableWrapper) {
+ localeStr = ((VariableWrapper) localeObj).resolve().toString();
+ } else {
+ localeStr = localeObj.toString();
+ }
+ locale = availableLocales.get(localeStr);
+ if (locale == null) try {
+ locale = new Locale.Builder().setLanguageTag(localeStr).build();
+ } catch (IllformedLocaleException ex) {
+ throw new DataImportHandlerException(SEVERE, "Malformed / non-existent locale: " + localeStr, ex);
+ }
+ }
+ TimeZone tz = TimeZone.getDefault(); // DWS TODO: is this the right default for us? Deserves explanation if so.
+ if(l.size()==4) {
+ Object tzObj = l.get(3);
+ String tzStr = null;
+ if (tzObj instanceof VariableWrapper) {
+ tzStr = ((VariableWrapper) tzObj).resolve().toString();
+ } else {
+ tzStr = tzObj.toString();
+ }
+ if(availableTimezones.contains(tzStr)) {
+ tz = TimeZone.getTimeZone(tzStr);
+ } else {
+ throw new DataImportHandlerException(SEVERE, "Unsupported Timezone: " + tzStr);
+ }
+ }
+ String dateFmt = format.toString();
+ SimpleDateFormat fmt = getDateFormat(dateFmt, tz, locale);
+ Date date = null;
+ if (o instanceof VariableWrapper) {
+ date = evaluateWrapper((VariableWrapper) o, locale, tz);
+ } else {
+ date = evaluateString(o.toString(), locale, tz);
+ }
+ return fmt.format(date);
+ }
+
+ /**
+ * NOTE: declared as a method to allow for extensibility
+ *
+ * @lucene.experimental this API is experimental and subject to change
+ * @return the result of evaluating the variable wrapper
+ */
+ protected Date evaluateWrapper(VariableWrapper variableWrapper, Locale locale, TimeZone tz) {
+ Date date = null;
+ Object variableval = resolveWrapper(variableWrapper,locale,tz);
+ if (variableval instanceof Date) {
+ date = (Date) variableval;
+ } else {
+ String s = variableval.toString();
+ try {
+ date = getDateFormat(DEFAULT_DATE_FORMAT, tz, locale).parse(s);
+ } catch (ParseException exp) {
+ wrapAndThrow(SEVERE, exp, "Invalid expression for date");
+ }
+ }
+ return date;
+ }
+
+ /**
+ * NOTE: declared as a method to allow for extensibility
+ * @lucene.experimental
+ * @return the result of evaluating a string
+ */
+ protected Date evaluateString(String datemathfmt, Locale locale, TimeZone tz) {
+ // note: DMP does not use the locale, but perhaps a subclass might use it, e.g. for parsing a date in a custom
+ // string that doesn't necessarily have date math?
+ //TODO refactor DateMathParser.parseMath a bit to have a static method for this logic.
+ if (datemathfmt.startsWith("NOW")) {
+ datemathfmt = datemathfmt.substring("NOW".length());
+ }
+ try {
+ DateMathParser parser = new DateMathParser(tz);
+ parser.setNow(new Date());// thus do *not* use SolrRequestInfo
+ return parser.parseMath(datemathfmt);
+ } catch (ParseException e) {
+ throw wrapAndThrow(SEVERE, e, "Invalid expression for date");
+ }
+ }
+
+ /**
+ * NOTE: declared as a method to allow for extensibility
+ * @lucene.experimental
+ * @return the result of resolving the variable wrapper
+ */
+ protected Object resolveWrapper(VariableWrapper variableWrapper, Locale locale, TimeZone tz) {
+ return variableWrapper.resolve();
+ }
+
+}
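The protected evaluateWrapper()/evaluateString()/resolveWrapper() methods above exist precisely for extensibility, so customization is a matter of subclassing. A minimal, hypothetical sketch (not part of this commit) that accepts a plain literal date before falling back to the date-math path; the literal pattern is an assumption:

package org.apache.solr.handler.dataimport;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical subclass for illustration only.
public class LenientDateFormatEvaluator extends DateFormatEvaluator {

  private static final String LITERAL_PATTERN = "yyyy-MM-dd";

  @Override
  protected Date evaluateString(String datemathfmt, Locale locale, TimeZone tz) {
    try {
      // accept a plain ISO-style literal first ...
      SimpleDateFormat sdf = new SimpleDateFormat(LITERAL_PATTERN, locale);
      sdf.setLenient(false);
      sdf.setTimeZone(tz);
      return sdf.parse(datemathfmt);
    } catch (ParseException notALiteral) {
      // ... otherwise defer to the date-math handling in the superclass
      return super.evaluateString(datemathfmt, locale, tz);
    }
  }
}

Such a subclass would be registered like any custom evaluator, through a <function name='...' class='...'/> element, which getEvaluators() in DataImporter.java loads reflectively.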
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
new file mode 100644
index 0000000..6da9cc1
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DateFormatTransformer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * {@link Transformer} instance which creates {@link Date} instances out of {@link String}s.
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DateFormatTransformer extends Transformer {
+ private Map<String, SimpleDateFormat> fmtCache = new HashMap<>();
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object transformRow(Map<String, Object> aRow, Context context) {
+
+ for (Map<String, String> map : context.getAllEntityFields()) {
+ Locale locale = Locale.ENGLISH; // we default to ENGLISH for dates for full Java 9 compatibility
+ String customLocale = map.get(LOCALE);
+ if (customLocale != null) {
+ try {
+ locale = new Locale.Builder().setLanguageTag(customLocale).build();
+ } catch (IllformedLocaleException e) {
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Invalid Locale specified: " + customLocale, e);
+ }
+ }
+
+ String fmt = map.get(DATE_TIME_FMT);
+ if (fmt == null)
+ continue;
+ VariableResolver resolver = context.getVariableResolver();
+ fmt = resolver.replaceTokens(fmt);
+ String column = map.get(DataImporter.COLUMN);
+ String srcCol = map.get(RegexTransformer.SRC_COL_NAME);
+ if (srcCol == null)
+ srcCol = column;
+ try {
+ Object o = aRow.get(srcCol);
+ if (o instanceof List) {
+ List inputs = (List) o;
+ List<Date> results = new ArrayList<>();
+ for (Object input : inputs) {
+ results.add(process(input, fmt, locale));
+ }
+ aRow.put(column, results);
+ } else {
+ if (o != null) {
+ aRow.put(column, process(o, fmt, locale));
+ }
+ }
+ } catch (ParseException e) {
+ log.warn("Could not parse a Date field ", e);
+ }
+ }
+ return aRow;
+ }
+
+ private Date process(Object value, String format, Locale locale) throws ParseException {
+ if (value == null) return null;
+ String strVal = value.toString().trim();
+ if (strVal.length() == 0)
+ return null;
+ SimpleDateFormat fmt = fmtCache.get(format);
+ if (fmt == null) {
+ fmt = new SimpleDateFormat(format, locale);
+ fmtCache.put(format, fmt);
+ }
+ return fmt.parse(strVal);
+ }
+
+ public static final String DATE_TIME_FMT = "dateTimeFormat";
+
+ public static final String LOCALE = "locale";
+}
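For reference, the parse that process() performs for a field declared with dateTimeFormat is plain SimpleDateFormat parsing with one cached format per pattern. A standalone sketch with invented input values:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Standalone illustration of the transformer's process() logic; the input
// string and pattern are made up for the example.
public class DateFormatTransformerDemo {
  public static void main(String[] args) throws ParseException {
    String raw = "2018-11-02 11:33:37";
    Locale locale = Locale.ENGLISH; // the transformer's default locale
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
    Date parsed = fmt.parse(raw.trim()); // process() also trims first
    System.out.println(parsed);
  }
}

Worth noting: SimpleDateFormat is not thread-safe, so the per-pattern fmtCache assumes a transformer instance is not shared across concurrently transforming threads.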
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
new file mode 100644
index 0000000..623832f
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+
+public class DebugInfo {
+
+ private static final class ChildRollupDocs extends AbstractList<SolrInputDocument> {
+
+ private List<SolrInputDocument> delegate = new ArrayList<>();
+
+ @Override
+ public SolrInputDocument get(int index) {
+ return delegate.get(index);
+ }
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ public boolean add(SolrInputDocument e) {
+ SolrInputDocument transformed = e.deepCopy();
+ if (transformed.hasChildDocuments()) {
+ ChildRollupDocs childList = new ChildRollupDocs();
+ childList.addAll(transformed.getChildDocuments());
+ transformed.addField("_childDocuments_", childList);
+ transformed.getChildDocuments().clear();
+ }
+ return delegate.add(transformed);
+ }
+ }
+
+ public List<SolrInputDocument> debugDocuments = new ChildRollupDocs();
+
+ public NamedList<String> debugVerboseOutput = null;
+ public boolean verbose;
+
+ public DebugInfo(Map<String,Object> requestParams) {
+ verbose = StrUtils.parseBool((String) requestParams.get("verbose"), false);
+ debugVerboseOutput = new NamedList<>();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
new file mode 100644
index 0000000..2fd9303
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DebugLogger.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
+
+/**
+ * <p>
+ * Implements most of the interactive development functionality
+ * </p>
+ * <p>
+ * Refer to <a
+ * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
+ * for more details.
+ * </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+class DebugLogger {
+ private Stack<DebugInfo> debugStack;
+
+ NamedList output;
+// private final SolrWriter writer1;
+
+ private static final String LINE = "---------------------------------------------";
+
+ private MessageFormat fmt = new MessageFormat(
+ "----------- row #{0}-------------", Locale.ROOT);
+
+ boolean enabled = true;
+
+ public DebugLogger() {
+// writer = solrWriter;
+ output = new NamedList();
+ debugStack = new Stack<DebugInfo>() {
+
+ @Override
+ public DebugInfo pop() {
+ if (size() == 1)
+ throw new DataImportHandlerException(
+ DataImportHandlerException.SEVERE, "Stack is becoming empty");
+ return super.pop();
+ }
+ };
+ debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
+ output = debugStack.peek().lst;
+ }
+
+ private DebugInfo peekStack() {
+ return debugStack.isEmpty() ? null : debugStack.peek();
+ }
+
+ public void log(DIHLogLevels event, String name, Object row) {
+ if (event == DIHLogLevels.DISABLE_LOGGING) {
+ enabled = false;
+ return;
+ } else if (event == DIHLogLevels.ENABLE_LOGGING) {
+ enabled = true;
+ return;
+ }
+
+ if (!enabled && event != DIHLogLevels.START_ENTITY
+ && event != DIHLogLevels.END_ENTITY) {
+ return;
+ }
+
+ if (event == DIHLogLevels.START_DOC) {
+ debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
+ } else if (DIHLogLevels.START_ENTITY == event) {
+ debugStack
+ .push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
+ } else if (DIHLogLevels.ENTITY_OUT == event
+ || DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
+ if (debugStack.peek().type == DIHLogLevels.START_ENTITY
+ || debugStack.peek().type == DIHLogLevels.START_DOC) {
+ debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
+ .peek().rowCount}));
+ addToNamedList(debugStack.peek().lst, row);
+ debugStack.peek().lst.add(null, LINE);
+ }
+ } else if (event == DIHLogLevels.ROW_END) {
+ popAllTransformers();
+ } else if (DIHLogLevels.END_ENTITY == event) {
+ while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
+ ;
+ } else if (DIHLogLevels.END_DOC == event) {
+ while (debugStack.pop().type != DIHLogLevels.START_DOC)
+ ;
+ } else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
+ debugStack.push(new DebugInfo(name, event, peekStack()));
+ debugStack.peek().lst.add("EXCEPTION",
+ getStacktraceString((Exception) row));
+ } else if (DIHLogLevels.TRANSFORMED_ROW == event) {
+ debugStack.push(new DebugInfo(name, event, peekStack()));
+ debugStack.peek().lst.add(null, LINE);
+ addToNamedList(debugStack.peek().lst, row);
+ debugStack.peek().lst.add(null, LINE);
+ if (row instanceof DataImportHandlerException) {
+ DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
+ dataImportHandlerException.debugged = true;
+ }
+ } else if (DIHLogLevels.ENTITY_META == event) {
+ popAllTransformers();
+ debugStack.peek().lst.add(name, row);
+ } else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
+ if (row instanceof DataImportHandlerException) {
+ DataImportHandlerException dihe = (DataImportHandlerException) row;
+ if (dihe.debugged)
+ return;
+ dihe.debugged = true;
+ }
+
+ popAllTransformers();
+ debugStack.peek().lst.add("EXCEPTION",
+ getStacktraceString((Exception) row));
+ }
+ }
+
+ private void popAllTransformers() {
+ while (true) {
+ DIHLogLevels type = debugStack.peek().type;
+ if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
+ break;
+ debugStack.pop();
+ }
+ }
+
+ private void addToNamedList(NamedList nl, Object row) {
+ if (row instanceof List) {
+ List list = (List) row;
+ NamedList l = new NamedList();
+ nl.add(null, l);
+ for (Object o : list) {
+ Map<String, Object> map = (Map<String, Object>) o;
+ for (Map.Entry<String, Object> entry : map.entrySet())
+ nl.add(entry.getKey(), entry.getValue());
+ }
+ } else if (row instanceof Map) {
+ Map<String, Object> map = (Map<String, Object>) row;
+ for (Map.Entry<String, Object> entry : map.entrySet())
+ nl.add(entry.getKey(), entry.getValue());
+ }
+ }
+
+ DataSource wrapDs(final DataSource ds) {
+ return new DataSource() {
+ @Override
+ public void init(Context context, Properties initProps) {
+ ds.init(context, initProps);
+ }
+
+ @Override
+ public void close() {
+ ds.close();
+ }
+
+ @Override
+ public Object getData(String query) {
+ log(DIHLogLevels.ENTITY_META, "query", query);
+ long start = System.nanoTime();
+ try {
+ return ds.getData(query);
+ } catch (DataImportHandlerException de) {
+ log(DIHLogLevels.ENTITY_EXCEPTION,
+ null, de);
+ throw de;
+ } catch (Exception e) {
+ log(DIHLogLevels.ENTITY_EXCEPTION,
+ null, e);
+ DataImportHandlerException de = new DataImportHandlerException(
+ DataImportHandlerException.SEVERE, "", e);
+ de.debugged = true;
+ throw de;
+ } finally {
+ log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
+ .getTimeElapsedSince(start));
+ }
+ }
+ };
+ }
+
+ Transformer wrapTransformer(final Transformer t) {
+ return new Transformer() {
+ @Override
+ public Object transformRow(Map<String, Object> row, Context context) {
+ log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
+ String tName = getTransformerName(t);
+ Object result = null;
+ try {
+ result = t.transformRow(row, context);
+ log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
+ } catch (DataImportHandlerException de) {
+ log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
+ de.debugged = true;
+ throw de;
+ } catch (Exception e) {
+ log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
+ DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
+ de.debugged = true;
+ throw de;
+ }
+ return result;
+ }
+ };
+ }
+
+ public static String getStacktraceString(Exception e) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ return sw.toString();
+ }
+
+ static String getTransformerName(Transformer t) {
+ Class transClass = t.getClass();
+ if (t instanceof EntityProcessorWrapper.ReflectionTransformer) {
+ return ((EntityProcessorWrapper.ReflectionTransformer) t).trans;
+ }
+ if (t instanceof ScriptTransformer) {
+ ScriptTransformer scriptTransformer = (ScriptTransformer) t;
+ return "script:" + scriptTransformer.getFunctionName();
+ }
+ if (transClass.getPackage().equals(DebugLogger.class.getPackage())) {
+ return transClass.getSimpleName();
+ } else {
+ return transClass.getName();
+ }
+ }
+
+ private static class DebugInfo {
+ String name;
+
+ int tCount, rowCount;
+
+ NamedList lst;
+
+ DIHLogLevels type;
+
+ DebugInfo parent;
+
+ public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
+ this.name = name;
+ this.type = type;
+ this.parent = parent;
+ lst = new NamedList();
+ if (parent != null) {
+ String displayName = null;
+ if (type == DIHLogLevels.START_ENTITY) {
+ displayName = "entity:" + name;
+ } else if (type == DIHLogLevels.TRANSFORMED_ROW
+ || type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
+ displayName = "transformer:" + name;
+ } else if (type == DIHLogLevels.START_DOC) {
+ this.name = displayName = "document#" + SolrWriter.getDocCount();
+ }
+ parent.lst.add(displayName, lst);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
new file mode 100644
index 0000000..164cf70
--- /dev/null
+++ b/solr/contrib/dataimporthandler/src/main/java/org/apache/solr/handler/dataimport/DocBuilder.java
@@ -0,0 +1,988 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.dataimport;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
+import org.apache.solr.handler.dataimport.config.DIHConfiguration;
+import org.apache.solr.handler.dataimport.config.Entity;
+import org.apache.solr.handler.dataimport.config.EntityField;
+
+import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
+import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
+
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * <p> {@link DocBuilder} is responsible for creating Solr documents out of the given configuration. It also maintains
+ * statistics information. It depends on the {@link EntityProcessor} implementations to fetch data. </p>
+ * <p>
+ * <b>This API is experimental and subject to change</b>
+ *
+ * @since solr 1.3
+ */
+public class DocBuilder {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();
+
+ private static final Date EPOCH = new Date(0);
+ public static final String DELETE_DOC_BY_ID = "$deleteDocById";
+ public static final String DELETE_DOC_BY_QUERY = "$deleteDocByQuery";
+ public static final String DOC_BOOST = "$docBoost";
+ public static final String SKIP_DOC = "$skipDoc";
+ public static final String SKIP_ROW = "$skipRow";
+
+ DataImporter dataImporter;
+
+ private DIHConfiguration config;
+
+ private EntityProcessorWrapper currentEntityProcessorWrapper;
+
+ @SuppressWarnings("unchecked")
+ private Map statusMessages = Collections.synchronizedMap(new LinkedHashMap());
+
+ public Statistics importStatistics = new Statistics();
+
+ DIHWriter writer;
+
+ boolean verboseDebug = false;
+
+ Map<String, Object> session = new HashMap<>();
+
+ static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<>();
+ private Map<String, Object> persistedProperties;
+
+ private DIHProperties propWriter;
+ private DebugLogger debugLogger;
+ private final RequestInfo reqParams;
+
+ public DocBuilder(DataImporter dataImporter, DIHWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
+ INSTANCE.set(this);
+ this.dataImporter = dataImporter;
+ this.reqParams = reqParams;
+ this.propWriter = propWriter;
+ DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
+ verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
+ persistedProperties = propWriter.readIndexerProperties();
+
+ writer = solrWriter;
+ ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
+ if (writer != null) {
+ writer.init(ctx);
+ }
+ }
+
+
+ DebugLogger getDebugLogger(){
+ if (debugLogger == null) {
+ debugLogger = new DebugLogger();
+ }
+ return debugLogger;
+ }
+
+ private VariableResolver getVariableResolver() {
+ try {
+ VariableResolver resolver = null;
+ String epoch = propWriter.convertDateToString(EPOCH);
+ if(dataImporter != null && dataImporter.getCore() != null
+ && dataImporter.getCore().getResourceLoader().getCoreProperties() != null){
+ resolver = new VariableResolver(dataImporter.getCore().getResourceLoader().getCoreProperties());
+ } else {
+ resolver = new VariableResolver();
+ }
+ resolver.setEvaluators(dataImporter.getEvaluators());
+ Map<String, Object> indexerNamespace = new HashMap<>();
+ if (persistedProperties.get(LAST_INDEX_TIME) != null) {
+ indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
+ } else {
+ // set epoch
+ indexerNamespace.put(LAST_INDEX_TIME, epoch);
+ }
+ indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
+ indexerNamespace.put("request", new HashMap<>(reqParams.getRawParams()));
+ indexerNamespace.put("handlerName", dataImporter.getHandlerName());
+ for (Entity entity : dataImporter.getConfig().getEntities()) {
+ Map<String, Object> entityNamespace = new HashMap<>();
+ String key = SolrWriter.LAST_INDEX_KEY;
+ Object lastIndex = persistedProperties.get(entity.getName() + "." + key);
+ if (lastIndex != null) {
+ entityNamespace.put(SolrWriter.LAST_INDEX_KEY, lastIndex);
+ } else {
+ entityNamespace.put(SolrWriter.LAST_INDEX_KEY, epoch);
+ }
+ indexerNamespace.put(entity.getName(), entityNamespace);
+ }
+ resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
+ resolver.addNamespace(ConfigNameConstants.IMPORTER_NS, indexerNamespace);
+ return resolver;
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e);
+ // unreachable statement
+ return null;
+ }
+ }
+
+ private void invokeEventListener(String className) {
+ invokeEventListener(className, null);
+ }
+
+
+ private void invokeEventListener(String className, Exception lastException) {
+ try {
+ EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
+ notifyListener(listener, lastException);
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
+ }
+ }
+
+ private void notifyListener(EventListener listener, Exception lastException) {
+ String currentProcess;
+ if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
+ currentProcess = Context.DELTA_DUMP;
+ } else {
+ currentProcess = Context.FULL_DUMP;
+ }
+ ContextImpl ctx = new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this);
+ ctx.setLastException(lastException);
+ listener.onEvent(ctx);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void execute() {
+ List<EntityProcessorWrapper> epwList = null;
+ try {
+ dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
+ config = dataImporter.getConfig();
+ final AtomicLong startTime = new AtomicLong(System.nanoTime());
+ statusMessages.put(TIME_ELAPSED, new Object() {
+ @Override
+ public String toString() {
+ return getTimeElapsedSince(startTime.get());
+ }
+ });
+
+ statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
+ importStatistics.queryCount);
+ statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
+ importStatistics.rowsCount);
+ statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
+ importStatistics.docCount);
+ statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
+ importStatistics.skipDocCount);
+
+ List<String> entities = reqParams.getEntitiesToRun();
+
+ // Trigger onImportStart
+ if (config.getOnImportStart() != null) {
+ invokeEventListener(config.getOnImportStart());
+ }
+ AtomicBoolean fullCleanDone = new AtomicBoolean(false);
+ //we must not do a delete of *:* multiple times if there are multiple root entities to be run
+ Map<String,Object> lastIndexTimeProps = new HashMap<>();
+ lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
+
+ epwList = new ArrayList<>(config.getEntities().size());
+ for (Entity e : config.getEntities()) {
+ epwList.add(getEntityProcessorWrapper(e));
+ }
+ for (EntityProcessorWrapper epw : epwList) {
+ if (entities != null && !entities.contains(epw.getEntity().getName()))
+ continue;
+ lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
+ currentEntityProcessorWrapper = epw;
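+ // preImportDeleteQuery / postImportDeleteQuery are optional attributes on the
+ // entity element in data-config.xml, e.g. (illustrative only):
+ // <entity name="item" preImportDeleteQuery="type:item" postImportDeleteQuery="stale:true" ...>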
+ String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
+ if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
+ cleanByQuery(delQuery, fullCleanDone);
+ doDelta();
+ delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
+ if (delQuery != null) {
+ fullCleanDone.set(false);
+ cleanByQuery(delQuery, fullCleanDone);
+ }
+ } else {
+ cleanByQuery(delQuery, fullCleanDone);
+ doFullDump();
+ delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
+ if (delQuery != null) {
+ fullCleanDone.set(false);
+ cleanByQuery(delQuery, fullCleanDone);
+ }
+ }
+ }
+
+ if (stop.get()) {
+ // Don't commit if aborted using command=abort
+ statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
+ handleError("Aborted", null);
+ } else {
+ // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
+ if (!reqParams.isClean()) {
+ if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
+ finish(lastIndexTimeProps);
+ }
+ } else {
+ // Finished operation normally, commit now
+ finish(lastIndexTimeProps);
+ }
+
+ if (config.getOnImportEnd() != null) {
+ invokeEventListener(config.getOnImportEnd());
+ }
+ }
+
+ statusMessages.remove(TIME_ELAPSED);
+ statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, "" + importStatistics.docCount.get());
+ if (importStatistics.failedDocCount.get() > 0)
+ statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, "" + importStatistics.failedDocCount.get());
+
+ statusMessages.put("Time taken", getTimeElapsedSince(startTime.get()));
+ log.info("Time taken = " + getTimeElapsedSince(startTime.get()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ if (epwList != null) {
+ closeEntityProcessorWrappers(epwList);
+ }
+ if (reqParams.isDebug()) {
+ reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
+ }
+ }
+ }
+
+ private void closeEntityProcessorWrappers(List<EntityProcessorWrapper> epwList) {
+ for (EntityProcessorWrapper epw : epwList) {
+ epw.close();
+ if (epw.getDatasource() != null) {
+ epw.getDatasource().close();
+ }
+ closeEntityProcessorWrappers(epw.getChildren());
+ }
+ }
+
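+ /**
+ * Commits (and optionally optimizes) if requested, then persists the per-entity
+ * last_index_time properties that subsequent delta imports use as their reference
+ * point.
+ */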
+ @SuppressWarnings("unchecked")
+ private void finish(Map<String,Object> lastIndexTimeProps) {
+ log.info("Import completed successfully");
+ statusMessages.put("", "Indexing completed. Added/Updated: "
+ + importStatistics.docCount + " documents. Deleted "
+ + importStatistics.deletedDocCount + " documents.");
+ if (reqParams.isCommit()) {
+ writer.commit(reqParams.isOptimize());
+ addStatusMessage("Committed");
+ if (reqParams.isOptimize())
+ addStatusMessage("Optimized");
+ }
+ try {
+ propWriter.persist(lastIndexTimeProps);
+ } catch (Exception e) {
+ log.error("Could not write property file", e);
+ statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
+ "Make sure your conf directory is writable");
+ }
+ }
+
+ void handleError(String message, Exception e) {
+ if (!dataImporter.getCore().getCoreContainer().isZooKeeperAware()) {
+ writer.rollback();
+ }
+
+ statusMessages.put(message, "Indexing error");
+ addStatusMessage(message);
+ if ((config != null) && (config.getOnError() != null)) {
+ invokeEventListener(config.getOnError(), e);
+ }
+ }
+
+ private void doFullDump() {
+ addStatusMessage("Full Dump Started");
+ buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
+ }
+
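+ /**
+ * Delta import: collect the primary keys of modified and deleted rows (leaf
+ * entities first), remove the stale documents, then rebuild one document per
+ * surviving key, exposing each key map under the IMPORTER_NS_SHORT + ".delta"
+ * namespace so entity queries can reference it.
+ */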
+ @SuppressWarnings("unchecked")
+ private void doDelta() {
+ addStatusMessage("Delta Dump started");
+ VariableResolver resolver = getVariableResolver();
+
+ if (config.getDeleteQuery() != null) {
+ writer.deleteByQuery(config.getDeleteQuery());
+ }
+
+ addStatusMessage("Identifying Delta");
+ log.info("Starting delta collection.");
+ Set<Map<String, Object>> deletedKeys = new HashSet<>();
+ Set<Map<String, Object>> allPks = collectDelta(currentEntityProcessorWrapper, resolver, deletedKeys);
+ if (stop.get())
+ return;
+ addStatusMessage("Deltas Obtained");
+ addStatusMessage("Building documents");
+ if (!deletedKeys.isEmpty()) {
+ allPks.removeAll(deletedKeys);
+ deleteAll(deletedKeys);
+ // Make sure that documents are not re-created
+ }
+ deletedKeys = null;
+ writer.setDeltaKeys(allPks);
+
+ statusMessages.put("Total Changed Documents", allPks.size());
+ VariableResolver vri = getVariableResolver();
+ Iterator<Map<String, Object>> pkIter = allPks.iterator();
+ while (pkIter.hasNext()) {
+ Map<String, Object> map = pkIter.next();
+ vri.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT + ".delta", map);
+ buildDocument(vri, null, map, currentEntityProcessorWrapper, true, null);
+ pkIter.remove();
+ // check for abort
+ if (stop.get())
+ break;
+ }
+
+ if (!stop.get()) {
+ log.info("Delta Import completed successfully");
+ }
+ }
+
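+ // Deletes every document whose key appears in deletedKeys, using the entity pk
+ // for root entities and the schema pk otherwise.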
+ private void deleteAll(Set<Map<String, Object>> deletedKeys) {
+ log.info("Deleting stale documents ");
+ Iterator<Map<String, Object>> iter = deletedKeys.iterator();
+ while (iter.hasNext()) {
+ Map<String, Object> map = iter.next();
+ String keyName = currentEntityProcessorWrapper.getEntity().isDocRoot()
+ ? currentEntityProcessorWrapper.getEntity().getPk()
+ : currentEntityProcessorWrapper.getEntity().getSchemaPk();
+ Object key = map.get(keyName);
+ if (key == null) {
+ keyName = findMatchingPkColumn(keyName, map);
+ key = map.get(keyName);
+ }
+ if (key == null) {
+ log.warn("no key was available for deleted pk query. keyName = {}", keyName);
+ continue;
+ }
+ continue;
+ }
+ writer.deleteDoc(key);
+ importStatistics.deletedDocCount.incrementAndGet();
+ iter.remove();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void addStatusMessage(String msg) {
+ statusMessages.put(msg, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
+ }
+
+ private void resetEntity(EntityProcessorWrapper epw) {
+ epw.setInitialized(false);
+ for (EntityProcessorWrapper child : epw.getChildren()) {
+ resetEntity(child);
+ }
+ }
+
+ private void buildDocument(VariableResolver vr, DocWrapper doc,
+ Map<String,Object> pk, EntityProcessorWrapper epw, boolean isRoot,
+ ContextImpl parentCtx) {
+ List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<>();
+ try {
+ buildDocument(vr, doc, pk, epw, isRoot, parentCtx, entitiesToDestroy);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
+ entityWrapper.destroy();
+ }
+ resetEntity(epw);
+ }
+ }
+
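+ /**
+ * Recursively walks the entity tree: each row fetched from this entity is merged
+ * into the current document (or into a new child document when the entity is
+ * flagged as a child), child entities see the parent's row as a namespace, and
+ * each completed root document is handed to the writer.
+ */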
+ @SuppressWarnings("unchecked")
+ private void buildDocument(VariableResolver vr, DocWrapper doc,
+ Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
+ ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
+
+ ContextImpl ctx = new ContextImpl(epw, vr, null,
+ pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
+ session, parentCtx, this);
+ epw.init(ctx);
+ if (!epw.isInitialized()) {
+ entitiesToDestroy.add(epw);
+ epw.setInitialized(true);
+ }
+
+ if (reqParams.getStart() > 0) {
+ getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
+ }
+
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
+ }
+
+ int seenDocCount = 0;
+
+ try {
+ while (true) {
+ if (stop.get())
+ return;
+ if (importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
+ try {
+ seenDocCount++;
+
+ if (seenDocCount > reqParams.getStart()) {
+ getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
+ }
+
+ if (verboseDebug && epw.getEntity().isDocRoot()) {
+ getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
+ }
+ if (doc == null && epw.getEntity().isDocRoot()) {
+ doc = new DocWrapper();
+ ctx.setDoc(doc);
+ Entity e = epw.getEntity();
+ while (e.getParentEntity() != null) {
+ addFields(e.getParentEntity(), doc, (Map<String, Object>) vr
+ .resolve(e.getParentEntity().getName()), vr);
+ e = e.getParentEntity();
+ }
+ }
+
+ Map<String, Object> arow = epw.nextRow();
+ if (arow == null) {
+ break;
+ }
+
+ // Support for start parameter in debug mode
+ if (epw.getEntity().isDocRoot()) {
+ if (seenDocCount <= reqParams.getStart())
+ continue;
+ if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
+ log.info("Indexing stopped at docCount = " + importStatistics.docCount);
+ break;
+ }
+ }
+
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
+ }
+ importStatistics.rowsCount.incrementAndGet();
+
+ DocWrapper childDoc = null;
+ if (doc != null) {
+ if (epw.getEntity().isChild()) {
+ childDoc = new DocWrapper();
+ handleSpecialCommands(arow, childDoc);
+ addFields(epw.getEntity(), childDoc, arow, vr);
+ doc.addChildDocument(childDoc);
+ } else {
+ handleSpecialCommands(arow, doc);
+ vr.addNamespace(epw.getEntity().getName(), arow);
+ addFields(epw.getEntity(), doc, arow, vr);
+ vr.removeNamespace(epw.getEntity().getName());
+ }
+ }
+ if (epw.getEntity().getChildren() != null) {
+ vr.addNamespace(epw.getEntity().getName(), arow);
+ for (EntityProcessorWrapper child : epw.getChildren()) {
+ if (childDoc != null) {
+ buildDocument(vr, childDoc,
+ child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
+ } else {
+ buildDocument(vr, doc,
+ child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
+ }
+ }
+ vr.removeNamespace(epw.getEntity().getName());
+ }
+ if (epw.getEntity().isDocRoot()) {
+ if (stop.get())
+ return;
+ if (!doc.isEmpty()) {
+ boolean result = writer.upload(doc);
+ if (reqParams.isDebug()) {
+ reqParams.getDebugInfo().debugDocuments.add(doc);
+ }
+ doc = null;
+ if (result) {
+ importStatistics.docCount.incrementAndGet();
+ } else {
+ importStatistics.failedDocCount.incrementAndGet();
+ }
+ }
+ }
+ } catch (DataImportHandlerException e) {
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
+ }
+ if (e.getErrCode() == DataImportHandlerException.SKIP_ROW) {
+ continue;
+ }
+ if (isRoot) {
+ if (e.getErrCode() == DataImportHandlerException.SKIP) {
+ importStatistics.skipDocCount.getAndIncrement();
+ doc = null;
+ } else {
+ SolrException.log(log, "Exception while processing: "
+ + epw.getEntity().getName() + " document : " + doc, e);
+ }
+ if (e.getErrCode() == DataImportHandlerException.SEVERE)
+ throw e;
+ } else
+ throw e;
+ } catch (Exception t) {
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
+ }
+ throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
+ } finally {
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
+ if (epw.getEntity().isDocRoot())
+ getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
+ }
+ }
+ }
+ } finally {
+ if (verboseDebug) {
+ getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
+ }
+ }
+ }
+
+ static class DocWrapper extends SolrInputDocument {
+ Map<String, Object> session;
+
+ public void setSessionAttribute(String key, Object val) {
+ if (session == null) session = new HashMap<>();
+ session.put(key, val);
+ }
+
+ public Object getSessionAttribute(String key) {
+ return session == null ? null : session.get(key);
+ }
+ }
+
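+ // Rows may carry special command columns (DELETE_DOC_BY_ID, DELETE_DOC_BY_QUERY,
+ // DOC_BOOST, SKIP_DOC, SKIP_ROW; conventionally the "$"-prefixed fields such as
+ // $deleteDocById) which are interpreted here rather than indexed as ordinary fields.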
+ private void handleSpecialCommands(Map<String, Object> arow, DocWrapper doc) {
+ Object value = arow.get(DELETE_DOC_BY_ID);
+ if (value != null) {
+ if (value instanceof Collection) {
+ Collection<?> collection = (Collection<?>) value;
+ for (Object o : collection) {
+ writer.deleteDoc(o.toString());
+ importStatistics.deletedDocCount.incrementAndGet();
+ }
+ } else {
+ writer.deleteDoc(value);
+ importStatistics.deletedDocCount.incrementAndGet();
+ }
+ }
+ value = arow.get(DELETE_DOC_BY_QUERY);
+ if (value != null) {
+ if (value instanceof Collection) {
+ Collection<?> collection = (Collection<?>) value;
+ for (Object o : collection) {
+ writer.deleteByQuery(o.toString());
+ importStatistics.deletedDocCount.incrementAndGet();
+ }
+ } else {
+ writer.deleteByQuery(value.toString());
+ importStatistics.deletedDocCount.incrementAndGet();
+ }
+ }
+ value = arow.get(DOC_BOOST);
+ if (value != null) {
+ String message = "Ignoring document boost: " + value + " as index-time boosts are not supported anymore";
+ if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
+ log.warn(message);
+ } else {
+ log.debug(message);
+ }
+ }
+
+ value = arow.get(SKIP_DOC);
+ if (value != null) {
+ if (Boolean.parseBoolean(value.toString())) {
+ throw new DataImportHandlerException(DataImportHandlerException.SKIP,
+ "Document skipped :" + arow);
+ }
+ }
+
+ value = arow.get(SKIP_ROW);
+ if (value != null) {
+ if (Boolean.parseBoolean(value.toString())) {
+ throw new DataImportHandlerException(DataImportHandlerException.SKIP_ROW);
+ }
+ }
+ }
+
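+ // Maps a raw row onto document fields: explicitly declared <field> entries take
+ // precedence; otherwise the column name is matched against the schema, including
+ // dynamic fields. Columns starting with "$" are command columns and never indexed.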
+ @SuppressWarnings("unchecked")
+ private void addFields(Entity entity, DocWrapper doc,
+ Map<String, Object> arow, VariableResolver vr) {
+ for (Map.Entry<String, Object> entry : arow.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (value == null) continue;
+ if (key.startsWith("$")) continue;
+ Set<EntityField> field = entity.getColNameVsField().get(key);
+ IndexSchema schema = null == reqParams.getRequest() ? null : reqParams.getRequest().getSchema();
+ if (field == null && schema != null) {
+ // This can be a dynamic field or a field which does not have an entry in data-config (an implicit field)
+ SchemaField sf = schema.getFieldOrNull(key);
+ if (sf == null) {
+ sf = config.getSchemaField(key);
+ }
+ if (sf != null) {
+ addFieldToDoc(entry.getValue(), sf.getName(), sf.multiValued(), doc);
+ }
+ // else do nothing; if we add it, it may fail
+ } else {
+ if (field != null) {
+ for (EntityField f : field) {
+ String name = f.getName();
+ boolean multiValued = f.isMultiValued();
+ boolean toWrite = f.isToWrite();
+ if (f.isDynamicName()) {
+ name = vr.replaceTokens(name);
+ SchemaField schemaField = config.getSchemaField(name);
+ if (schemaField == null) {
+ toWrite = false;
+ } else {
+ multiValued = schemaField.multiValued();
+ toWrite = true;
+ }
+ }
+ if (toWrite) {
+ addFieldToDoc(entry.getValue(), name, multiValued, doc);
+ }
+ }
+ }
+ }
+ }
+ }
+
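+ // For multiValued targets every non-null value is added; for single-valued
+ // targets only the first value is kept and an existing value is never overwritten.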
+ private void addFieldToDoc(Object value, String name, boolean multiValued, DocWrapper doc) {
+ if (value instanceof Collection) {
+ Collection<?> collection = (Collection<?>) value;
+ if (multiValued) {
+ for (Object o : collection) {
+ if (o != null)
+ doc.addField(name, o);
+ }
+ } else {
+ if (doc.getField(name) == null)
+ for (Object o : collection) {
+ if (o != null) {
+ doc.addField(name, o);
+ break;
+ }
+ }
+ }
+ } else if (multiValued) {
+ if (value != null) {
+ doc.addField(name, value);
+ }
+ } else {
+ if (doc.getField(name) == null && value != null)
+ doc.addField(name, value);
+ }
+ }
+
+ public EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
+ EntityProcessor entityProcessor = null;
+ if (entity.getProcessorName() == null) {
+ entityProcessor = new SqlEntityProcessor();
+ } else {
+ try {
+ entityProcessor = (EntityProcessor) loadClass(entity.getProcessorName(), dataImporter.getCore())
+ .getDeclaredConstructor().newInstance();
+ } catch (Exception e) {
+ wrapAndThrow(SEVERE, e,
+ "Unable to load EntityProcessor implementation for entity: " + entity.getName());
+ }
+ }
+ EntityProcessorWrapper epw = new EntityProcessorWrapper(entityProcessor, entity, this);
+ for (Entity e1 : entity.getChildren()) {
+ epw.getChildren().add(getEntityProcessorWrapper(e1));
+ }
+
+ return epw;
+ }
+
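+ // deltaQuery results may prefix the key column, e.g. a pk declared as "id" coming
+ // back as "item.id" (or vice versa); resolve a unique such match or fail loudly
+ // when the mapping is ambiguous or absent.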
+ private String findMatchingPkColumn(String pk, Map<String, Object> row) {
+ if (row.containsKey(pk)) {
+ throw new IllegalArgumentException(String.format(Locale.ROOT,
+ "deltaQuery returned a row with null for primary key %s", pk));
+ }
+ String resolvedPk = null;
+ for (String columnName : row.keySet()) {
+ if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) {
+ if (resolvedPk != null)
+ throw new IllegalArgumentException(
+ String.format(Locale.ROOT,
+ "deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'",
+ resolvedPk, columnName, pk));
+ resolvedPk = columnName;
+ }
+ }
+ if (resolvedPk == null) {
+ throw new IllegalArgumentException(String.format(Locale.ROOT,
+ "deltaQuery has no column to resolve to declared primary key pk='%s'", pk));
+ }
+ log.info(String.format(Locale.ROOT,
+ "Resolving deltaQuery column '%s' to match entity's declared pk '%s'",
+ resolvedPk, pk));
+ return resolvedPk;
+ }
+
+ /**
+ * <p> Collects the unique keys of all Solr documents for which one or more source tables have been changed since
+ * the last indexed time. </p> <p> Note: in our definition, the unique key of a Solr document is the primary key of
+ * the top level entity in data-config.xml (unless skipped using rootEntity="false"). </p>
+ *
+ * @return the set of keys for which Solr documents should be updated.
+ */
+ @SuppressWarnings("unchecked")
+ public Set<Map<String, Object>> collectDelta(EntityProcessorWrapper epw, VariableResolver resolver,
+ Set<Map<String, Object>> deletedRows) {
+ //someone called abort
+ if (stop.get())
+ return new HashSet<>();
+
+ ContextImpl context1 = new ContextImpl(epw, resolver, null, Context.FIND_DELTA, session, null, this);
+ epw.init(context1);
+
+ Set<Map<String, Object>> myModifiedPks = new HashSet<>();
+
+ for (EntityProcessorWrapper childEpw : epw.getChildren()) {
+ //this ensures that we start from the leaf nodes
+ myModifiedPks.addAll(collectDelta(childEpw, resolver, deletedRows));
+ //someone called abort
+ if (stop.get())
+ return new HashSet<>();
+ }
+
+ // identifying the modified rows for this entity
+ Map<String, Map<String, Object>> deltaSet = new HashMap<>();
+ log.info("Running ModifiedRowKey() for Entity: " + epw.getEntity().getName());
+ //get the modified rows in this entity
+ String pk = epw.getEntity().getPk();
+ while (true) {
+ Map<String, Object> row = epw.nextModifiedRowKey();
+
+ if (row == null)
+ break;
+
+ Object pkValue = row.get(pk);
+ if (pkValue == null) {
+ pk = findMatchingPkColumn(pk, row);
+ pkValue = row.get(pk);
+ }
+
+ deltaSet.put(pkValue.toString(), row);
+ importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return new HashSet<>();
+ }
+ //get the deleted rows for this entity
+ Set<Map<String, Object>> deletedSet = new HashSet<>();
+ while (true) {
+ Map<String, Object> row = epw.nextDeletedRowKey();
+ if (row == null)
+ break;
+
+ deletedSet.add(row);
+
+ Object pkValue = row.get(pk);
+ if (pkValue == null) {
+ pk = findMatchingPkColumn(pk, row);
+ pkValue = row.get(pk);
+ }
+
+ // Remove deleted rows from the delta rows
+ String deletedRowPk = pkValue.toString();
+ deltaSet.remove(deletedRowPk);
+
+ importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return new HashSet<>();
+ }
+
+ log.info("Completed ModifiedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deltaSet.size());
+ log.info("Completed DeletedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deletedSet.size());
+
+ myModifiedPks.addAll(deltaSet.values());
+ Set<Map<String, Object>> parentKeyList = new HashSet<>();
+ //all that we have captured is useless (in a sub-entity) if no rows in the parent are modified because of these
+ //propagate the changes up the chain
+ if (epw.getEntity().getParentEntity() != null) {
+ // identifying deleted rows with deltas
+
+ for (Map<String, Object> row : myModifiedPks) {
+ resolver.addNamespace(epw.getEntity().getName(), row);
+ getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
+ // check for abort
+ if (stop.get())
+ return new HashSet<>();
+ }
+ // running the same for deletedrows
+ for (Map<String, Object> row : deletedSet) {
+ resolver.addNamespace(epw.getEntity().getName(), row);
+ getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
+ // check for abort
+ if (stop.get())
+ return new HashSet<>();
+ }
+ }
+ log.info("Completed parentDeltaQuery for Entity: " + epw.getEntity().getName());
+ if (epw.getEntity().isDocRoot())
+ deletedRows.addAll(deletedSet);
+
+ // Do not use entity.isDocRoot here because one of the descendant entities may set rootEntity="true"
+ return epw.getEntity().getParentEntity() == null ?
+ myModifiedPks : new HashSet<>(parentKeyList);
+ }
+
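+ // Drains the parentDeltaQuery results for one child row, accumulating the parent
+ // keys that must be re-indexed; the child row's namespace is removed when done.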
+ private void getModifiedParentRows(VariableResolver resolver,
+ String entity, EntityProcessor entityProcessor,
+ Set<Map<String, Object>> parentKeyList) {
+ try {
+ while (true) {
+ Map<String, Object> parentRow = entityProcessor
+ .nextModifiedParentRowKey();
+ if (parentRow == null)
+ break;
+
+ parentKeyList.add(parentRow);
+ importStatistics.rowsCount.incrementAndGet();
+ // check for abort
+ if (stop.get())
+ return;
+ }
+
+ } finally {
+ resolver.removeNamespace(entity);
+ }
+ }
+
+ public void abort() {
+ stop.set(true);
+ }
+
+ private AtomicBoolean stop = new AtomicBoolean(false);
+
+ public static final String TIME_ELAPSED = "Time Elapsed";
+
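+ // Formats the nanoseconds elapsed since 'l' as h:m:s.ms; for example, a run of
+ // 90500 ms renders as "0:1:30.500".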
+ static String getTimeElapsedSince(long l) {
+ l = TimeUnit.MILLISECONDS.convert(System.nanoTime() - l, TimeUnit.NANOSECONDS);
+ return (l / (60000 * 60)) + ":" + (l / 60000) % 60 + ":" + (l / 1000)
+ % 60 + "." + l % 1000;
+ }
+
+ public RequestInfo getReqParams() {
+ return reqParams;
+ }
+
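+ // Resolves a class by the given name (through the core's resource loader when a
+ // core is available), then falls back to the same name qualified with this
+ // package, so short names like "SqlEntityProcessor" resolve from data-config.xml.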
+ @SuppressWarnings("unchecked")
+ static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
+ try {
+ return core != null ?
+ core.getResourceLoader().findClass(name, Object.class) :
+ Class.forName(name);
+ } catch (Exception e) {
+ try {
+ String n = DocBuilder.class.getPackage().getName() + "." + name;
+ return core != null ?
+ core.getResourceLoader().findClass(n, Object.class) :
+ Class.forName(n);
+ } catch (Exception e1) {
+ throw new ClassNotFoundException("Unable to load " + name + " or " + DocBuilder.class.getPackage().getName() + "." + name, e);
+ }
+ }
+ }
+
+ public static class Statistics {
+ public AtomicLong docCount = new AtomicLong();
+
+ public AtomicLong deletedDocCount = new AtomicLong();
+
+ public AtomicLong failedDocCount = new AtomicLong();
+
+ public AtomicLong rowsCount = new AtomicLong();
+
+ public AtomicLong queryCount = new AtomicLong();
+
+ public AtomicLong skipDocCount = new AtomicLong();
+
+ public Statistics add(Statistics stats) {
+ this.docCount.addAndGet(stats.docCount.get());
+ this.deletedDocCount.addAndGet(stats.deletedDocCount.get());
+ this.failedDocCount.addAndGet(stats.failedDocCount.get());
+ this.rowsCount.addAndGet(stats.rowsCount.get());
+ this.queryCount.addAndGet(stats.queryCount.get());
+ this.skipDocCount.addAndGet(stats.skipDocCount.get());
+
+ return this;
+ }
+
+ public Map<String, Object> getStatsSnapshot() {
+ Map<String, Object> result = new HashMap<>();
+ result.put("docCount", docCount.get());
+ result.put("deletedDocCount", deletedDocCount.get());
+ result.put("rowCount", rowsCount.get());
+ result.put("queryCount", rowsCount.get());
+ result.put("skipDocCount", skipDocCount.get());
+ return result;
+ }
+
+ }
+
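+ // With clean=true, the first entity that has no delete query of its own triggers
+ // a single delete-all; entities that declare one delete only their own documents.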
+ private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
+ delQuery = getVariableResolver().replaceTokens(delQuery);
+ if (reqParams.isClean()) {
+ if (delQuery == null && !completeCleanDone.get()) {
+ writer.doDeleteAll();
+ completeCleanDone.set(true);
+ } else if (delQuery != null) {
+ writer.deleteByQuery(delQuery);
+ }
+ }
+ }
+
+ public static final String LAST_INDEX_TIME = "last_index_time";
+ public static final String INDEX_START_TIME = "index_start_time";
+}