Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:40 UTC
[19/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras module
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
deleted file mode 100644
index 1ee18ef..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.security.AccessControlException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.IllformedLocaleException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.lucene.util.IOUtils;
-import org.apache.solr.common.util.SuppressForbidden;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-/**
- * <p>
- * Writes properties using {@link Properties#store} .
- * The special property "last_index_time" is converted to a formatted date.
- * Users can configure the location, filename, locale and date format to use.
- * </p>
- */
-public class SimplePropertiesWriter extends DIHProperties {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- static final String LAST_INDEX_KEY = "last_index_time";
-
- protected String filename = null;
-
- protected String configDir = null;
-
- protected Locale locale = null;
-
- protected SimpleDateFormat dateFormat = null;
-
- /**
- * The locale to use when writing the properties file. Default is {@link Locale#ROOT}
- */
- public static final String LOCALE = "locale";
- /**
- * The date format to use when writing values for "last_index_time" to the properties file.
- * See {@link SimpleDateFormat} for patterns. Default is yyyy-MM-dd HH:mm:ss .
- */
- public static final String DATE_FORMAT = "dateFormat";
- /**
- * The directory to save the properties file in. Default is the current core's "config" directory.
- */
- public static final String DIRECTORY = "directory";
- /**
- * The filename to save the properties file to. Default is this Handler's name from solrconfig.xml.
- */
- public static final String FILENAME = "filename";
-
- @Override
- public void init(DataImporter dataImporter, Map<String, String> params) {
- if(params.get(FILENAME) != null) {
- filename = params.get(FILENAME);
- } else if(dataImporter.getHandlerName()!=null) {
- filename = dataImporter.getHandlerName() + ".properties";
- } else {
- filename = "dataimport.properties";
- }
- findDirectory(dataImporter, params);
- if(params.get(LOCALE) != null) {
- locale = getLocale(params.get(LOCALE));
- } else {
- locale = Locale.ROOT;
- }
- if(params.get(DATE_FORMAT) != null) {
- dateFormat = new SimpleDateFormat(params.get(DATE_FORMAT), locale);
- } else {
- dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", locale);
- }
- }
-
- @SuppressForbidden(reason = "Usage of outdated locale parsing with Locale#toString() because of backwards compatibility")
- private Locale getLocale(String name) {
- if (name == null) {
- return Locale.ROOT;
- }
- for (final Locale l : Locale.getAvailableLocales()) {
- if(name.equals(l.toString()) || name.equals(l.getDisplayName(Locale.ROOT))) {
- return l; // return the matched locale, not the 'locale' field (still unset at this point)
- }
- }
- try {
- return new Locale.Builder().setLanguageTag(name).build();
- } catch (IllformedLocaleException ex) {
- throw new DataImportHandlerException(SEVERE, "Unsupported locale for PropertyWriter: " + name);
- }
- }
-
- protected void findDirectory(DataImporter dataImporter, Map<String, String> params) {
- if(params.get(DIRECTORY) != null) {
- configDir = params.get(DIRECTORY);
- } else {
- SolrCore core = dataImporter.getCore();
- if (core == null) {
- configDir = SolrResourceLoader.locateSolrHome().toString();
- } else {
- configDir = core.getResourceLoader().getConfigDir();
- }
- }
- }
-
- private File getPersistFile() {
- final File filePath;
- if (new File(filename).isAbsolute() || configDir == null) {
- filePath = new File(filename);
- } else {
- filePath = new File(new File(configDir), filename);
- }
- return filePath;
- }
-
- @Override
- public boolean isWritable() {
- File persistFile = getPersistFile();
- try {
- return persistFile.exists()
- ? persistFile.canWrite()
- : persistFile.getParentFile().canWrite();
- } catch (AccessControlException e) {
- return false;
- }
- }
-
- @Override
- public String convertDateToString(Date d) {
- return dateFormat.format(d);
- }
- protected Date convertStringToDate(String s) {
- try {
- return dateFormat.parse(s);
- } catch (ParseException e) {
- throw new DataImportHandlerException(SEVERE, "Value for "
- + LAST_INDEX_KEY + " is invalid for date format "
- + dateFormat.toLocalizedPattern() + " : " + s);
- }
- }
- /**
- * {@link DocBuilder} sends the date as an Object because
- * this class knows how to convert it to a String
- */
- protected Properties mapToProperties(Map<String,Object> propObjs) {
- Properties p = new Properties();
- for(Map.Entry<String,Object> entry : propObjs.entrySet()) {
- String key = entry.getKey();
- String val = null;
- String lastKeyPart = key;
- int lastDotPos = key.lastIndexOf('.');
- if(lastDotPos!=-1 && key.length() > lastDotPos+1) {
- lastKeyPart = key.substring(lastDotPos + 1);
- }
- if(LAST_INDEX_KEY.equals(lastKeyPart) && entry.getValue() instanceof Date) {
- val = convertDateToString((Date) entry.getValue());
- } else {
- val = entry.getValue().toString();
- }
- p.put(key, val);
- }
- return p;
- }
- /**
- * We'll send everything back as Strings as this class has
- * already converted them.
- */
- protected Map<String,Object> propertiesToMap(Properties p) {
- Map<String,Object> theMap = new HashMap<>();
- for(Map.Entry<Object,Object> entry : p.entrySet()) {
- String key = entry.getKey().toString();
- Object val = entry.getValue().toString();
- theMap.put(key, val);
- }
- return theMap;
- }
-
- @Override
- public void persist(Map<String, Object> propObjs) {
- Writer propOutput = null;
- Properties existingProps = mapToProperties(readIndexerProperties());
- Properties newProps = mapToProperties(propObjs);
- try {
- existingProps.putAll(newProps);
- propOutput = new OutputStreamWriter(new FileOutputStream(getPersistFile()), StandardCharsets.UTF_8);
- existingProps.store(propOutput, null);
- log.info("Wrote last indexed time to " + filename);
- } catch (Exception e) {
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
- "Unable to persist Index Start Time", e);
- } finally {
- IOUtils.closeWhileHandlingException(propOutput);
- }
- }
-
- @Override
- public Map<String, Object> readIndexerProperties() {
- Properties props = new Properties();
- InputStream propInput = null;
- try {
- String filePath = configDir;
- if (configDir != null && !configDir.endsWith(File.separator)) {
- filePath += File.separator;
- }
- filePath += filename;
- propInput = new FileInputStream(filePath);
- props.load(new InputStreamReader(propInput, StandardCharsets.UTF_8));
- log.info("Read " + filename);
- } catch (Exception e) {
- log.warn("Unable to read: " + filename);
- } finally {
- IOUtils.closeWhileHandlingException(propInput);
- }
- return propertiesToMap(props);
- }
-
-}
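
For reference, the round-trip that the deleted SimplePropertiesWriter performed can be sketched outside DIH with nothing but the JDK. A minimal, hedged example (the file name is illustrative): format a Date with the writer's default pattern and locale, store it via java.util.Properties as UTF-8, and parse it back.

    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.Properties;

    public class LastIndexTimeDemo {
      public static void main(String[] args) throws Exception {
        // Default pattern and locale used by the writer.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        File file = new File("dataimport.properties"); // illustrative path

        // Analogous to persist(): store as UTF-8 via Properties#store.
        Properties props = new Properties();
        props.setProperty("last_index_time", fmt.format(new Date()));
        try (Writer out = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8)) {
          props.store(out, null);
        }

        // Analogous to readIndexerProperties(): load and parse the formatted date.
        Properties read = new Properties();
        try (Reader in = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)) {
          read.load(in);
        }
        Date last = fmt.parse(read.getProperty("last_index_time"));
        System.out.println("last_index_time = " + last);
      }
    }
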
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
deleted file mode 100644
index 7732673..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.http.client.HttpClient;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
-import org.apache.solr.client.solrj.impl.XMLResponseParser;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.CursorMarkParams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * An implementation of {@link EntityProcessor} which fetches values from a
- * separate Solr instance using the SolrJ client library. Yields one row per
- * Solr document.
- * </p>
- * <p>
- * Limitations:
- * All configuration is evaluated at the beginning;
- * only one query is walked.
- * </p>
- */
-public class SolrEntityProcessor extends EntityProcessorBase {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String SOLR_SERVER = "url";
- public static final String QUERY = "query";
- public static final String TIMEOUT = "timeout";
-
- public static final int TIMEOUT_SECS = 5 * 60; // 5 minutes
- public static final int ROWS_DEFAULT = 50;
-
- private SolrClient solrClient = null;
- private String queryString;
- private int rows = ROWS_DEFAULT;
- private String[] filterQueries;
- private String[] fields;
- private String requestHandler; // 'qt' param
- private int timeout = TIMEOUT_SECS;
-
- @Override
- public void destroy() {
- try {
- solrClient.close();
- } catch (IOException e) {
-      // ignore; the underlying HttpClient is closed in the finally block
- } finally {
- HttpClientUtil.close(((HttpSolrClient) solrClient).getHttpClient());
- }
- }
-
- /**
- * Factory method that returns a {@link HttpClient} instance used for interfacing with a source Solr service.
- * One can override this method to return a differently configured {@link HttpClient} instance.
- * For example, to configure https and http authentication.
- *
- * @return a {@link HttpClient} instance used for interfacing with a source Solr service
- */
- protected HttpClient getHttpClient() {
- return HttpClientUtil.createClient(null);
- }
-
- @Override
- protected void firstInit(Context context) {
- super.firstInit(context);
-
- try {
- String serverPath = context.getResolvedEntityAttribute(SOLR_SERVER);
- if (serverPath == null) {
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
- "SolrEntityProcessor: parameter 'url' is required");
- }
-
- HttpClient client = getHttpClient();
- URL url = new URL(serverPath);
- // (wt="javabin|xml") default is javabin
- if ("xml".equals(context.getResolvedEntityAttribute(CommonParams.WT))) {
- // TODO: it doesn't matter for this impl when passing a client currently, but we should close this!
- solrClient = new Builder(url.toExternalForm())
- .withHttpClient(client)
- .withResponseParser(new XMLResponseParser())
- .build();
- log.info("using XMLResponseParser");
- } else {
- // TODO: it doesn't matter for this impl when passing a client currently, but we should close this!
- solrClient = new Builder(url.toExternalForm())
- .withHttpClient(client)
- .build();
- log.info("using BinaryResponseParser");
- }
- } catch (MalformedURLException e) {
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
- }
- }
-
- @Override
- public Map<String,Object> nextRow() {
- buildIterator();
- return getNext();
- }
-
- /**
- * The following method changes the rowIterator mutable field. It requires
- * external synchronization.
- */
- protected void buildIterator() {
- if (rowIterator != null) {
- SolrDocumentListIterator documentListIterator = (SolrDocumentListIterator) rowIterator;
- if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) {
- nextPage();
- }
- } else {
- boolean cursor = Boolean.parseBoolean(context
- .getResolvedEntityAttribute(CursorMarkParams.CURSOR_MARK_PARAM));
- rowIterator = !cursor ? new SolrDocumentListIterator(new SolrDocumentList())
- : new SolrDocumentListCursor(new SolrDocumentList(), CursorMarkParams.CURSOR_MARK_START);
- nextPage();
- }
- }
-
- protected void nextPage() {
- ((SolrDocumentListIterator)rowIterator).doQuery();
- }
-
- class SolrDocumentListCursor extends SolrDocumentListIterator {
-
- private final String cursorMark;
-
- public SolrDocumentListCursor(SolrDocumentList solrDocumentList, String cursorMark) {
- super(solrDocumentList);
- this.cursorMark = cursorMark;
- }
-
- @Override
- protected void passNextPage(SolrQuery solrQuery) {
- String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
- if (timeoutAsString != null) {
- throw new DataImportHandlerException(SEVERE,"cursorMark can't be used with timeout");
- }
-
- solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
- }
-
- @Override
- protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) {
- return new SolrDocumentListCursor(
- response.getResults(),
- response.getNextCursorMark());
- }
- }
-
- class SolrDocumentListIterator implements Iterator<Map<String,Object>> {
-
- private final int start;
- private final int size;
- private final long numFound;
- private final Iterator<SolrDocument> solrDocumentIterator;
-
- public SolrDocumentListIterator(SolrDocumentList solrDocumentList) {
- this.solrDocumentIterator = solrDocumentList.iterator();
- this.numFound = solrDocumentList.getNumFound();
- // SolrQuery has the start field of type int while SolrDocumentList of
- // type long. We are always querying with an int so we can't receive a
- // long as output. That's the reason why the following cast seems safe
- this.start = (int) solrDocumentList.getStart();
- this.size = solrDocumentList.size();
- }
-
- protected QueryResponse doQuery() {
- SolrEntityProcessor.this.queryString = context.getResolvedEntityAttribute(QUERY);
- if (SolrEntityProcessor.this.queryString == null) {
- throw new DataImportHandlerException(
- DataImportHandlerException.SEVERE,
- "SolrEntityProcessor: parameter 'query' is required"
- );
- }
-
- String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS);
- if (rowsP != null) {
- rows = Integer.parseInt(rowsP);
- }
-
- String sortParam = context.getResolvedEntityAttribute(CommonParams.SORT);
-
- String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ);
- if (fqAsString != null) {
- SolrEntityProcessor.this.filterQueries = fqAsString.split(",");
- }
-
- String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL);
- if (fieldsAsString != null) {
- SolrEntityProcessor.this.fields = fieldsAsString.split(",");
- }
- SolrEntityProcessor.this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT);
-
-
- SolrQuery solrQuery = new SolrQuery(queryString);
- solrQuery.setRows(rows);
-
- if (sortParam!=null) {
- solrQuery.setParam(CommonParams.SORT, sortParam);
- }
-
- passNextPage(solrQuery);
-
- if (fields != null) {
- for (String field : fields) {
- solrQuery.addField(field);
- }
- }
- solrQuery.setRequestHandler(requestHandler);
- solrQuery.setFilterQueries(filterQueries);
-
-
- QueryResponse response = null;
- try {
- response = solrClient.query(solrQuery);
- } catch (SolrServerException | IOException | SolrException e) {
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e);
- } else if (SKIP.equals(onError)) {
- wrapAndThrow(DataImportHandlerException.SKIP_ROW, e);
- }
- }
-
- if (response != null) {
- SolrEntityProcessor.this.rowIterator = createNextPageIterator(response);
- }
- return response;
- }
-
- protected Iterator<Map<String,Object>> createNextPageIterator(QueryResponse response) {
- return new SolrDocumentListIterator(response.getResults());
- }
-
- protected void passNextPage(SolrQuery solrQuery) {
- String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT);
- if (timeoutAsString != null) {
- SolrEntityProcessor.this.timeout = Integer.parseInt(timeoutAsString);
- }
-
- solrQuery.setTimeAllowed(timeout * 1000);
-
- solrQuery.setStart(getStart() + getSize());
- }
-
- @Override
- public boolean hasNext() {
- return solrDocumentIterator.hasNext();
- }
-
- @Override
- public Map<String,Object> next() {
- SolrDocument solrDocument = solrDocumentIterator.next();
-
- HashMap<String,Object> map = new HashMap<>();
- Collection<String> fields = solrDocument.getFieldNames();
- for (String field : fields) {
- Object fieldValue = solrDocument.getFieldValue(field);
- map.put(field, fieldValue);
- }
- return map;
- }
-
- public int getStart() {
- return start;
- }
-
- public int getSize() {
- return size;
- }
-
- public boolean hasMoreRows() {
- return numFound > start + size;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
-}
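
Independently of the deleted processor, the paging strategy it implemented (start/rows paging by default, cursorMark when enabled) can be sketched directly with SolrJ. A hedged example; the URL, collection, and sort field are placeholders, and it shows only the cursorMark variant:

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.params.CursorMarkParams;

    public class CursorPagingSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder URL; point this at a real source collection.
        try (HttpSolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/source").build()) {
          SolrQuery q = new SolrQuery("*:*");
          q.setRows(50);                              // ROWS_DEFAULT in the deleted code
          q.setSort(SolrQuery.SortClause.asc("id"));  // cursorMark requires a stable sort
          String cursor = CursorMarkParams.CURSOR_MARK_START;
          while (true) {
            q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursor);
            QueryResponse rsp = client.query(q);
            for (SolrDocument doc : rsp.getResults()) {
              System.out.println(doc.getFieldValue("id"));
            }
            String next = rsp.getNextCursorMark();
            if (cursor.equals(next)) break;           // unchanged mark means no more pages
            cursor = next;
          }
        }
      }
    }
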
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrQueryEscapingEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrQueryEscapingEvaluator.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrQueryEscapingEvaluator.java
deleted file mode 100644
index aece031..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrQueryEscapingEvaluator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-
-import java.util.List;
-
-import org.apache.solr.client.solrj.util.ClientUtils;
-
-public class SolrQueryEscapingEvaluator extends Evaluator {
- @Override
- public String evaluate(String expression, Context context) {
- List<Object> l = parseParams(expression, context.getVariableResolver());
- if (l.size() != 1) {
- throw new DataImportHandlerException(SEVERE, "'escapeQueryChars' must have at least one parameter ");
- }
- String s = l.get(0).toString();
- return ClientUtils.escapeQueryChars(s);
- }
-}
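
The evaluator above is a thin wrapper; the actual escaping is ClientUtils.escapeQueryChars from SolrJ. A small standalone demo of what it does to Solr query metacharacters:

    import org.apache.solr.client.solrj.util.ClientUtils;

    public class EscapeDemo {
      public static void main(String[] args) {
        // Every Lucene/Solr query metacharacter is backslash-escaped.
        System.out.println(ClientUtils.escapeQueryChars("(1+1):2")); // prints \(1\+1\)\:2
      }
    }
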
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
deleted file mode 100644
index 3964f3f..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
-import org.apache.solr.update.processor.UpdateRequestProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-
-/**
- * <p> Writes documents to SOLR. </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- * @since solr 1.3
- */
-public class SolrWriter extends DIHWriterBase implements DIHWriter {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String LAST_INDEX_KEY = "last_index_time";
-
- private final UpdateRequestProcessor processor;
- private final int commitWithin;
-
- SolrQueryRequest req;
-
- public SolrWriter(UpdateRequestProcessor processor, SolrQueryRequest req) {
- this.processor = processor;
- this.req = req;
- commitWithin = (req != null) ? req.getParams().getInt(UpdateParams.COMMIT_WITHIN, -1): -1;
- }
-
- @Override
- public void close() {
- try {
- processor.finish();
- } catch (IOException e) {
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
- "Unable to call finish() on UpdateRequestProcessor", e);
- } finally {
- deltaKeys = null;
- try {
- processor.close();
- } catch (IOException e) {
- SolrException.log(log, e);
- }
- }
- }
- @Override
- public boolean upload(SolrInputDocument d) {
- try {
- AddUpdateCommand command = new AddUpdateCommand(req);
- command.solrDoc = d;
- command.commitWithin = commitWithin;
- processor.processAdd(command);
- } catch (Exception e) {
- log.warn("Error creating document : " + d, e);
- return false;
- }
-
- return true;
- }
-
- @Override
- public void deleteDoc(Object id) {
- try {
- log.info("Deleting document: " + id);
- DeleteUpdateCommand delCmd = new DeleteUpdateCommand(req);
- delCmd.setId(id.toString());
- processor.processDelete(delCmd);
- } catch (IOException e) {
- log.error("Exception while deleteing: " + id, e);
- }
- }
-
- @Override
- public void deleteByQuery(String query) {
- try {
- log.info("Deleting documents from Solr with query: " + query);
- DeleteUpdateCommand delCmd = new DeleteUpdateCommand(req);
- delCmd.query = query;
- processor.processDelete(delCmd);
- } catch (IOException e) {
- log.error("Exception while deleting by query: " + query, e);
- }
- }
-
- @Override
- public void commit(boolean optimize) {
- try {
- CommitUpdateCommand commit = new CommitUpdateCommand(req,optimize);
- processor.processCommit(commit);
- } catch (Exception e) {
- log.error("Exception while solr commit.", e);
- }
- }
-
- @Override
- public void rollback() {
- try {
- RollbackUpdateCommand rollback = new RollbackUpdateCommand(req);
- processor.processRollback(rollback);
- } catch (Exception e) {
- log.error("Exception during rollback command.", e);
- }
- }
-
- @Override
- public void doDeleteAll() {
- try {
- DeleteUpdateCommand deleteCommand = new DeleteUpdateCommand(req);
- deleteCommand.query = "*:*";
- processor.processDelete(deleteCommand);
- } catch (IOException e) {
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
- "Exception in full dump while deleting all documents.", e);
- }
- }
-
- static String getResourceAsString(InputStream in) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
- byte[] buf = new byte[1024];
- int sz = 0;
- try {
- while ((sz = in.read(buf)) != -1) {
- baos.write(buf, 0, sz);
- }
- } finally {
- try {
- in.close();
- } catch (Exception e) {
-        // ignore close failures; the stream has been fully read
- }
- }
- return new String(baos.toByteArray(), StandardCharsets.UTF_8);
- }
-
- static String getDocCount() {
- if (DocBuilder.INSTANCE.get() != null) {
- return ""
- + (DocBuilder.INSTANCE.get().importStatistics.docCount.get() + 1);
- } else {
- return null;
- }
- }
- @Override
- public void init(Context context) {
- /* NO-OP */
- }
-}
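
getResourceAsString above hand-rolls the read loop and swallows close() failures. For comparison, a self-contained modern equivalent (a sketch, not the DIH code) using try-with-resources:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    final class StreamUtil {
      // Reads an entire stream into a UTF-8 string, closing it afterwards.
      static String toString(InputStream in) throws IOException {
        try (InputStream is = in) {
          ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
          byte[] buf = new byte[1024];
          for (int n; (n = is.read(buf)) != -1; ) {
            baos.write(buf, 0, n);
          }
          return new String(baos.toByteArray(), StandardCharsets.UTF_8);
        }
      }
    }
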
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java
deleted file mode 100644
index bb84ba9..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-public class SortedMapBackedCache implements DIHCache {
- private SortedMap<Object,List<Map<String,Object>>> theMap = null;
- private boolean isOpen = false;
- private boolean isReadOnly = false;
- String primaryKeyName = null;
-
- @SuppressWarnings("unchecked")
- @Override
- public void add(Map<String,Object> rec) {
- checkOpen(true);
- checkReadOnly();
-
- if (rec == null || rec.size() == 0) {
- return;
- }
-
- if (primaryKeyName == null) {
- primaryKeyName = rec.keySet().iterator().next();
- }
-
- Object pk = rec.get(primaryKeyName);
- if (pk instanceof Collection<?>) {
- Collection<Object> c = (Collection<Object>) pk;
- if (c.size() != 1) {
- throw new RuntimeException(
- "The primary key must have exactly 1 element.");
- }
- pk = c.iterator().next();
- }
- //Rows with null keys are not added.
- if(pk==null) {
- return;
- }
- List<Map<String,Object>> thisKeysRecs = theMap.get(pk);
- if (thisKeysRecs == null) {
- thisKeysRecs = new ArrayList<>();
- theMap.put(pk, thisKeysRecs);
- }
- thisKeysRecs.add(rec);
- }
-
- private void checkOpen(boolean shouldItBe) {
- if (!isOpen && shouldItBe) {
- throw new IllegalStateException(
- "Must call open() before using this cache.");
- }
- if (isOpen && !shouldItBe) {
- throw new IllegalStateException("The cache is already open.");
- }
- }
-
- private void checkReadOnly() {
- if (isReadOnly) {
- throw new IllegalStateException("Cache is read-only.");
- }
- }
-
- @Override
- public void close() {
- isOpen = false;
- }
-
- @Override
- public void delete(Object key) {
- checkOpen(true);
- checkReadOnly();
- if(key==null) {
- return;
- }
- theMap.remove(key);
- }
-
- @Override
- public void deleteAll() {
- deleteAll(false);
- }
-
- private void deleteAll(boolean readOnlyOk) {
- if (!readOnlyOk) {
- checkReadOnly();
- }
- if (theMap != null) {
- theMap.clear();
- }
- }
-
- @Override
- public void destroy() {
- deleteAll(true);
- theMap = null;
- isOpen = false;
- }
-
- @Override
- public void flush() {
- checkOpen(true);
- checkReadOnly();
- }
-
- @Override
- public Iterator<Map<String,Object>> iterator(Object key) {
- checkOpen(true);
- if(key==null) {
- return null;
- }
- if(key instanceof Iterable<?>) {
- List<Map<String,Object>> vals = new ArrayList<>();
- Iterator<?> iter = ((Iterable<?>) key).iterator();
- while(iter.hasNext()) {
- List<Map<String,Object>> val = theMap.get(iter.next());
- if(val!=null) {
- vals.addAll(val);
- }
- }
- if(vals.size()==0) {
- return null;
- }
- return vals.iterator();
- }
- List<Map<String,Object>> val = theMap.get(key);
- if (val == null) {
- return null;
- }
- return val.iterator();
- }
-
- @Override
- public Iterator<Map<String,Object>> iterator() {
- return new Iterator<Map<String, Object>>() {
- private Iterator<Map.Entry<Object,List<Map<String,Object>>>> theMapIter;
- private List<Map<String,Object>> currentKeyResult = null;
- private Iterator<Map<String,Object>> currentKeyResultIter = null;
-
- {
- theMapIter = theMap.entrySet().iterator();
- }
-
- @Override
- public boolean hasNext() {
- if (currentKeyResultIter != null) {
- if (currentKeyResultIter.hasNext()) {
- return true;
- } else {
- currentKeyResult = null;
- currentKeyResultIter = null;
- }
- }
-
- Map.Entry<Object,List<Map<String,Object>>> next = null;
- if (theMapIter.hasNext()) {
- next = theMapIter.next();
- currentKeyResult = next.getValue();
- currentKeyResultIter = currentKeyResult.iterator();
- if (currentKeyResultIter.hasNext()) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public Map<String,Object> next() {
- if (currentKeyResultIter != null) {
- if (currentKeyResultIter.hasNext()) {
- return currentKeyResultIter.next();
- } else {
- currentKeyResult = null;
- currentKeyResultIter = null;
- }
- }
-
- Map.Entry<Object,List<Map<String,Object>>> next = null;
- if (theMapIter.hasNext()) {
- next = theMapIter.next();
- currentKeyResult = next.getValue();
- currentKeyResultIter = currentKeyResult.iterator();
- if (currentKeyResultIter.hasNext()) {
- return currentKeyResultIter.next();
- }
- }
- return null;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Override
- public void open(Context context) {
- checkOpen(false);
- isOpen = true;
- if (theMap == null) {
- theMap = new TreeMap<>();
- }
-
- String pkName = CachePropertyUtil.getAttributeValueAsString(context,
- DIHCacheSupport.CACHE_PRIMARY_KEY);
- if (pkName != null) {
- primaryKeyName = pkName;
- }
- isReadOnly = false;
- String readOnlyStr = CachePropertyUtil.getAttributeValueAsString(context,
- DIHCacheSupport.CACHE_READ_ONLY);
- if ("true".equalsIgnoreCase(readOnlyStr)) {
- isReadOnly = true;
- }
- }
-
-}
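
Stripped of the open/read-only bookkeeping, the cache above is a sorted multimap: a TreeMap from primary key to the list of rows sharing that key. A hedged, standalone sketch of that backing structure (the row values are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class MultimapSketch {
      public static void main(String[] args) {
        // pk -> all rows sharing that pk, kept in key order like the cache's TreeMap
        SortedMap<Object, List<Map<String, Object>>> cache = new TreeMap<>();
        Map<String, Object> row = Map.of("id", 1, "name", "first");
        cache.computeIfAbsent(row.get("id"), k -> new ArrayList<>()).add(row);
        cache.computeIfAbsent(row.get("id"), k -> new ArrayList<>())
             .add(Map.of("id", 1, "name", "second"));
        // iterator(key) in the cache corresponds to walking this list:
        for (Map<String, Object> r : cache.get(1)) {
          System.out.println(r);
        }
      }
    }
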
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java
deleted file mode 100644
index 19c6d0f..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEntityProcessor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Iterator;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * <p>
- * An {@link EntityProcessor} instance which provides support for reading from
- * databases. It is used in conjunction with {@link JdbcDataSource}. This is the default
- * {@link EntityProcessor} if none is specified explicitly in data-config.xml
- * </p>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @since solr 1.3
- */
-public class SqlEntityProcessor extends EntityProcessorBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- protected DataSource<Iterator<Map<String, Object>>> dataSource;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(Context context) {
- super.init(context);
- dataSource = context.getDataSource();
- }
-
- protected void initQuery(String q) {
- try {
- DataImporter.QUERY_COUNT.get().incrementAndGet();
- rowIterator = dataSource.getData(q);
- this.query = q;
- } catch (DataImportHandlerException e) {
- throw e;
- } catch (Exception e) {
- log.error( "The query failed '" + q + "'", e);
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e);
- }
- }
-
- @Override
- public Map<String, Object> nextRow() {
- if (rowIterator == null) {
- String q = getQuery();
- initQuery(context.replaceTokens(q));
- }
- return getNext();
- }
-
- @Override
- public Map<String, Object> nextModifiedRowKey() {
- if (rowIterator == null) {
- String deltaQuery = context.getEntityAttribute(DELTA_QUERY);
- if (deltaQuery == null)
- return null;
- initQuery(context.replaceTokens(deltaQuery));
- }
- return getNext();
- }
-
- @Override
- public Map<String, Object> nextDeletedRowKey() {
- if (rowIterator == null) {
- String deletedPkQuery = context.getEntityAttribute(DEL_PK_QUERY);
- if (deletedPkQuery == null)
- return null;
- initQuery(context.replaceTokens(deletedPkQuery));
- }
- return getNext();
- }
-
- @Override
- public Map<String, Object> nextModifiedParentRowKey() {
- if (rowIterator == null) {
- String parentDeltaQuery = context.getEntityAttribute(PARENT_DELTA_QUERY);
- if (parentDeltaQuery == null)
- return null;
- log.info("Running parentDeltaQuery for Entity: "
- + context.getEntityAttribute("name"));
- initQuery(context.replaceTokens(parentDeltaQuery));
- }
- return getNext();
- }
-
- public String getQuery() {
- String queryString = context.getEntityAttribute(QUERY);
- if (Context.FULL_DUMP.equals(context.currentProcess())) {
- return queryString;
- }
- if (Context.DELTA_DUMP.equals(context.currentProcess())) {
- String deltaImportQuery = context.getEntityAttribute(DELTA_IMPORT_QUERY);
- if(deltaImportQuery != null) return deltaImportQuery;
- }
- log.warn("'deltaImportQuery' attribute is not specified for entity : "+ entityName);
- return getDeltaImportQuery(queryString);
- }
-
- public String getDeltaImportQuery(String queryString) {
- StringBuilder sb = new StringBuilder(queryString);
- if (SELECT_WHERE_PATTERN.matcher(queryString).find()) {
- sb.append(" and ");
- } else {
- sb.append(" where ");
- }
- boolean first = true;
- String[] primaryKeys = context.getEntityAttribute("pk").split(",");
- for (String primaryKey : primaryKeys) {
- if (!first) {
- sb.append(" and ");
- }
- first = false;
- Object val = context.resolve("dataimporter.delta." + primaryKey);
- if (val == null) {
- Matcher m = DOT_PATTERN.matcher(primaryKey);
- if (m.find()) {
- val = context.resolve("dataimporter.delta." + m.group(1));
- }
- }
- sb.append(primaryKey).append(" = ");
- if (val instanceof Number) {
- sb.append(val.toString());
- } else {
- sb.append("'").append(val.toString()).append("'");
- }
- }
- return sb.toString();
- }
-
- private static Pattern SELECT_WHERE_PATTERN = Pattern.compile(
- "^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);
-
- public static final String QUERY = "query";
-
- public static final String DELTA_QUERY = "deltaQuery";
-
- public static final String DELTA_IMPORT_QUERY = "deltaImportQuery";
-
- public static final String PARENT_DELTA_QUERY = "parentDeltaQuery";
-
- public static final String DEL_PK_QUERY = "deletedPkQuery";
-
- public static final Pattern DOT_PATTERN = Pattern.compile(".*?\\.(.*)$");
-}
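
getDeltaImportQuery above appends primary-key predicates to the full-import query, choosing " and " vs " where " based on SELECT_WHERE_PATTERN. A standalone sketch of that rewrite, with hypothetical values standing in for the resolved dataimporter.delta.* variables:

    import java.util.Map;
    import java.util.regex.Pattern;

    public class DeltaQuerySketch {
      private static final Pattern SELECT_WHERE_PATTERN =
          Pattern.compile("^\\s*(select\\b.*?\\b)(where).*", Pattern.CASE_INSENSITIVE);

      public static void main(String[] args) {
        String query = "select id, name from item where active = 1";
        Map<String, Object> deltaVals = Map.of("id", 42); // stands in for dataimporter.delta.id
        StringBuilder sb = new StringBuilder(query);
        sb.append(SELECT_WHERE_PATTERN.matcher(query).find() ? " and " : " where ");
        sb.append("id = ").append(deltaVals.get("id"));
        // -> select id, name from item where active = 1 and id = 42
        System.out.println(sb);
      }
    }
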
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEscapingEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEscapingEvaluator.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEscapingEvaluator.java
deleted file mode 100644
index 7f9c26e..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SqlEscapingEvaluator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-
-import java.util.List;
-
-/**
- * <p> Escapes values in SQL queries. It escapes the value of the given expression
- * by replacing all occurrences of single-quotes by two single-quotes and similarly
- * for double-quotes </p>
- */
-public class SqlEscapingEvaluator extends Evaluator {
- @Override
- public String evaluate(String expression, Context context) {
- List<Object> l = parseParams(expression, context.getVariableResolver());
- if (l.size() != 1) {
- throw new DataImportHandlerException(SEVERE, "'escapeSql' must have at least one parameter ");
- }
- String s = l.get(0).toString();
- // escape single quote with two single quotes, double quote
- // with two double quotes, and backslash with double backslash.
- // See: http://dev.mysql.com/doc/refman/4.1/en/mysql-real-escape-string.html
- return s.replaceAll("'", "''").replaceAll("\"", "\"\"").replaceAll("\\\\", "\\\\\\\\");
- }
-}
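
The replacement chain above is easy to sanity-check in isolation; the input string here is illustrative:

    public class SqlEscapeDemo {
      public static void main(String[] args) {
        String s = "O'Reilly says \"hi\" \\ bye";
        // Same three replacements the evaluator applies, in the same order.
        String escaped = s.replaceAll("'", "''")
                          .replaceAll("\"", "\"\"")
                          .replaceAll("\\\\", "\\\\\\\\");
        System.out.println(escaped); // O''Reilly says ""hi"" \\ bye
      }
    }
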
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateTransformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateTransformer.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateTransformer.java
deleted file mode 100644
index f655edd..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/TemplateTransformer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * A {@link Transformer} which can put values into a column by resolving an expression
- * containing other columns
- * </p>
- * <p>
- * For example:<br>
- * <field column="name" template="${e.lastName}, ${e.firstName}
- * ${e.middleName}" /> will produce the name by combining values from
- * lastName, firstName and middleName fields as given in the template attribute.
- * </p>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @since solr 1.3
- */
-public class TemplateTransformer extends Transformer {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private Map<String ,List<String>> templateVsVars = new HashMap<>();
-
- @Override
- @SuppressWarnings("unchecked")
- public Object transformRow(Map<String, Object> row, Context context) {
-
-
- VariableResolver resolver = context.getVariableResolver();
- // Add current row to the copy of resolver map
-
- for (Map<String, String> map : context.getAllEntityFields()) {
- map.entrySet();
- String expr = map.get(TEMPLATE);
- if (expr == null)
- continue;
-
- String column = map.get(DataImporter.COLUMN);
-
- // Verify if all variables can be resolved or not
- boolean resolvable = true;
- List<String> variables = this.templateVsVars.get(expr);
- if(variables == null){
- variables = resolver.getVariables(expr);
- this.templateVsVars.put(expr, variables);
- }
- for (String v : variables) {
- if (resolver.resolve(v) == null) {
- log.warn("Unable to resolve variable: " + v
- + " while parsing expression: " + expr);
- resolvable = false;
- }
- }
-
- if (!resolvable)
- continue;
- if(variables.size() == 1 && expr.startsWith("${") && expr.endsWith("}")){
- addToRow(column, row, resolver.resolve(variables.get(0)));
- } else {
- addToRow(column, row, resolver.replaceTokens(expr));
- }
- }
-
- return row;
- }
-
- private void addToRow(String key, Map<String, Object> row, Object value) {
- Object prevVal = row.get(key);
- if (prevVal != null) {
- if (prevVal instanceof List) {
- ((List) prevVal).add(value);
- } else {
- ArrayList<Object> valList = new ArrayList<Object>();
- valList.add(prevVal);
- valList.add(value);
- row.put(key, valList);
- }
- } else {
- row.put(key, value);
- }
- }
-
- public static final String TEMPLATE = "template";
-}
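
The merge behavior of addToRow above, where a second value for the same column promotes the entry to a list, can be exercised standalone. A hedged sketch with illustrative values; the resolved strings stand in for VariableResolver.replaceTokens output:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TemplateSketch {
      // Mirrors TemplateTransformer.addToRow: a second value for a key turns it into a list.
      @SuppressWarnings("unchecked")
      static void addToRow(String key, Map<String, Object> row, Object value) {
        Object prev = row.get(key);
        if (prev == null) {
          row.put(key, value);
        } else if (prev instanceof List) {
          ((List<Object>) prev).add(value);
        } else {
          List<Object> vals = new ArrayList<>();
          vals.add(prev);
          vals.add(value);
          row.put(key, vals);
        }
      }

      public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        // e.g. "${e.lastName}, ${e.firstName}" resolved against the current entity:
        addToRow("name", row, "Doe, John");
        addToRow("name", row, "Doe, Jane"); // second add promotes the value to a list
        System.out.println(row);            // {name=[Doe, John, Doe, Jane]}
      }
    }
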
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Transformer.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Transformer.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Transformer.java
deleted file mode 100644
index c7923e1..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Transformer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.util.Map;
-
-/**
- * <p>
- * Use this API to implement a custom transformer for any given entity
- * </p>
- * <p>
- * Implementations of this abstract class must provide a public no-args constructor.
- * </p>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @since solr 1.3
- */
-public abstract class Transformer {
- /**
- * The input is a row of data and the output has to be a new row.
- *
- * @param context The current context
- * @param row A row of data
- * @return The changed data. It must be a {@link Map}<{@link String}, {@link Object}> if it returns
- * only one row or if there are multiple rows to be returned it must
- * be a {@link java.util.List}<{@link Map}<{@link String}, {@link Object}>>
- */
- public abstract Object transformRow(Map<String, Object> row, Context context);
-}
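
As a usage illustration of the abstract class above, a minimal custom transformer might look like the following sketch; the column name is illustrative:

    import java.util.Locale;
    import java.util.Map;

    import org.apache.solr.handler.dataimport.Context;
    import org.apache.solr.handler.dataimport.Transformer;

    public class LowerCaseTransformer extends Transformer {
      // The required public no-args constructor is the implicit one here.
      @Override
      public Object transformRow(Map<String, Object> row, Context context) {
        Object name = row.get("name"); // illustrative column
        if (name != null) {
          row.put("name", name.toString().toLowerCase(Locale.ROOT));
        }
        return row; // one changed row; return a List<Map<String, Object>> for multiple rows
      }
    }
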
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/URLDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/URLDataSource.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/URLDataSource.java
deleted file mode 100644
index 145ffc4..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/URLDataSource.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.lang.invoke.MethodHandles;
-import java.net.URL;
-import java.net.URLConnection;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * <p> A data source implementation which can be used to read character files using HTTP. </p> <p> Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
- * details. </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @since solr 1.4
- */
-public class URLDataSource extends DataSource<Reader> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private String baseUrl;
-
- private String encoding;
-
- private int connectionTimeout = CONNECTION_TIMEOUT;
-
- private int readTimeout = READ_TIMEOUT;
-
- private Context context;
-
- private Properties initProps;
-
- public URLDataSource() {
- }
-
- @Override
- public void init(Context context, Properties initProps) {
- this.context = context;
- this.initProps = initProps;
-
- baseUrl = getInitPropWithReplacements(BASE_URL);
- if (getInitPropWithReplacements(ENCODING) != null)
- encoding = getInitPropWithReplacements(ENCODING);
- String cTimeout = getInitPropWithReplacements(CONNECTION_TIMEOUT_FIELD_NAME);
- String rTimeout = getInitPropWithReplacements(READ_TIMEOUT_FIELD_NAME);
- if (cTimeout != null) {
- try {
- connectionTimeout = Integer.parseInt(cTimeout);
- } catch (NumberFormatException e) {
- log.warn("Invalid connection timeout: " + cTimeout);
- }
- }
- if (rTimeout != null) {
- try {
- readTimeout = Integer.parseInt(rTimeout);
- } catch (NumberFormatException e) {
- log.warn("Invalid read timeout: " + rTimeout);
- }
- }
- }
-
- @Override
- public Reader getData(String query) {
- URL url = null;
- try {
- if (URIMETHOD.matcher(query).find()) url = new URL(query);
- else url = new URL(baseUrl + query);
-
- log.debug("Accessing URL: " + url.toString());
-
- URLConnection conn = url.openConnection();
- conn.setConnectTimeout(connectionTimeout);
- conn.setReadTimeout(readTimeout);
- InputStream in = conn.getInputStream();
- String enc = encoding;
- if (enc == null) {
- String cType = conn.getContentType();
- if (cType != null) {
- Matcher m = CHARSET_PATTERN.matcher(cType);
- if (m.find()) {
- enc = m.group(1);
- }
- }
- }
- if (enc == null)
- enc = UTF_8;
- DataImporter.QUERY_COUNT.get().incrementAndGet();
- return new InputStreamReader(in, enc);
- } catch (Exception e) {
- log.error("Exception thrown while getting data", e);
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
- "Exception in invoking url " + url, e);
- }
- }
-
- @Override
- public void close() {
- }
-
- public String getBaseUrl() {
- return baseUrl;
- }
-
- private String getInitPropWithReplacements(String propertyName) {
- final String expr = initProps.getProperty(propertyName);
- if (expr == null) {
- return null;
- }
- return context.replaceTokens(expr);
- }
-
- static final Pattern URIMETHOD = Pattern.compile("\\w{3,}:/");
-
- private static final Pattern CHARSET_PATTERN = Pattern.compile(".*?charset=(.*)$", Pattern.CASE_INSENSITIVE);
-
- public static final String ENCODING = "encoding";
-
- public static final String BASE_URL = "baseUrl";
-
- public static final String UTF_8 = StandardCharsets.UTF_8.name();
-
- public static final String CONNECTION_TIMEOUT_FIELD_NAME = "connectionTimeout";
-
- public static final String READ_TIMEOUT_FIELD_NAME = "readTimeout";
-
- public static final int CONNECTION_TIMEOUT = 5000;
-
- public static final int READ_TIMEOUT = 10000;
-}
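
The charset negotiation in getData above (explicit encoding, else the Content-Type charset, else UTF-8) is worth seeing in isolation. A hedged sketch with a placeholder URL:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.net.URLConnection;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class UrlFetchSketch {
      private static final Pattern CHARSET_PATTERN =
          Pattern.compile(".*?charset=(.*)$", Pattern.CASE_INSENSITIVE);

      public static void main(String[] args) throws Exception {
        URLConnection conn = new URL("http://example.com/").openConnection(); // placeholder URL
        conn.setConnectTimeout(5000); // CONNECTION_TIMEOUT default
        conn.setReadTimeout(10000);   // READ_TIMEOUT default
        String enc = "UTF-8";         // fallback, as in the data source
        String cType = conn.getContentType();
        if (cType != null) {
          Matcher m = CHARSET_PATTERN.matcher(cType);
          if (m.find()) enc = m.group(1);
        }
        try (BufferedReader r = new BufferedReader(new InputStreamReader(conn.getInputStream(), enc))) {
          System.out.println(r.readLine());
        }
      }
    }
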
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/UrlEvaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/UrlEvaluator.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/UrlEvaluator.java
deleted file mode 100644
index 8a6654c..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/UrlEvaluator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
-import java.net.URLEncoder;
-import java.util.List;
-
-/**
- * <p>Escapes reserved characters in Solr queries</p>
- *
- * @see org.apache.solr.client.solrj.util.ClientUtils#escapeQueryChars(String)
- */
-public class UrlEvaluator extends Evaluator {
- @Override
- public String evaluate(String expression, Context context) {
- List<Object> l = parseParams(expression, context.getVariableResolver());
- if (l.size() != 1) {
- throw new DataImportHandlerException(SEVERE, "'encodeUrl' must have at least one parameter ");
- }
- String s = l.get(0).toString();
-
- try {
- return URLEncoder.encode(s, "UTF-8");
- } catch (Exception e) {
- wrapAndThrow(SEVERE, e, "Unable to encode expression: " + expression + " with value: " + s);
- return null;
- }
- }
-}
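
UrlEvaluator is looked up by the function name "encodeUrl" (see the error
message above) through the dataimporter.functions namespace of
VariableResolver, the next file in this diff. A minimal sketch of that
wiring follows; EncodeUrlDemo is a hypothetical harness, and it assumes
Evaluator.parseParams resolves unquoted arguments such as e.q through the
resolver, as the DIH wiki describes.

package org.apache.solr.handler.dataimport;

import java.util.HashMap;
import java.util.Map;

public class EncodeUrlDemo {
  public static void main(String[] args) {
    VariableResolver vr = new VariableResolver();

    // Register the evaluator under its function name.
    Map<String,Evaluator> fns = new HashMap<>();
    fns.put("encodeUrl", new UrlEvaluator());
    vr.setEvaluators(fns);

    // Expose a row value as e.q.
    Map<String,Object> row = new HashMap<>();
    row.put("q", "rock & roll");
    vr.addNamespace("e", row);

    // Resolves e.q, then URL-encodes it:
    // http://example.com/select?q=rock+%26+roll
    System.out.println(vr.replaceTokens(
        "http://example.com/select?q=${dataimporter.functions.encodeUrl(e.q)}"));
  }
}
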
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/VariableResolver.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/VariableResolver.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/VariableResolver.java
deleted file mode 100644
index 090e21b..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/VariableResolver.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.WeakHashMap;
-import java.util.function.Function;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.solr.common.util.Cache;
-import org.apache.solr.common.util.MapBackedCache;
-import org.apache.solr.update.processor.TemplateUpdateProcessorFactory;
-
-import static org.apache.solr.update.processor.TemplateUpdateProcessorFactory.Resolved;
-
-/**
- * <p>
- * A set of nested maps that can resolve variables by namespaces. Variables are
- * enclosed with a dollar sign then an opening curly brace, ending with a
- * closing curly brace. Namespaces are delimited with '.' (period).
- * </p>
- * <p>
- * This class also has special logic to resolve evaluator calls by recognizing
- * the reserved function namespace: dataimporter.functions.xxx
- * </p>
- * <p>
- * This class caches strings that have already been resolved from the current
- * DIH import.
- * </p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @since solr 1.3
- */
-public class VariableResolver {
-
- private static final Pattern DOT_PATTERN = Pattern.compile("[.]");
- private static final Pattern EVALUATOR_FORMAT_PATTERN = Pattern
- .compile("^(\\w*?)\\((.*?)\\)$");
- private Map<String,Object> rootNamespace;
- private Map<String,Evaluator> evaluators;
- private Cache<String,Resolved> cache = new MapBackedCache<>(new WeakHashMap<>());
- private Function<String,Object> fun = this::resolve;
-
- public static final String FUNCTIONS_NAMESPACE = "dataimporter.functions.";
- public static final String FUNCTIONS_NAMESPACE_SHORT = "dih.functions.";
-
- public VariableResolver() {
- rootNamespace = new HashMap<>();
- }
-
- public VariableResolver(Properties defaults) {
- rootNamespace = new HashMap<>();
- for (Map.Entry<Object,Object> entry : defaults.entrySet()) {
- rootNamespace.put(entry.getKey().toString(), entry.getValue());
- }
- }
-
- public VariableResolver(Map<String,Object> defaults) {
- rootNamespace = new HashMap<>(defaults);
- }
-
- /**
- * Resolves a given value with a name
- *
- * @param name
- * the String to be resolved
- * @return an Object which is the result of evaluation of given name
- */
- public Object resolve(String name) {
- Object r = null;
- if (name != null) {
- String[] nameParts = DOT_PATTERN.split(name);
- CurrentLevel cr = currentLevelMap(nameParts,
- rootNamespace, false);
- Map<String,Object> currentLevel = cr.map;
- r = currentLevel.get(nameParts[nameParts.length - 1]);
- if (r == null && name.startsWith(FUNCTIONS_NAMESPACE)
- && name.length() > FUNCTIONS_NAMESPACE.length()) {
- return resolveEvaluator(FUNCTIONS_NAMESPACE, name);
- }
- if (r == null && name.startsWith(FUNCTIONS_NAMESPACE_SHORT)
- && name.length() > FUNCTIONS_NAMESPACE_SHORT.length()) {
- return resolveEvaluator(FUNCTIONS_NAMESPACE_SHORT, name);
- }
- if (r == null) {
- StringBuilder sb = new StringBuilder();
- for(int i=cr.level ; i<nameParts.length ; i++) {
- if(sb.length()>0) {
- sb.append(".");
- }
- sb.append(nameParts[i]);
- }
- r = cr.map.get(sb.toString());
- }
- if (r == null) {
- r = System.getProperty(name);
- }
- }
- return r == null ? "" : r;
- }
-
- private Object resolveEvaluator(String namespace, String name) {
- if (evaluators == null) {
- return "";
- }
- Matcher m = EVALUATOR_FORMAT_PATTERN.matcher(name
- .substring(namespace.length()));
- if (m.find()) {
- String fname = m.group(1);
- Evaluator evaluator = evaluators.get(fname);
- if (evaluator == null) return "";
- ContextImpl ctx = new ContextImpl(null, this, null, null, null, null,
- null);
- String g2 = m.group(2);
- return evaluator.evaluate(g2, ctx);
- } else {
- return "";
- }
- }
-
- /**
- * Given a String with place holders, replace them with the value tokens.
- *
- * @return the string with the placeholders replaced with their values
- */
- public String replaceTokens(String template) {
- return TemplateUpdateProcessorFactory.replaceTokens(template, cache, fun, TemplateUpdateProcessorFactory.DOLLAR_BRACES_PLACEHOLDER_PATTERN);
- }
- public void addNamespace(String name, Map<String,Object> newMap) {
- if (newMap != null) {
- if (name != null) {
- String[] nameParts = DOT_PATTERN.split(name);
- Map<String,Object> nameResolveLevel = currentLevelMap(nameParts,
- rootNamespace, false).map;
- nameResolveLevel.put(nameParts[nameParts.length - 1], newMap);
- } else {
- for (Map.Entry<String,Object> entry : newMap.entrySet()) {
- String[] keyParts = DOT_PATTERN.split(entry.getKey());
- Map<String,Object> currentLevel = rootNamespace;
- currentLevel = currentLevelMap(keyParts, currentLevel, false).map;
- currentLevel.put(keyParts[keyParts.length - 1], entry.getValue());
- }
- }
- }
- }
-
- public List<String> getVariables(String expr) {
- return TemplateUpdateProcessorFactory.getVariables(expr, cache, TemplateUpdateProcessorFactory.DOLLAR_BRACES_PLACEHOLDER_PATTERN);
- }
-
- static class CurrentLevel {
- final Map<String,Object> map;
- final int level;
- CurrentLevel(int level, Map<String,Object> map) {
- this.level = level;
- this.map = map;
- }
- }
-
- private CurrentLevel currentLevelMap(String[] keyParts,
- Map<String,Object> currentLevel, boolean includeLastLevel) {
- int j = includeLastLevel ? keyParts.length : keyParts.length - 1;
- for (int i = 0; i < j; i++) {
- Object o = currentLevel.get(keyParts[i]);
- if (o == null) {
- if(i == j-1) {
- Map<String,Object> nextLevel = new HashMap<>();
- currentLevel.put(keyParts[i], nextLevel);
- currentLevel = nextLevel;
- } else {
- return new CurrentLevel(i, currentLevel);
- }
- } else if (o instanceof Map<?,?>) {
- @SuppressWarnings("unchecked")
- Map<String,Object> nextLevel = (Map<String,Object>) o;
- currentLevel = nextLevel;
- } else {
- throw new AssertionError(
- "Non-leaf nodes should be of type java.util.Map");
- }
- }
- return new CurrentLevel(j-1, currentLevel);
- }
-
- public void removeNamespace(String name) {
- rootNamespace.remove(name);
- }
-
- public void setEvaluators(Map<String,Evaluator> evaluators) {
- this.evaluators = evaluators;
- }
-}
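
Concretely, resolution walks the nested maps along '.'-separated paths, and
a miss always resolves to the empty string rather than null (see the tail of
resolve() above). A minimal sketch; ResolverDemo is a hypothetical harness.

package org.apache.solr.handler.dataimport;

import java.util.HashMap;
import java.util.Map;

public class ResolverDemo {
  public static void main(String[] args) {
    VariableResolver vr = new VariableResolver();

    Map<String,Object> row = new HashMap<>();
    row.put("id", "42");
    vr.addNamespace("e", row);   // values become addressable as e.<key>

    System.out.println(vr.resolve("e.id"));                  // 42
    System.out.println(vr.replaceTokens("/doc?id=${e.id}")); // /doc?id=42
    System.out.println("[" + vr.resolve("e.nope") + "]");    // [] -- a miss resolves to ""
  }
}
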
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
deleted file mode 100644
index c93b581..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/XPathEntityProcessor.java
+++ /dev/null
@@ -1,548 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-import org.apache.solr.core.SolrCore;
-import org.apache.lucene.analysis.util.ResourceLoader;
-import org.apache.solr.util.SystemIdResolver;
-import org.apache.solr.common.util.XMLErrorLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.io.IOUtils;
-
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
-import java.io.CharArrayReader;
-import java.io.CharArrayWriter;
-import java.io.Reader;
-import java.lang.invoke.MethodHandles;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * <p> An implementation of {@link EntityProcessor} which uses a streaming xpath parser to extract values out of XML documents.
- * It is typically used in conjunction with {@link URLDataSource} or {@link FileDataSource}. </p> <p> Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a> for more
- * details. </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- *
- * @see XPathRecordReader
- * @since solr 1.3
- */
-public class XPathEntityProcessor extends EntityProcessorBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final XMLErrorLogger xmllog = new XMLErrorLogger(log);
-
- private static final Map<String, Object> END_MARKER = new HashMap<>();
-
- protected List<String> placeHolderVariables;
-
- protected List<String> commonFields;
-
- private String pk;
-
- private XPathRecordReader xpathReader;
-
- protected DataSource<Reader> dataSource;
-
- protected javax.xml.transform.Transformer xslTransformer;
-
- protected boolean useSolrAddXml = false;
-
- protected boolean streamRows = false;
-
- // Amount of time to block reading/writing to queue when streaming
- protected int blockingQueueTimeOut = 10;
-
- // Units for blockingQueueTimeOut
- protected TimeUnit blockingQueueTimeOutUnits = TimeUnit.SECONDS;
-
- // Number of rows to queue for asynchronous processing
- protected int blockingQueueSize = 1000;
-
- protected Thread publisherThread;
-
- protected boolean reinitXPathReader = true;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(Context context) {
- super.init(context);
- if (reinitXPathReader)
- initXpathReader(context.getVariableResolver());
- pk = context.getEntityAttribute("pk");
- dataSource = context.getDataSource();
- rowIterator = null;
-
- }
-
- private void initXpathReader(VariableResolver resolver) {
- reinitXPathReader = false;
- useSolrAddXml = Boolean.parseBoolean(context
- .getEntityAttribute(USE_SOLR_ADD_SCHEMA));
- streamRows = Boolean.parseBoolean(context
- .getEntityAttribute(STREAM));
- if (context.getResolvedEntityAttribute("batchSize") != null) {
- blockingQueueSize = Integer.parseInt(context.getEntityAttribute("batchSize"));
- }
- if (context.getResolvedEntityAttribute("readTimeOut") != null) {
- blockingQueueTimeOut = Integer.parseInt(context.getEntityAttribute("readTimeOut"));
- }
- String xslt = context.getEntityAttribute(XSL);
- if (xslt != null) {
- xslt = context.replaceTokens(xslt);
- try {
- // create an instance of TransformerFactory
- TransformerFactory transFact = TransformerFactory.newInstance();
- final SolrCore core = context.getSolrCore();
- final StreamSource xsltSource;
- if (core != null) {
- final ResourceLoader loader = core.getResourceLoader();
- transFact.setURIResolver(new SystemIdResolver(loader).asURIResolver());
- xsltSource = new StreamSource(loader.openResource(xslt),
- SystemIdResolver.createSystemIdFromResourceName(xslt));
- } else {
- // fallback for tests
- xsltSource = new StreamSource(xslt);
- }
- transFact.setErrorListener(xmllog);
- try {
- xslTransformer = transFact.newTransformer(xsltSource);
- } finally {
- // some XML parsers are broken and don't close the byte stream (but they should according to spec)
- IOUtils.closeQuietly(xsltSource.getInputStream());
- }
- log.info("Using xslTransformer: "
- + xslTransformer.getClass().getName());
- } catch (Exception e) {
- throw new DataImportHandlerException(SEVERE,
- "Error initializing XSL ", e);
- }
- }
-
- if (useSolrAddXml) {
- // Support solr add documents
- xpathReader = new XPathRecordReader("/add/doc");
- xpathReader.addField("name", "/add/doc/field/@name", true);
- xpathReader.addField("value", "/add/doc/field", true);
- } else {
- String forEachXpath = context.getResolvedEntityAttribute(FOR_EACH);
- if (forEachXpath == null)
- throw new DataImportHandlerException(SEVERE,
- "Entity : " + context.getEntityAttribute("name")
- + " must have a 'forEach' attribute");
- if (forEachXpath.equals(context.getEntityAttribute(FOR_EACH))) reinitXPathReader = true;
-
- try {
- xpathReader = new XPathRecordReader(forEachXpath);
- for (Map<String, String> field : context.getAllEntityFields()) {
- if (field.get(XPATH) == null)
- continue;
- int flags = 0;
- if ("true".equals(field.get("flatten"))) {
- flags = XPathRecordReader.FLATTEN;
- }
- String xpath = field.get(XPATH);
- xpath = context.replaceTokens(xpath);
- // !xpath.equals(field.get(XPATH)) means the field xpath contains a template;
- // in that case ensure that the XPathRecordReader is reinitialized
- // for each xml
- if (!xpath.equals(field.get(XPATH)) && !context.isRootEntity()) reinitXPathReader = true;
- xpathReader.addField(field.get(DataImporter.COLUMN),
- xpath,
- Boolean.parseBoolean(field.get(DataImporter.MULTI_VALUED)),
- flags);
- }
- } catch (RuntimeException e) {
- throw new DataImportHandlerException(SEVERE,
- "Exception while reading xpaths for fields", e);
- }
- }
- String url = context.getEntityAttribute(URL);
- List<String> l = url == null ? Collections.EMPTY_LIST : resolver.getVariables(url);
- for (String s : l) {
- if (s.startsWith(entityName + ".")) {
- if (placeHolderVariables == null)
- placeHolderVariables = new ArrayList<>();
- placeHolderVariables.add(s.substring(entityName.length() + 1));
- }
- }
- for (Map<String, String> fld : context.getAllEntityFields()) {
- if (fld.get(COMMON_FIELD) != null && "true".equals(fld.get(COMMON_FIELD))) {
- if (commonFields == null)
- commonFields = new ArrayList<>();
- commonFields.add(fld.get(DataImporter.COLUMN));
- }
- }
-
- }
-
- @Override
- public Map<String, Object> nextRow() {
- Map<String, Object> result;
-
- if (!context.isRootEntity())
- return fetchNextRow();
-
- while (true) {
- result = fetchNextRow();
-
- if (result == null)
- return null;
-
- if (pk == null || result.get(pk) != null)
- return result;
- }
- }
-
- @Override
- public void postTransform(Map<String, Object> r) {
- readUsefulVars(r);
- }
-
- @SuppressWarnings("unchecked")
- private Map<String, Object> fetchNextRow() {
- Map<String, Object> r = null;
- while (true) {
- if (rowIterator == null)
- initQuery(context.replaceTokens(context.getEntityAttribute(URL)));
- r = getNext();
- if (r == null) {
- Object hasMore = context.getSessionAttribute(HAS_MORE, Context.SCOPE_ENTITY);
- try {
- if ("true".equals(hasMore) || Boolean.TRUE.equals(hasMore)) {
- String url = (String) context.getSessionAttribute(NEXT_URL, Context.SCOPE_ENTITY);
- if (url == null)
- url = context.getEntityAttribute(URL);
- addNamespace();
- initQuery(context.replaceTokens(url));
- r = getNext();
- if (r == null)
- return null;
- } else {
- return null;
- }
- } finally {
- context.setSessionAttribute(HAS_MORE,null,Context.SCOPE_ENTITY);
- context.setSessionAttribute(NEXT_URL,null,Context.SCOPE_ENTITY);
- }
- }
- addCommonFields(r);
- return r;
- }
- }
-
- private void addNamespace() {
- Map<String, Object> namespace = new HashMap<>();
- Set<String> allNames = new HashSet<>();
- if (commonFields != null) allNames.addAll(commonFields);
- if (placeHolderVariables != null) allNames.addAll(placeHolderVariables);
- if(allNames.isEmpty()) return;
-
- for (String name : allNames) {
- Object val = context.getSessionAttribute(name, Context.SCOPE_ENTITY);
- if (val != null) namespace.put(name, val);
- }
- ((VariableResolver)context.getVariableResolver()).addNamespace(entityName, namespace);
- }
-
- private void addCommonFields(Map<String, Object> r) {
- if(commonFields != null){
- for (String commonField : commonFields) {
- if(r.get(commonField) == null) {
- Object val = context.getSessionAttribute(commonField, Context.SCOPE_ENTITY);
- if(val != null) r.put(commonField, val);
- }
-
- }
- }
-
- }
-
- private void initQuery(String s) {
- Reader data = null;
- try {
- final List<Map<String, Object>> rows = new ArrayList<>();
- try {
- data = dataSource.getData(s);
- } catch (Exception e) {
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e);
- } else if (SKIP.equals(onError)) {
- if (log.isDebugEnabled()) log.debug("Skipping url : " + s, e);
- wrapAndThrow(DataImportHandlerException.SKIP, e);
- } else {
- log.warn("Failed for url : " + s, e);
- rowIterator = Collections.EMPTY_LIST.iterator();
- return;
- }
- }
- if (xslTransformer != null) {
- try {
- SimpleCharArrayReader caw = new SimpleCharArrayReader();
- xslTransformer.transform(new StreamSource(data),
- new StreamResult(caw));
- data = caw.getReader();
- } catch (TransformerException e) {
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e, "Exception in applying XSL Transformation");
- } else if (SKIP.equals(onError)) {
- wrapAndThrow(DataImportHandlerException.SKIP, e);
- } else {
- log.warn("Failed for url : " + s, e);
- rowIterator = Collections.EMPTY_LIST.iterator();
- return;
- }
- }
- }
- if (streamRows) {
- rowIterator = getRowIterator(data, s);
- } else {
- try {
- xpathReader.streamRecords(data, (record, xpath) -> rows.add(readRow(record, xpath)));
- } catch (Exception e) {
- String msg = "Parsing failed for xml, url:" + s + " rows processed:" + rows.size();
- if (rows.size() > 0) msg += " last row: " + rows.get(rows.size() - 1);
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e, msg);
- } else if (SKIP.equals(onError)) {
- log.warn(msg, e);
- Map<String, Object> map = new HashMap<>();
- map.put(DocBuilder.SKIP_DOC, Boolean.TRUE);
- rows.add(map);
- } else if (CONTINUE.equals(onError)) {
- log.warn(msg, e);
- }
- }
- rowIterator = rows.iterator();
- }
- } finally {
- if (!streamRows) {
- closeIt(data);
- }
-
- }
- }
-
- private void closeIt(Reader data) {
- try {
- data.close();
- } catch (Exception e) { /* Ignore */
- }
- }
-
- protected Map<String, Object> readRow(Map<String, Object> record, String xpath) {
- if (useSolrAddXml) {
- List<String> names = (List<String>) record.get("name");
- List<String> values = (List<String>) record.get("value");
- Map<String, Object> row = new HashMap<>();
- for (int i = 0; i < names.size() && i < values.size(); i++) {
- if (row.containsKey(names.get(i))) {
- Object existing = row.get(names.get(i));
- if (existing instanceof List) {
- List list = (List) existing;
- list.add(values.get(i));
- } else {
- List list = new ArrayList();
- list.add(existing);
- list.add(values.get(i));
- row.put(names.get(i), list);
- }
- } else {
- row.put(names.get(i), values.get(i));
- }
- }
- return row;
- } else {
- record.put(XPATH_FIELD_NAME, xpath);
- return record;
- }
- }
-
-
- private static class SimpleCharArrayReader extends CharArrayWriter {
- public Reader getReader() {
- return new CharArrayReader(super.buf, 0, super.count);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private Map<String, Object> readUsefulVars(Map<String, Object> r) {
- Object val = r.get(HAS_MORE);
- if (val != null)
- context.setSessionAttribute(HAS_MORE, val,Context.SCOPE_ENTITY);
- val = r.get(NEXT_URL);
- if (val != null)
- context.setSessionAttribute(NEXT_URL, val,Context.SCOPE_ENTITY);
- if (placeHolderVariables != null) {
- for (String s : placeHolderVariables) {
- val = r.get(s);
- context.setSessionAttribute(s, val,Context.SCOPE_ENTITY);
- }
- }
- if (commonFields != null) {
- for (String s : commonFields) {
- Object commonVal = r.get(s);
- if (commonVal != null) {
- context.setSessionAttribute(s, commonVal,Context.SCOPE_ENTITY);
- }
- }
- }
- return r;
-
- }
-
- private Iterator<Map<String, Object>> getRowIterator(final Reader data, final String s) {
- // Nothing atomic about it; we just need a strong reference.
- final AtomicReference<Exception> exp = new AtomicReference<>();
- final BlockingQueue<Map<String, Object>> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
- final AtomicBoolean isEnd = new AtomicBoolean(false);
- final AtomicBoolean throwExp = new AtomicBoolean(true);
- publisherThread = new Thread() {
- @Override
- public void run() {
- try {
- xpathReader.streamRecords(data, (record, xpath) -> {
- if (isEnd.get()) {
- throwExp.set(false);
- // End the streaming; otherwise parsing would go on forever
- // even though the consumer has gone away.
- throw new RuntimeException("BREAK");
- }
- Map<String, Object> row;
- try {
- row = readRow(record, xpath);
- } catch (Exception e) {
- isEnd.set(true);
- return;
- }
- offer(row);
- });
- } catch (Exception e) {
- if(throwExp.get()) exp.set(e);
- } finally {
- closeIt(data);
- if (!isEnd.get()) {
- offer(END_MARKER);
- }
- }
- }
-
- private void offer(Map<String, Object> row) {
- try {
- while (!blockingQueue.offer(row, blockingQueueTimeOut, blockingQueueTimeOutUnits)) {
- if (isEnd.get()) return;
- log.debug("Timeout elapsed writing records. Perhaps buffer size should be increased.");
- }
- } catch (InterruptedException e) {
- return;
- } finally {
- synchronized (this) {
- notifyAll();
- }
- }
- }
- };
-
- publisherThread.start();
-
- return new Iterator<Map<String, Object>>() {
- private Map<String, Object> lastRow;
- int count = 0;
-
- @Override
- public boolean hasNext() {
- return !isEnd.get();
- }
-
- @Override
- public Map<String, Object> next() {
- Map<String, Object> row;
-
- do {
- try {
- row = blockingQueue.poll(blockingQueueTimeOut, blockingQueueTimeOutUnits);
- if (row == null) {
- log.debug("Timeout elapsed reading records.");
- }
- } catch (InterruptedException e) {
- log.debug("Caught InterruptedException while waiting for row. Aborting.");
- isEnd.set(true);
- return null;
- }
- } while (row == null);
-
- if (row == END_MARKER) {
- isEnd.set(true);
- if (exp.get() != null) {
- String msg = "Parsing failed for xml, url:" + s + " rows processed in this xml:" + count;
- if (lastRow != null) msg += " last row in this xml:" + lastRow;
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, exp.get(), msg);
- } else if (SKIP.equals(onError)) {
- wrapAndThrow(DataImportHandlerException.SKIP, exp.get());
- } else {
- log.warn(msg, exp.get());
- }
- }
- return null;
- }
- count++;
- return lastRow = row;
- }
-
- @Override
- public void remove() {
- /*no op*/
- }
- };
-
- }
-
-
- public static final String URL = "url";
-
- public static final String HAS_MORE = "$hasMore";
-
- public static final String NEXT_URL = "$nextUrl";
-
- public static final String XPATH_FIELD_NAME = "$forEach";
-
- public static final String FOR_EACH = "forEach";
-
- public static final String XPATH = "xpath";
-
- public static final String COMMON_FIELD = "commonField";
-
- public static final String USE_SOLR_ADD_SCHEMA = "useSolrAddSchema";
-
- public static final String XSL = "xsl";
-
- public static final String STREAM = "stream";
-
-}
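
For the non-useSolrAddSchema case, the initXpathReader method above reduces
to an XPathRecordReader keyed on the forEach xpath plus one addField call
per column. A standalone sketch with illustrative RSS paths; XPathDemo is a
hypothetical harness, and in a real import the paths come from the entity's
forEach and field xpath attributes named by the constants above.

package org.apache.solr.handler.dataimport;

import java.io.StringReader;

public class XPathDemo {
  public static void main(String[] args) throws Exception {
    // forEach xpath: one record per matching node.
    XPathRecordReader reader = new XPathRecordReader("/rss/channel/item");
    // column name, field xpath, multiValued.
    reader.addField("title", "/rss/channel/item/title", false);

    reader.streamRecords(
        new StringReader("<rss><channel><item><title>Hello</title></item></channel></rss>"),
        (record, xpath) -> System.out.println(record)); // prints {title=Hello}
  }
}
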