Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/02 11:33:42 UTC
[21/25] lucene-solr:jira/gradle: Adding dataimporthandler-extras module
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
deleted file mode 100644
index 2fd9303..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-import org.apache.solr.common.util.NamedList;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Stack;
-
-/**
- * <p>
- * Implements most of the interactive development functionality
- * </p>
- * <p/>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <p/>
- * <b>This API is experimental and subject to change</b>
- *
- * @since solr 1.3
- */
-class DebugLogger {
- private Stack<DebugInfo> debugStack;
-
- NamedList output;
-// private final SolrWriter writer1;
-
- private static final String LINE = "---------------------------------------------";
-
- private MessageFormat fmt = new MessageFormat(
- "----------- row #{0}-------------", Locale.ROOT);
-
- boolean enabled = true;
-
- public DebugLogger() {
-// writer = solrWriter;
- output = new NamedList();
- debugStack = new Stack<DebugInfo>() {
-
- @Override
- public DebugInfo pop() {
- if (size() == 1)
- throw new DataImportHandlerException(
- DataImportHandlerException.SEVERE, "Stack is becoming empty");
- return super.pop();
- }
- };
- debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
- output = debugStack.peek().lst;
- }
-
- private DebugInfo peekStack() {
- return debugStack.isEmpty() ? null : debugStack.peek();
- }
-
- public void log(DIHLogLevels event, String name, Object row) {
- if (event == DIHLogLevels.DISABLE_LOGGING) {
- enabled = false;
- return;
- } else if (event == DIHLogLevels.ENABLE_LOGGING) {
- enabled = true;
- return;
- }
-
- if (!enabled && event != DIHLogLevels.START_ENTITY
- && event != DIHLogLevels.END_ENTITY) {
- return;
- }
-
- if (event == DIHLogLevels.START_DOC) {
- debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
- } else if (DIHLogLevels.START_ENTITY == event) {
- debugStack
- .push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
- } else if (DIHLogLevels.ENTITY_OUT == event
- || DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
- if (debugStack.peek().type == DIHLogLevels.START_ENTITY
- || debugStack.peek().type == DIHLogLevels.START_DOC) {
- debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
- .peek().rowCount}));
- addToNamedList(debugStack.peek().lst, row);
- debugStack.peek().lst.add(null, LINE);
- }
- } else if (event == DIHLogLevels.ROW_END) {
- popAllTransformers();
- } else if (DIHLogLevels.END_ENTITY == event) {
- while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
- ;
- } else if (DIHLogLevels.END_DOC == event) {
- while (debugStack.pop().type != DIHLogLevels.START_DOC)
- ;
- } else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
- debugStack.push(new DebugInfo(name, event, peekStack()));
- debugStack.peek().lst.add("EXCEPTION",
- getStacktraceString((Exception) row));
- } else if (DIHLogLevels.TRANSFORMED_ROW == event) {
- debugStack.push(new DebugInfo(name, event, peekStack()));
- debugStack.peek().lst.add(null, LINE);
- addToNamedList(debugStack.peek().lst, row);
- debugStack.peek().lst.add(null, LINE);
- if (row instanceof DataImportHandlerException) {
- DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
- dataImportHandlerException.debugged = true;
- }
- } else if (DIHLogLevels.ENTITY_META == event) {
- popAllTransformers();
- debugStack.peek().lst.add(name, row);
- } else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
- if (row instanceof DataImportHandlerException) {
- DataImportHandlerException dihe = (DataImportHandlerException) row;
- if (dihe.debugged)
- return;
- dihe.debugged = true;
- }
-
- popAllTransformers();
- debugStack.peek().lst.add("EXCEPTION",
- getStacktraceString((Exception) row));
- }
- }
-
- private void popAllTransformers() {
- while (true) {
- DIHLogLevels type = debugStack.peek().type;
- if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
- break;
- debugStack.pop();
- }
- }
-
- private void addToNamedList(NamedList nl, Object row) {
- if (row instanceof List) {
- List list = (List) row;
- NamedList l = new NamedList();
- nl.add(null, l);
- for (Object o : list) {
- Map<String, Object> map = (Map<String, Object>) o;
- for (Map.Entry<String, Object> entry : map.entrySet())
- nl.add(entry.getKey(), entry.getValue());
- }
- } else if (row instanceof Map) {
- Map<String, Object> map = (Map<String, Object>) row;
- for (Map.Entry<String, Object> entry : map.entrySet())
- nl.add(entry.getKey(), entry.getValue());
- }
- }
-
- DataSource wrapDs(final DataSource ds) {
- return new DataSource() {
- @Override
- public void init(Context context, Properties initProps) {
- ds.init(context, initProps);
- }
-
- @Override
- public void close() {
- ds.close();
- }
-
- @Override
- public Object getData(String query) {
- log(DIHLogLevels.ENTITY_META, "query", query);
- long start = System.nanoTime();
- try {
- return ds.getData(query);
- } catch (DataImportHandlerException de) {
- log(DIHLogLevels.ENTITY_EXCEPTION,
- null, de);
- throw de;
- } catch (Exception e) {
- log(DIHLogLevels.ENTITY_EXCEPTION,
- null, e);
- DataImportHandlerException de = new DataImportHandlerException(
- DataImportHandlerException.SEVERE, "", e);
- de.debugged = true;
- throw de;
- } finally {
- log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
- .getTimeElapsedSince(start));
- }
- }
- };
- }
-
- Transformer wrapTransformer(final Transformer t) {
- return new Transformer() {
- @Override
- public Object transformRow(Map<String, Object> row, Context context) {
- log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
- String tName = getTransformerName(t);
- Object result = null;
- try {
- result = t.transformRow(row, context);
- log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
- } catch (DataImportHandlerException de) {
- log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
- de.debugged = true;
- throw de;
- } catch (Exception e) {
- log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
- DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
- de.debugged = true;
- throw de;
- }
- return result;
- }
- };
- }
-
- public static String getStacktraceString(Exception e) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- return sw.toString();
- }
-
- static String getTransformerName(Transformer t) {
- Class transClass = t.getClass();
- if (t instanceof EntityProcessorWrapper.ReflectionTransformer) {
- return ((EntityProcessorWrapper.ReflectionTransformer) t).trans;
- }
- if (t instanceof ScriptTransformer) {
- ScriptTransformer scriptTransformer = (ScriptTransformer) t;
- return "script:" + scriptTransformer.getFunctionName();
- }
- if (transClass.getPackage().equals(DebugLogger.class.getPackage())) {
- return transClass.getSimpleName();
- } else {
- return transClass.getName();
- }
- }
-
- private static class DebugInfo {
- String name;
-
- int tCount, rowCount;
-
- NamedList lst;
-
- DIHLogLevels type;
-
- DebugInfo parent;
-
- public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
- this.name = name;
- this.type = type;
- this.parent = parent;
- lst = new NamedList();
- if (parent != null) {
- String displayName = null;
- if (type == DIHLogLevels.START_ENTITY) {
- displayName = "entity:" + name;
- } else if (type == DIHLogLevels.TRANSFORMED_ROW
- || type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
- displayName = "transformer:" + name;
- } else if (type == DIHLogLevels.START_DOC) {
- this.name = displayName = "document#" + SolrWriter.getDocCount();
- }
- parent.lst.add(displayName, lst);
- }
- }
- }
-
-}
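
For orientation, here is a minimal usage sketch of the DebugLogger deleted above. It is not part of the diff; the class name is hypothetical, and it assumes compilation into the same org.apache.solr.handler.dataimport package, since DebugLogger is package-private. It shows the event pairing the internal stack expects: each START_DOC/START_ENTITY frame must be closed by a matching END_DOC/END_ENTITY, with rows reported in between.

    package org.apache.solr.handler.dataimport;

    import java.util.HashMap;
    import java.util.Map;

    class DebugLoggerPairingSketch {
      public static void main(String[] args) {
        DebugLogger logger = new DebugLogger();
        logger.log(DIHLogLevels.START_DOC, null, null);      // push a document frame
        logger.log(DIHLogLevels.START_ENTITY, "item", null); // push an entity frame

        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        logger.log(DIHLogLevels.ENTITY_OUT, "item", row);    // logs the "row #1" banner plus the row
        logger.log(DIHLogLevels.ROW_END, "item", null);      // pops any transformer frames

        logger.log(DIHLogLevels.END_ENTITY, null, null);     // pop back to the document frame
        logger.log(DIHLogLevels.END_DOC, null, null);        // pop back to the root frame

        System.out.println(logger.output);                   // nested NamedList of logged events
      }
    }
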
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
deleted file mode 100644
index 164cf70..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
+++ /dev/null
@@ -1,988 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
-import org.apache.solr.handler.dataimport.config.DIHConfiguration;
-import org.apache.solr.handler.dataimport.config.Entity;
-import org.apache.solr.handler.dataimport.config.EntityField;
-
-import static org.apache.solr.handler.dataimport.SolrWriter.LAST_INDEX_KEY;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
-import org.apache.solr.schema.IndexSchema;
-import org.apache.solr.schema.SchemaField;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * <p> {@link DocBuilder} is responsible for creating Solr documents from the given configuration. It also maintains
- * import statistics. It depends on the {@link EntityProcessor} implementations to fetch data. </p>
- * <p>
- * <b>This API is experimental and subject to change</b>
- *
- * @since solr 1.3
- */
-public class DocBuilder {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();
-
- private static final Date EPOCH = new Date(0);
- public static final String DELETE_DOC_BY_ID = "$deleteDocById";
- public static final String DELETE_DOC_BY_QUERY = "$deleteDocByQuery";
- public static final String DOC_BOOST = "$docBoost";
- public static final String SKIP_DOC = "$skipDoc";
- public static final String SKIP_ROW = "$skipRow";
-
- DataImporter dataImporter;
-
- private DIHConfiguration config;
-
- private EntityProcessorWrapper currentEntityProcessorWrapper;
-
- @SuppressWarnings("unchecked")
- private Map statusMessages = Collections.synchronizedMap(new LinkedHashMap());
-
- public Statistics importStatistics = new Statistics();
-
- DIHWriter writer;
-
- boolean verboseDebug = false;
-
- Map<String, Object> session = new HashMap<>();
-
- static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<>();
- private Map<String, Object> persistedProperties;
-
- private DIHProperties propWriter;
- private DebugLogger debugLogger;
- private final RequestInfo reqParams;
-
- public DocBuilder(DataImporter dataImporter, DIHWriter solrWriter, DIHProperties propWriter, RequestInfo reqParams) {
- INSTANCE.set(this);
- this.dataImporter = dataImporter;
- this.reqParams = reqParams;
- this.propWriter = propWriter;
- DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
- verboseDebug = reqParams.isDebug() && reqParams.getDebugInfo().verbose;
- persistedProperties = propWriter.readIndexerProperties();
-
- writer = solrWriter;
- ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.getRawParams(), null, this);
- if (writer != null) {
- writer.init(ctx);
- }
- }
-
-
- DebugLogger getDebugLogger(){
- if (debugLogger == null) {
- debugLogger = new DebugLogger();
- }
- return debugLogger;
- }
-
- private VariableResolver getVariableResolver() {
- try {
- VariableResolver resolver = null;
- String epoch = propWriter.convertDateToString(EPOCH);
- if(dataImporter != null && dataImporter.getCore() != null
- && dataImporter.getCore().getResourceLoader().getCoreProperties() != null){
- resolver = new VariableResolver(dataImporter.getCore().getResourceLoader().getCoreProperties());
- } else {
- resolver = new VariableResolver();
- }
- resolver.setEvaluators(dataImporter.getEvaluators());
- Map<String, Object> indexerNamespace = new HashMap<>();
- if (persistedProperties.get(LAST_INDEX_TIME) != null) {
- indexerNamespace.put(LAST_INDEX_TIME, persistedProperties.get(LAST_INDEX_TIME));
- } else {
- // set epoch
- indexerNamespace.put(LAST_INDEX_TIME, epoch);
- }
- indexerNamespace.put(INDEX_START_TIME, dataImporter.getIndexStartTime());
- indexerNamespace.put("request", new HashMap<>(reqParams.getRawParams()));
- indexerNamespace.put("handlerName", dataImporter.getHandlerName());
- for (Entity entity : dataImporter.getConfig().getEntities()) {
- Map<String, Object> entityNamespace = new HashMap<>();
- String key = SolrWriter.LAST_INDEX_KEY;
- Object lastIndex = persistedProperties.get(entity.getName() + "." + key);
- if (lastIndex != null) {
- entityNamespace.put(SolrWriter.LAST_INDEX_KEY, lastIndex);
- } else {
- entityNamespace.put(SolrWriter.LAST_INDEX_KEY, epoch);
- }
- indexerNamespace.put(entity.getName(), entityNamespace);
- }
- resolver.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT, indexerNamespace);
- resolver.addNamespace(ConfigNameConstants.IMPORTER_NS, indexerNamespace);
- return resolver;
- } catch (Exception e) {
- wrapAndThrow(SEVERE, e);
- // unreachable statement
- return null;
- }
- }
-
- private void invokeEventListener(String className) {
- invokeEventListener(className, null);
- }
-
-
- private void invokeEventListener(String className, Exception lastException) {
- try {
- EventListener listener = (EventListener) loadClass(className, dataImporter.getCore()).newInstance();
- notifyListener(listener, lastException);
- } catch (Exception e) {
- wrapAndThrow(SEVERE, e, "Unable to load class : " + className);
- }
- }
-
- private void notifyListener(EventListener listener, Exception lastException) {
- String currentProcess;
- if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
- currentProcess = Context.DELTA_DUMP;
- } else {
- currentProcess = Context.FULL_DUMP;
- }
- ContextImpl ctx = new ContextImpl(null, getVariableResolver(), null, currentProcess, session, null, this);
- ctx.setLastException(lastException);
- listener.onEvent(ctx);
- }
-
- @SuppressWarnings("unchecked")
- public void execute() {
- List<EntityProcessorWrapper> epwList = null;
- try {
- dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
- config = dataImporter.getConfig();
- final AtomicLong startTime = new AtomicLong(System.nanoTime());
- statusMessages.put(TIME_ELAPSED, new Object() {
- @Override
- public String toString() {
- return getTimeElapsedSince(startTime.get());
- }
- });
-
- statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
- importStatistics.queryCount);
- statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
- importStatistics.rowsCount);
- statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
- importStatistics.docCount);
- statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
- importStatistics.skipDocCount);
-
- List<String> entities = reqParams.getEntitiesToRun();
-
- // Trigger onImportStart
- if (config.getOnImportStart() != null) {
- invokeEventListener(config.getOnImportStart());
- }
- AtomicBoolean fullCleanDone = new AtomicBoolean(false);
- //we must not do a delete of *:* multiple times if there are multiple root entities to be run
- Map<String,Object> lastIndexTimeProps = new HashMap<>();
- lastIndexTimeProps.put(LAST_INDEX_KEY, dataImporter.getIndexStartTime());
-
- epwList = new ArrayList<>(config.getEntities().size());
- for (Entity e : config.getEntities()) {
- epwList.add(getEntityProcessorWrapper(e));
- }
- for (EntityProcessorWrapper epw : epwList) {
- if (entities != null && !entities.contains(epw.getEntity().getName()))
- continue;
- lastIndexTimeProps.put(epw.getEntity().getName() + "." + LAST_INDEX_KEY, propWriter.getCurrentTimestamp());
- currentEntityProcessorWrapper = epw;
- String delQuery = epw.getEntity().getAllAttributes().get("preImportDeleteQuery");
- if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
- cleanByQuery(delQuery, fullCleanDone);
- doDelta();
- delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
- if (delQuery != null) {
- fullCleanDone.set(false);
- cleanByQuery(delQuery, fullCleanDone);
- }
- } else {
- cleanByQuery(delQuery, fullCleanDone);
- doFullDump();
- delQuery = epw.getEntity().getAllAttributes().get("postImportDeleteQuery");
- if (delQuery != null) {
- fullCleanDone.set(false);
- cleanByQuery(delQuery, fullCleanDone);
- }
- }
- }
-
- if (stop.get()) {
- // Don't commit if aborted using command=abort
- statusMessages.put("Aborted", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
- handleError("Aborted", null);
- } else {
- // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
- if (!reqParams.isClean()) {
- if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
- finish(lastIndexTimeProps);
- }
- } else {
- // Finished operation normally, commit now
- finish(lastIndexTimeProps);
- }
-
- if (config.getOnImportEnd() != null) {
- invokeEventListener(config.getOnImportEnd());
- }
- }
-
- statusMessages.remove(TIME_ELAPSED);
- statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
- if(importStatistics.failedDocCount.get() > 0)
- statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
-
- statusMessages.put("Time taken", getTimeElapsedSince(startTime.get()));
- log.info("Time taken = " + getTimeElapsedSince(startTime.get()));
- } catch(Exception e)
- {
- throw new RuntimeException(e);
- } finally
- {
- if (writer != null) {
- writer.close();
- }
- if (epwList != null) {
- closeEntityProcessorWrappers(epwList);
- }
- if(reqParams.isDebug()) {
- reqParams.getDebugInfo().debugVerboseOutput = getDebugLogger().output;
- }
- }
- }
- private void closeEntityProcessorWrappers(List<EntityProcessorWrapper> epwList) {
- for(EntityProcessorWrapper epw : epwList) {
- epw.close();
- if(epw.getDatasource()!=null) {
- epw.getDatasource().close();
- }
- closeEntityProcessorWrappers(epw.getChildren());
- }
- }
-
- @SuppressWarnings("unchecked")
- private void finish(Map<String,Object> lastIndexTimeProps) {
- log.info("Import completed successfully");
- statusMessages.put("", "Indexing completed. Added/Updated: "
- + importStatistics.docCount + " documents. Deleted "
- + importStatistics.deletedDocCount + " documents.");
- if(reqParams.isCommit()) {
- writer.commit(reqParams.isOptimize());
- addStatusMessage("Committed");
- if (reqParams.isOptimize())
- addStatusMessage("Optimized");
- }
- try {
- propWriter.persist(lastIndexTimeProps);
- } catch (Exception e) {
- log.error("Could not write property file", e);
- statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
- "Make sure your conf directory is writable");
- }
- }
-
- void handleError(String message, Exception e) {
- if (!dataImporter.getCore().getCoreContainer().isZooKeeperAware()) {
- writer.rollback();
- }
-
- statusMessages.put(message, "Indexing error");
- addStatusMessage(message);
- if ((config != null) && (config.getOnError() != null)) {
- invokeEventListener(config.getOnError(), e);
- }
- }
-
- private void doFullDump() {
- addStatusMessage("Full Dump Started");
- buildDocument(getVariableResolver(), null, null, currentEntityProcessorWrapper, true, null);
- }
-
- @SuppressWarnings("unchecked")
- private void doDelta() {
- addStatusMessage("Delta Dump started");
- VariableResolver resolver = getVariableResolver();
-
- if (config.getDeleteQuery() != null) {
- writer.deleteByQuery(config.getDeleteQuery());
- }
-
- addStatusMessage("Identifying Delta");
- log.info("Starting delta collection.");
- Set<Map<String, Object>> deletedKeys = new HashSet<>();
- Set<Map<String, Object>> allPks = collectDelta(currentEntityProcessorWrapper, resolver, deletedKeys);
- if (stop.get())
- return;
- addStatusMessage("Deltas Obtained");
- addStatusMessage("Building documents");
- if (!deletedKeys.isEmpty()) {
- allPks.removeAll(deletedKeys);
- deleteAll(deletedKeys);
- // Make sure that documents are not re-created
- }
- deletedKeys = null;
- writer.setDeltaKeys(allPks);
-
- statusMessages.put("Total Changed Documents", allPks.size());
- VariableResolver vri = getVariableResolver();
- Iterator<Map<String, Object>> pkIter = allPks.iterator();
- while (pkIter.hasNext()) {
- Map<String, Object> map = pkIter.next();
- vri.addNamespace(ConfigNameConstants.IMPORTER_NS_SHORT + ".delta", map);
- buildDocument(vri, null, map, currentEntityProcessorWrapper, true, null);
- pkIter.remove();
- // check for abort
- if (stop.get())
- break;
- }
-
- if (!stop.get()) {
- log.info("Delta Import completed successfully");
- }
- }
-
- private void deleteAll(Set<Map<String, Object>> deletedKeys) {
- log.info("Deleting stale documents ");
- Iterator<Map<String, Object>> iter = deletedKeys.iterator();
- while (iter.hasNext()) {
- Map<String, Object> map = iter.next();
- String keyName = currentEntityProcessorWrapper.getEntity().isDocRoot() ? currentEntityProcessorWrapper.getEntity().getPk() : currentEntityProcessorWrapper.getEntity().getSchemaPk();
- Object key = map.get(keyName);
- if(key == null) {
- keyName = findMatchingPkColumn(keyName, map);
- key = map.get(keyName);
- }
- if(key == null) {
- log.warn("no key was available for deleted pk query. keyName = " + keyName);
- continue;
- }
- writer.deleteDoc(key);
- importStatistics.deletedDocCount.incrementAndGet();
- iter.remove();
- }
- }
-
- @SuppressWarnings("unchecked")
- public void addStatusMessage(String msg) {
- statusMessages.put(msg, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT).format(new Date()));
- }
-
- private void resetEntity(EntityProcessorWrapper epw) {
- epw.setInitialized(false);
- for (EntityProcessorWrapper child : epw.getChildren()) {
- resetEntity(child);
- }
-
- }
-
- private void buildDocument(VariableResolver vr, DocWrapper doc,
- Map<String,Object> pk, EntityProcessorWrapper epw, boolean isRoot,
- ContextImpl parentCtx) {
- List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<>();
- try {
- buildDocument(vr, doc, pk, epw, isRoot, parentCtx, entitiesToDestroy);
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
- entityWrapper.destroy();
- }
- resetEntity(epw);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void buildDocument(VariableResolver vr, DocWrapper doc,
- Map<String, Object> pk, EntityProcessorWrapper epw, boolean isRoot,
- ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
-
- ContextImpl ctx = new ContextImpl(epw, vr, null,
- pk == null ? Context.FULL_DUMP : Context.DELTA_DUMP,
- session, parentCtx, this);
- epw.init(ctx);
- if (!epw.isInitialized()) {
- entitiesToDestroy.add(epw);
- epw.setInitialized(true);
- }
-
- if (reqParams.getStart() > 0) {
- getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
- }
-
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.START_ENTITY, epw.getEntity().getName(), null);
- }
-
- int seenDocCount = 0;
-
- try {
- while (true) {
- if (stop.get())
- return;
- if(importStatistics.docCount.get() > (reqParams.getStart() + reqParams.getRows())) break;
- try {
- seenDocCount++;
-
- if (seenDocCount > reqParams.getStart()) {
- getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
- }
-
- if (verboseDebug && epw.getEntity().isDocRoot()) {
- getDebugLogger().log(DIHLogLevels.START_DOC, epw.getEntity().getName(), null);
- }
- if (doc == null && epw.getEntity().isDocRoot()) {
- doc = new DocWrapper();
- ctx.setDoc(doc);
- Entity e = epw.getEntity();
- while (e.getParentEntity() != null) {
- addFields(e.getParentEntity(), doc, (Map<String, Object>) vr
- .resolve(e.getParentEntity().getName()), vr);
- e = e.getParentEntity();
- }
- }
-
- Map<String, Object> arow = epw.nextRow();
- if (arow == null) {
- break;
- }
-
- // Support for start parameter in debug mode
- if (epw.getEntity().isDocRoot()) {
- if (seenDocCount <= reqParams.getStart())
- continue;
- if (seenDocCount > reqParams.getStart() + reqParams.getRows()) {
- log.info("Indexing stopped at docCount = " + importStatistics.docCount);
- break;
- }
- }
-
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.ENTITY_OUT, epw.getEntity().getName(), arow);
- }
- importStatistics.rowsCount.incrementAndGet();
-
- DocWrapper childDoc = null;
- if (doc != null) {
- if (epw.getEntity().isChild()) {
- childDoc = new DocWrapper();
- handleSpecialCommands(arow, childDoc);
- addFields(epw.getEntity(), childDoc, arow, vr);
- doc.addChildDocument(childDoc);
- } else {
- handleSpecialCommands(arow, doc);
- vr.addNamespace(epw.getEntity().getName(), arow);
- addFields(epw.getEntity(), doc, arow, vr);
- vr.removeNamespace(epw.getEntity().getName());
- }
- }
- if (epw.getEntity().getChildren() != null) {
- vr.addNamespace(epw.getEntity().getName(), arow);
- for (EntityProcessorWrapper child : epw.getChildren()) {
- if (childDoc != null) {
- buildDocument(vr, childDoc,
- child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
- } else {
- buildDocument(vr, doc,
- child.getEntity().isDocRoot() ? pk : null, child, false, ctx, entitiesToDestroy);
- }
- }
- vr.removeNamespace(epw.getEntity().getName());
- }
- if (epw.getEntity().isDocRoot()) {
- if (stop.get())
- return;
- if (!doc.isEmpty()) {
- boolean result = writer.upload(doc);
- if(reqParams.isDebug()) {
- reqParams.getDebugInfo().debugDocuments.add(doc);
- }
- doc = null;
- if (result){
- importStatistics.docCount.incrementAndGet();
- } else {
- importStatistics.failedDocCount.incrementAndGet();
- }
- }
- }
- } catch (DataImportHandlerException e) {
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), e);
- }
- if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
- continue;
- }
- if (isRoot) {
- if (e.getErrCode() == DataImportHandlerException.SKIP) {
- importStatistics.skipDocCount.getAndIncrement();
- doc = null;
- } else {
- SolrException.log(log, "Exception while processing: "
- + epw.getEntity().getName() + " document : " + doc, e);
- }
- if (e.getErrCode() == DataImportHandlerException.SEVERE)
- throw e;
- } else
- throw e;
- } catch (Exception t) {
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, epw.getEntity().getName(), t);
- }
- throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
- } finally {
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.ROW_END, epw.getEntity().getName(), null);
- if (epw.getEntity().isDocRoot())
- getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
- }
- }
- }
- } finally {
- if (verboseDebug) {
- getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
- }
- }
- }
-
- static class DocWrapper extends SolrInputDocument {
- //final SolrInputDocument solrDocument = new SolrInputDocument();
- Map<String ,Object> session;
-
- public void setSessionAttribute(String key, Object val){
- if(session == null) session = new HashMap<>();
- session.put(key, val);
- }
-
- public Object getSessionAttribute(String key) {
- return session == null ? null : session.get(key);
- }
- }
-
- private void handleSpecialCommands(Map<String, Object> arow, DocWrapper doc) {
- Object value = arow.get(DELETE_DOC_BY_ID);
- if (value != null) {
- if (value instanceof Collection) {
- Collection collection = (Collection) value;
- for (Object o : collection) {
- writer.deleteDoc(o.toString());
- importStatistics.deletedDocCount.incrementAndGet();
- }
- } else {
- writer.deleteDoc(value);
- importStatistics.deletedDocCount.incrementAndGet();
- }
- }
- value = arow.get(DELETE_DOC_BY_QUERY);
- if (value != null) {
- if (value instanceof Collection) {
- Collection collection = (Collection) value;
- for (Object o : collection) {
- writer.deleteByQuery(o.toString());
- importStatistics.deletedDocCount.incrementAndGet();
- }
- } else {
- writer.deleteByQuery(value.toString());
- importStatistics.deletedDocCount.incrementAndGet();
- }
- }
- value = arow.get(DOC_BOOST);
- if (value != null) {
- String message = "Ignoring document boost: " + value + " as index-time boosts are not supported anymore";
- if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
- log.warn(message);
- } else {
- log.debug(message);
- }
- }
-
- value = arow.get(SKIP_DOC);
- if (value != null) {
- if (Boolean.parseBoolean(value.toString())) {
- throw new DataImportHandlerException(DataImportHandlerException.SKIP,
- "Document skipped :" + arow);
- }
- }
-
- value = arow.get(SKIP_ROW);
- if (value != null) {
- if (Boolean.parseBoolean(value.toString())) {
- throw new DataImportHandlerException(DataImportHandlerException.SKIP_ROW);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private void addFields(Entity entity, DocWrapper doc,
- Map<String, Object> arow, VariableResolver vr) {
- for (Map.Entry<String, Object> entry : arow.entrySet()) {
- String key = entry.getKey();
- Object value = entry.getValue();
- if (value == null) continue;
- if (key.startsWith("$")) continue;
- Set<EntityField> field = entity.getColNameVsField().get(key);
- IndexSchema schema = null == reqParams.getRequest() ? null : reqParams.getRequest().getSchema();
- if (field == null && schema != null) {
- // This can be a dynamic field or a field which does not have an entry in data-config (an implicit field)
- SchemaField sf = schema.getFieldOrNull(key);
- if (sf == null) {
- sf = config.getSchemaField(key);
- }
- if (sf != null) {
- addFieldToDoc(entry.getValue(), sf.getName(), sf.multiValued(), doc);
- }
- //else do nothing. if we add it it may fail
- } else {
- if (field != null) {
- for (EntityField f : field) {
- String name = f.getName();
- boolean multiValued = f.isMultiValued();
- boolean toWrite = f.isToWrite();
- if(f.isDynamicName()){
- name = vr.replaceTokens(name);
- SchemaField schemaField = config.getSchemaField(name);
- if(schemaField == null) {
- toWrite = false;
- } else {
- multiValued = schemaField.multiValued();
- toWrite = true;
- }
- }
- if (toWrite) {
- addFieldToDoc(entry.getValue(), name, multiValued, doc);
- }
- }
- }
- }
- }
- }
-
- private void addFieldToDoc(Object value, String name, boolean multiValued, DocWrapper doc) {
- if (value instanceof Collection) {
- Collection collection = (Collection) value;
- if (multiValued) {
- for (Object o : collection) {
- if (o != null)
- doc.addField(name, o);
- }
- } else {
- if (doc.getField(name) == null)
- for (Object o : collection) {
- if (o != null) {
- doc.addField(name, o);
- break;
- }
- }
- }
- } else if (multiValued) {
- if (value != null) {
- doc.addField(name, value);
- }
- } else {
- if (doc.getField(name) == null && value != null)
- doc.addField(name, value);
- }
- }
-
- public EntityProcessorWrapper getEntityProcessorWrapper(Entity entity) {
- EntityProcessor entityProcessor = null;
- if (entity.getProcessorName() == null) {
- entityProcessor = new SqlEntityProcessor();
- } else {
- try {
- entityProcessor = (EntityProcessor) loadClass(entity.getProcessorName(), dataImporter.getCore())
- .newInstance();
- } catch (Exception e) {
- wrapAndThrow (SEVERE,e,
- "Unable to load EntityProcessor implementation for entity:" + entity.getName());
- }
- }
- EntityProcessorWrapper epw = new EntityProcessorWrapper(entityProcessor, entity, this);
- for(Entity e1 : entity.getChildren()) {
- epw.getChildren().add(getEntityProcessorWrapper(e1));
- }
-
- return epw;
- }
-
- private String findMatchingPkColumn(String pk, Map<String, Object> row) {
- if (row.containsKey(pk)) {
- throw new IllegalArgumentException(String.format(Locale.ROOT,
- "deltaQuery returned a row with null for primary key %s", pk));
- }
- String resolvedPk = null;
- for (String columnName : row.keySet()) {
- if (columnName.endsWith("." + pk) || pk.endsWith("." + columnName)) {
- if (resolvedPk != null)
- throw new IllegalArgumentException(
- String.format(Locale.ROOT,
- "deltaQuery has more than one column (%s and %s) that might resolve to declared primary key pk='%s'",
- resolvedPk, columnName, pk));
- resolvedPk = columnName;
- }
- }
- if (resolvedPk == null) {
- throw new IllegalArgumentException(
- String
- .format(
- Locale.ROOT,
- "deltaQuery has no column to resolve to declared primary key pk='%s'",
- pk));
- }
- log.info(String.format(Locale.ROOT,
- "Resolving deltaQuery column '%s' to match entity's declared pk '%s'",
- resolvedPk, pk));
- return resolvedPk;
- }
-
- /**
- * <p> Collects the unique keys of all Solr documents for which one or more source tables have changed since the
- * last index time. </p> <p> Note: by our definition, the unique key of a Solr document is the primary key of the
- * top-level entity (unless skipped using docRoot=false) as declared in data-config.xml. </p>
- *
- * @return the set of keys for which Solr documents should be updated.
- */
- @SuppressWarnings("unchecked")
- public Set<Map<String, Object>> collectDelta(EntityProcessorWrapper epw, VariableResolver resolver,
- Set<Map<String, Object>> deletedRows) {
- //someone called abort
- if (stop.get())
- return new HashSet();
-
- ContextImpl context1 = new ContextImpl(epw, resolver, null, Context.FIND_DELTA, session, null, this);
- epw.init(context1);
-
- Set<Map<String, Object>> myModifiedPks = new HashSet<>();
-
-
-
- for (EntityProcessorWrapper childEpw : epw.getChildren()) {
- //this ensures that we start from the leaf nodes
- myModifiedPks.addAll(collectDelta(childEpw, resolver, deletedRows));
- //someone called abort
- if (stop.get())
- return new HashSet();
- }
-
- // identifying the modified rows for this entity
- Map<String, Map<String, Object>> deltaSet = new HashMap<>();
- log.info("Running ModifiedRowKey() for Entity: " + epw.getEntity().getName());
- //get the modified rows in this entity
- String pk = epw.getEntity().getPk();
- while (true) {
- Map<String, Object> row = epw.nextModifiedRowKey();
-
- if (row == null)
- break;
-
- Object pkValue = row.get(pk);
- if (pkValue == null) {
- pk = findMatchingPkColumn(pk, row);
- pkValue = row.get(pk);
- }
-
- deltaSet.put(pkValue.toString(), row);
- importStatistics.rowsCount.incrementAndGet();
- // check for abort
- if (stop.get())
- return new HashSet();
- }
- //get the deleted rows for this entity
- Set<Map<String, Object>> deletedSet = new HashSet<>();
- while (true) {
- Map<String, Object> row = epw.nextDeletedRowKey();
- if (row == null)
- break;
-
- deletedSet.add(row);
-
- Object pkValue = row.get(pk);
- if (pkValue == null) {
- pk = findMatchingPkColumn(pk, row);
- pkValue = row.get(pk);
- }
-
- // Remove deleted rows from the delta rows
- String deletedRowPk = pkValue.toString();
- if (deltaSet.containsKey(deletedRowPk)) {
- deltaSet.remove(deletedRowPk);
- }
-
- importStatistics.rowsCount.incrementAndGet();
- // check for abort
- if (stop.get())
- return new HashSet();
- }
-
- log.info("Completed ModifiedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deltaSet.size());
- log.info("Completed DeletedRowKey for Entity: " + epw.getEntity().getName() + " rows obtained : " + deletedSet.size());
-
- myModifiedPks.addAll(deltaSet.values());
- Set<Map<String, Object>> parentKeyList = new HashSet<>();
- //all that we have captured is useless (in a sub-entity) if no rows in the parent are modified because of these;
- //propagate the changes up the chain
- if (epw.getEntity().getParentEntity() != null) {
- // identifying deleted rows with deltas
-
- for (Map<String, Object> row : myModifiedPks) {
- resolver.addNamespace(epw.getEntity().getName(), row);
- getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
- // check for abort
- if (stop.get())
- return new HashSet();
- }
- // running the same for deletedrows
- for (Map<String, Object> row : deletedSet) {
- resolver.addNamespace(epw.getEntity().getName(), row);
- getModifiedParentRows(resolver, epw.getEntity().getName(), epw, parentKeyList);
- // check for abort
- if (stop.get())
- return new HashSet();
- }
- }
- log.info("Completed parentDeltaQuery for Entity: " + epw.getEntity().getName());
- if (epw.getEntity().isDocRoot())
- deletedRows.addAll(deletedSet);
-
- // Do not use entity.isDocRoot here because one of descendant entities may set rootEntity="true"
- return epw.getEntity().getParentEntity() == null ?
- myModifiedPks : new HashSet<>(parentKeyList);
- }
-
- private void getModifiedParentRows(VariableResolver resolver,
- String entity, EntityProcessor entityProcessor,
- Set<Map<String, Object>> parentKeyList) {
- try {
- while (true) {
- Map<String, Object> parentRow = entityProcessor
- .nextModifiedParentRowKey();
- if (parentRow == null)
- break;
-
- parentKeyList.add(parentRow);
- importStatistics.rowsCount.incrementAndGet();
- // check for abort
- if (stop.get())
- return;
- }
-
- } finally {
- resolver.removeNamespace(entity);
- }
- }
-
- public void abort() {
- stop.set(true);
- }
-
- private AtomicBoolean stop = new AtomicBoolean(false);
-
- public static final String TIME_ELAPSED = "Time Elapsed";
-
- static String getTimeElapsedSince(long l) {
- l = TimeUnit.MILLISECONDS.convert(System.nanoTime() - l, TimeUnit.NANOSECONDS);
- return (l / (60000 * 60)) + ":" + (l / 60000) % 60 + ":" + (l / 1000)
- % 60 + "." + l % 1000;
- }
-
- public RequestInfo getReqParams() {
- return reqParams;
- }
-
- @SuppressWarnings("unchecked")
- static Class loadClass(String name, SolrCore core) throws ClassNotFoundException {
- try {
- return core != null ?
- core.getResourceLoader().findClass(name, Object.class) :
- Class.forName(name);
- } catch (Exception e) {
- try {
- String n = DocBuilder.class.getPackage().getName() + "." + name;
- return core != null ?
- core.getResourceLoader().findClass(n, Object.class) :
- Class.forName(n);
- } catch (Exception e1) {
- throw new ClassNotFoundException("Unable to load " + name + " or " + DocBuilder.class.getPackage().getName() + "." + name, e);
- }
- }
- }
-
- public static class Statistics {
- public AtomicLong docCount = new AtomicLong();
-
- public AtomicLong deletedDocCount = new AtomicLong();
-
- public AtomicLong failedDocCount = new AtomicLong();
-
- public AtomicLong rowsCount = new AtomicLong();
-
- public AtomicLong queryCount = new AtomicLong();
-
- public AtomicLong skipDocCount = new AtomicLong();
-
- public Statistics add(Statistics stats) {
- this.docCount.addAndGet(stats.docCount.get());
- this.deletedDocCount.addAndGet(stats.deletedDocCount.get());
- this.rowsCount.addAndGet(stats.rowsCount.get());
- this.queryCount.addAndGet(stats.queryCount.get());
-
- return this;
- }
-
- public Map<String, Object> getStatsSnapshot() {
- Map<String, Object> result = new HashMap<>();
- result.put("docCount", docCount.get());
- result.put("deletedDocCount", deletedDocCount.get());
- result.put("rowCount", rowsCount.get());
- result.put("queryCount", rowsCount.get());
- result.put("skipDocCount", skipDocCount.get());
- return result;
- }
-
- }
-
- private void cleanByQuery(String delQuery, AtomicBoolean completeCleanDone) {
- delQuery = getVariableResolver().replaceTokens(delQuery);
- if (reqParams.isClean()) {
- if (delQuery == null && !completeCleanDone.get()) {
- writer.doDeleteAll();
- completeCleanDone.set(true);
- } else if (delQuery != null) {
- writer.deleteByQuery(delQuery);
- }
- }
- }
-
- public static final String LAST_INDEX_TIME = "last_index_time";
- public static final String INDEX_START_TIME = "index_start_time";
-}
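
The $-prefixed keys above ($deleteDocById, $deleteDocByQuery, $docBoost, $skipDoc, $skipRow) are consumed by handleSpecialCommands rather than indexed (addFields skips any key starting with "$"). A hypothetical custom transformer showing how a row can trigger one of them; the class name and the "active" column are illustrative, not part of the commit:

    package org.apache.solr.handler.dataimport;

    import java.util.Map;

    public class SkipInactiveTransformer extends Transformer {
      @Override
      public Object transformRow(Map<String, Object> row, Context context) {
        // $skipDoc is inspected by DocBuilder.handleSpecialCommands; a "true"
        // value makes it throw a SKIP exception and drop the whole document.
        if (Boolean.FALSE.equals(row.get("active"))) {
          row.put("$skipDoc", "true");
        }
        return row;
      }
    }

In data-config.xml such a class would be referenced via the entity's transformer attribute, which EntityProcessorWrapper.loadTransformers resolves; DocBuilder.loadClass expands short names with the DIH package prefix, which is why built-ins can be named without a package.
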
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessor.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessor.java
deleted file mode 100644
index 8cfbed9..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessor.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import java.util.Map;
-
-/**
- * <p>
- * An instance of entity processor serves an entity. It is reused throughout the
- * import process.
- * </p>
- * <p>
- * Implementations of this abstract class must provide a public no-args constructor.
- * </p>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <p>
- * <b>This API is experimental and may change in the future.</b>
- *
- * @since solr 1.3
- */
-public abstract class EntityProcessor {
-
- /**
- * This method is called when processing of an entity starts. Each time processing
- * returns to the entity it is called again, so any state can be reset at that point.
- * For a root-most entity this is called only once per ingestion. For sub-entities, this
- * is called multiple times, once for each row from the parent entity.
- *
- * @param context The current context
- */
- public abstract void init(Context context);
-
- /**
- * This method streams the data, one row at a time. The implementation
- * should fetch as many rows as needed and return one 'row' per call. Only this
- * method is used during a full import.
- *
- * @return A 'row'. The 'key' for the map is the column name and the 'value'
- * is the value of that column. If there are no more rows to be
- * returned, return 'null'
- */
- public abstract Map<String, Object> nextRow();
-
- /**
- * This is used for delta-import. It gives the primary keys of the rows changed in
- * this entity.
- *
- * @return a map of primary key vs value for one changed row, or null when there are no more
- */
- public abstract Map<String, Object> nextModifiedRowKey();
-
- /**
- * This is used during delta-import. It gives the primary keys of the rows
- * that are deleted from this entity. If this entity is the root entity, the Solr
- * document is deleted. If this is a sub-entity, the Solr document is
- * considered 'changed' and will be recreated.
- *
- * @return a map of primary key vs value for one deleted row, or null when there are no more
- */
- public abstract Map<String, Object> nextDeletedRowKey();
-
- /**
- * This is used during delta-import. This gives the primary keys and their
- * values of all the rows changed in a parent entity due to changes in this
- * entity.
- *
- * @return a map of primary key vs value for one changed row in the parent entity, or null when there are no more
- */
- public abstract Map<String, Object> nextModifiedParentRowKey();
-
- /**
- * Invoked for each entity at the very end of the import to do any needed cleanup tasks.
- *
- */
- public abstract void destroy();
-
- /**
- * Invoked after the transformers are invoked. EntityProcessors can add, remove or modify values
- * added by Transformers in this method.
- *
- * @param r The transformed row
- * @since solr 1.4
- */
- public void postTransform(Map<String, Object> r) {
- }
-
- /**
- * Invoked when the Entity processor is destroyed towards the end of import.
- *
- * @since solr 1.4
- */
- public void close() {
- //no-op
- }
-}
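
To make the lifecycle above concrete, here is a minimal hypothetical implementation (the class name and its hard-coded rows are illustrative only): init() rebuilds the row source on every visit to the entity, nextRow() returns null to end the stream, and the delta hooks return null because delta-import is not supported.

    package org.apache.solr.handler.dataimport;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class TwoRowEntityProcessor extends EntityProcessor {
      private Iterator<Map<String, Object>> rows;

      @Override
      public void init(Context context) {
        // Called on every visit to the entity, so reset the row source here.
        Map<String, Object> r1 = new HashMap<>();
        r1.put("id", 1);
        Map<String, Object> r2 = new HashMap<>();
        r2.put("id", 2);
        rows = Arrays.asList(r1, r2).iterator();
      }

      @Override
      public Map<String, Object> nextRow() {
        return rows.hasNext() ? rows.next() : null; // null signals end of rows
      }

      // No delta-import support: the delta hooks yield nothing.
      @Override public Map<String, Object> nextModifiedRowKey() { return null; }
      @Override public Map<String, Object> nextDeletedRowKey() { return null; }
      @Override public Map<String, Object> nextModifiedParentRowKey() { return null; }

      @Override
      public void destroy() {
        rows = null; // release per-import state
      }
    }
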
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
deleted file mode 100644
index 8311f36..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import org.apache.solr.common.SolrException;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.util.*;
-
-/**
- * <p> Base class for all implementations of {@link EntityProcessor} </p> <p> Most implementations of {@link EntityProcessor}
- * extend this base class which provides common functionality. </p>
- * <p>
- * <b>This API is experimental and subject to change</b>
- *
- * @since solr 1.3
- */
-public class EntityProcessorBase extends EntityProcessor {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- protected boolean isFirstInit = true;
-
- protected String entityName;
-
- protected Context context;
-
- protected Iterator<Map<String, Object>> rowIterator;
-
- protected String query;
-
- protected String onError = ABORT;
-
- protected DIHCacheSupport cacheSupport = null;
-
- private Zipper zipper;
-
-
- @Override
- public void init(Context context) {
- this.context = context;
- if (isFirstInit) {
- firstInit(context);
- }
- if(zipper!=null){
- zipper.onNewParent(context);
- }else{
- if(cacheSupport!=null) {
- cacheSupport.initNewParent(context);
- }
- }
- }
-
- /**
- * First-time init call; do one-time operations here.
- * Overriding methods must call this implementation,
- * otherwise an NPE is thrown when zipper is accessed from nextRow().
- */
- protected void firstInit(Context context) {
- entityName = context.getEntityAttribute("name");
- String s = context.getEntityAttribute(ON_ERROR);
- if (s != null) onError = s;
-
- zipper = Zipper.createOrNull(context);
-
- if(zipper==null){
- initCache(context);
- }
- isFirstInit = false;
- }
-
- protected void initCache(Context context) {
- String cacheImplName = context
- .getResolvedEntityAttribute(DIHCacheSupport.CACHE_IMPL);
-
- if (cacheImplName != null ) {
- cacheSupport = new DIHCacheSupport(context, cacheImplName);
- }
- }
-
- @Override
- public Map<String, Object> nextModifiedRowKey() {
- return null;
- }
-
- @Override
- public Map<String, Object> nextDeletedRowKey() {
- return null;
- }
-
- @Override
- public Map<String, Object> nextModifiedParentRowKey() {
- return null;
- }
-
- /**
- * For a simple implementation, this is the only method that the sub-class should implement. This is intended to
- * stream rows one-by-one. Return null to signal end of rows
- *
- * @return a row where the key is the name of the field and value can be any Object or a Collection of objects. Return
- * null to signal end of rows
- */
- @Override
- public Map<String, Object> nextRow() {
- return null;// do not do anything
- }
-
- protected Map<String, Object> getNext() {
- if(zipper!=null){
- return zipper.supplyNextChild(rowIterator);
- }else{
- if(cacheSupport==null) {
- try {
- if (rowIterator == null)
- return null;
- if (rowIterator.hasNext())
- return rowIterator.next();
- query = null;
- rowIterator = null;
- return null;
- } catch (Exception e) {
- SolrException.log(log, "getNext() failed for query '" + query + "'", e);
- query = null;
- rowIterator = null;
- wrapAndThrow(DataImportHandlerException.WARN, e);
- return null;
- }
- } else {
- return cacheSupport.getCacheData(context, query, rowIterator);
- }
- }
- }
-
-
- @Override
- public void destroy() {
- query = null;
- if(cacheSupport!=null){
- cacheSupport.destroyAll();
- }
- cacheSupport = null;
- }
-
-
-
- public static final String TRANSFORMER = "transformer";
-
- public static final String TRANSFORM_ROW = "transformRow";
-
- public static final String ON_ERROR = "onError";
-
- public static final String ABORT = "abort";
-
- public static final String CONTINUE = "continue";
-
- public static final String SKIP = "skip";
-}
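
The intended subclassing pattern for the base class above, as a hypothetical sketch (class name and data are illustrative): a concrete processor populates the protected rowIterator (and query, which is only used in getNext()'s error logging) and delegates iteration to getNext(), which also routes through the optional DIHCacheSupport and Zipper paths.

    package org.apache.solr.handler.dataimport;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class InMemoryEntityProcessor extends EntityProcessorBase {

      @Override
      public Map<String, Object> nextRow() {
        if (rowIterator == null) {
          query = "in-memory rows"; // label reported if getNext() fails
          List<Map<String, Object>> data = new ArrayList<>();
          Map<String, Object> row = new HashMap<>();
          row.put("id", 1);
          data.add(row);
          rowIterator = data.iterator();
        }
        return getNext(); // null once the iterator is drained
      }
    }

Note that getNext() clears query and rowIterator on exhaustion; in practice DocBuilder stops pulling rows from an entity after the first null, as shown in its buildDocument loop above.
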
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
deleted file mode 100644
index 8a76e11..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.dataimport.config.ConfigNameConstants;
-import org.apache.solr.handler.dataimport.config.Entity;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
-import static org.apache.solr.handler.dataimport.EntityProcessorBase.*;
-import static org.apache.solr.handler.dataimport.EntityProcessorBase.SKIP;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A wrapper over an {@link EntityProcessor} instance that applies transformers and handles multi-row outputs correctly.
- *
- * @since solr 1.4
- */
-public class EntityProcessorWrapper extends EntityProcessor {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private EntityProcessor delegate;
- private Entity entity;
- private DataSource datasource;
- private List<EntityProcessorWrapper> children = new ArrayList<>();
- private DocBuilder docBuilder;
- private boolean initialized;
- private String onError;
- private Context context;
- private VariableResolver resolver;
- private String entityName;
-
- protected List<Transformer> transformers;
-
- protected List<Map<String, Object>> rowcache;
-
- public EntityProcessorWrapper(EntityProcessor delegate, Entity entity, DocBuilder docBuilder) {
- this.delegate = delegate;
- this.entity = entity;
- this.docBuilder = docBuilder;
- }
-
- @Override
- public void init(Context context) {
- rowcache = null;
- this.context = context;
- resolver = (VariableResolver) context.getVariableResolver();
- if (entityName == null) {
- onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
- if (onError == null) onError = ABORT;
- entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
- }
- delegate.init(context);
-
- }
-
- @SuppressWarnings("unchecked")
- void loadTransformers() {
- String transClasses = context.getEntityAttribute(TRANSFORMER);
-
- if (transClasses == null) {
- transformers = Collections.EMPTY_LIST;
- return;
- }
-
- String[] transArr = transClasses.split(",");
- transformers = new ArrayList<Transformer>() {
- @Override
- public boolean add(Transformer transformer) {
- if (docBuilder != null && docBuilder.verboseDebug) {
- transformer = docBuilder.getDebugLogger().wrapTransformer(transformer);
- }
- return super.add(transformer);
- }
- };
- for (String aTransArr : transArr) {
- String trans = aTransArr.trim();
- if (trans.startsWith("script:")) {
- // The script transformer is a potential vulnerability, esp. when the script is
- // provided from an untrusted source. Check and don't proceed if source is untrusted.
- checkIfTrusted(trans);
- String functionName = trans.substring("script:".length());
- ScriptTransformer scriptTransformer = new ScriptTransformer();
- scriptTransformer.setFunctionName(functionName);
- transformers.add(scriptTransformer);
- continue;
- }
- try {
- Class clazz = DocBuilder.loadClass(trans, context.getSolrCore());
- if (Transformer.class.isAssignableFrom(clazz)) {
- transformers.add((Transformer) clazz.newInstance());
- } else {
- Method meth = clazz.getMethod(TRANSFORM_ROW, Map.class);
- transformers.add(new ReflectionTransformer(meth, clazz, trans));
- }
- } catch (NoSuchMethodException nsme) {
- String msg = "Transformer: "
- + trans
- + " does not implement the Transformer interface or does not have a transformRow(Map<String, Object> m) method";
- log.error(msg);
- wrapAndThrow(SEVERE, nsme, msg);
- } catch (Exception e) {
- log.error("Unable to load Transformer: " + trans, e);
- wrapAndThrow(SEVERE, e, "Unable to load Transformer: " + trans);
- }
- }
-
- }
-
- private void checkIfTrusted(String trans) {
- if (docBuilder != null) {
- SolrCore core = docBuilder.dataImporter.getCore();
- boolean trusted = (core != null) ? core.getCoreDescriptor().isConfigSetTrusted() : true;
- if (!trusted) {
- Exception ex = new SolrException(ErrorCode.UNAUTHORIZED, "The configset for this collection was uploaded "
- + "without any authentication in place,"
- + " and this transformer is not available for collections with untrusted configsets. To use this transformer,"
- + " re-upload the configset after enabling authentication and authorization.");
- String msg = "Transformer: "
- + trans
- + ". " + ex.getMessage();
- log.error(msg);
- wrapAndThrow(SEVERE, ex, msg);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- static class ReflectionTransformer extends Transformer {
- final Method meth;
-
- final Class clazz;
-
- final String trans;
-
- final Object o;
-
- public ReflectionTransformer(Method meth, Class clazz, String trans)
- throws Exception {
- this.meth = meth;
- this.clazz = clazz;
- this.trans = trans;
- o = clazz.newInstance();
- }
-
- @Override
- public Object transformRow(Map<String, Object> aRow, Context context) {
- try {
- return meth.invoke(o, aRow);
- } catch (Exception e) {
- log.warn("method invocation failed on transformer : " + trans, e);
- throw new DataImportHandlerException(WARN, e);
- }
- }
- }
-
- protected Map<String, Object> getFromRowCache() {
- Map<String, Object> r = rowcache.remove(0);
- if (rowcache.isEmpty())
- rowcache = null;
- return r;
- }
-
- @SuppressWarnings("unchecked")
- protected Map<String, Object> applyTransformer(Map<String, Object> row) {
- if (row == null) return null;
- if (transformers == null)
- loadTransformers();
- if (transformers == Collections.EMPTY_LIST)
- return row;
- Map<String, Object> transformedRow = row;
- List<Map<String, Object>> rows = null;
- boolean stopTransform = checkStopTransform(row);
- VariableResolver resolver = (VariableResolver) context.getVariableResolver();
- for (Transformer t : transformers) {
- if (stopTransform) break;
- try {
- if (rows != null) {
- List<Map<String, Object>> tmpRows = new ArrayList<>();
- for (Map<String, Object> map : rows) {
- resolver.addNamespace(entityName, map);
- Object o = t.transformRow(map, context);
- if (o == null)
- continue;
- if (o instanceof Map) {
- Map oMap = (Map) o;
- stopTransform = checkStopTransform(oMap);
- tmpRows.add((Map) o);
- } else if (o instanceof List) {
- tmpRows.addAll((List) o);
- } else {
- log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
- }
- }
- rows = tmpRows;
- } else {
- resolver.addNamespace(entityName, transformedRow);
- Object o = t.transformRow(transformedRow, context);
- if (o == null)
- return null;
- if (o instanceof Map) {
- Map oMap = (Map) o;
- stopTransform = checkStopTransform(oMap);
- transformedRow = (Map) o;
- } else if (o instanceof List) {
- rows = (List) o;
- } else {
- log.error("Transformer must return Map<String, Object> or a List<Map<String, Object>>");
- }
- }
- } catch (Exception e) {
- log.warn("transformer threw error", e);
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e);
- } else if (SKIP.equals(onError)) {
- wrapAndThrow(DataImportHandlerException.SKIP, e);
- }
- // onError = continue
- }
- }
- if (rows == null) {
- return transformedRow;
- } else {
- rowcache = rows;
- return getFromRowCache();
- }
-
- }
-
- private boolean checkStopTransform(Map oMap) {
- return oMap.get("$stopTransform") != null
- && Boolean.parseBoolean(oMap.get("$stopTransform").toString());
- }
-
- @Override
- public Map<String, Object> nextRow() {
- if (rowcache != null) {
- return getFromRowCache();
- }
- while (true) {
- Map<String, Object> arow = null;
- try {
- arow = delegate.nextRow();
- } catch (Exception e) {
- if (ABORT.equals(onError)) {
- wrapAndThrow(SEVERE, e);
- } else {
- // SKIP is not really possible: if nextRow() were called again the EntityProcessor would be in an inconsistent state
- SolrException.log(log, "Exception in entity: " + entityName, e);
- return null;
- }
- }
- if (arow == null) {
- return null;
- } else {
- arow = applyTransformer(arow);
- if (arow != null) {
- delegate.postTransform(arow);
- return arow;
- }
- }
- }
- }
-
- @Override
- public Map<String, Object> nextModifiedRowKey() {
- Map<String, Object> row = delegate.nextModifiedRowKey();
- row = applyTransformer(row);
- rowcache = null;
- return row;
- }
-
- @Override
- public Map<String, Object> nextDeletedRowKey() {
- Map<String, Object> row = delegate.nextDeletedRowKey();
- row = applyTransformer(row);
- rowcache = null;
- return row;
- }
-
- @Override
- public Map<String, Object> nextModifiedParentRowKey() {
- return delegate.nextModifiedParentRowKey();
- }
-
- @Override
- public void destroy() {
- delegate.destroy();
- }
-
- public VariableResolver getVariableResolver() {
- return (VariableResolver) context.getVariableResolver();
- }
-
- public Context getContext() {
- return context;
- }
-
- @Override
- public void close() {
- delegate.close();
- }
-
- public Entity getEntity() {
- return entity;
- }
-
- public List<EntityProcessorWrapper> getChildren() {
- return children;
- }
-
- public DataSource getDatasource() {
- return datasource;
- }
-
- public void setDatasource(DataSource datasource) {
- this.datasource = datasource;
- }
-
- public boolean isInitialized() {
- return initialized;
- }
-
- public void setInitialized(boolean initialized) {
- this.initialized = initialized;
- }
-}
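For reference, the reflection path in loadTransformers above accepts any class exposing a public transformRow(Map) method, even if it does not extend Transformer. A minimal sketch of such a class (the name TrimTransformer and its trimming behavior are illustrative, not taken from the deleted file):

import java.util.Map;

// Picked up via ReflectionTransformer because it declares a public
// transformRow(Map) method without extending Transformer. Returning
// null from transformRow drops the row, as applyTransformer shows above.
public class TrimTransformer {
  public Map<String, Object> transformRow(Map<String, Object> row) {
    for (Map.Entry<String, Object> e : row.entrySet()) {
      if (e.getValue() instanceof String) {
        e.setValue(((String) e.getValue()).trim());
      }
    }
    return row;
  }
}

Such a class would be referenced by its fully qualified name in the entity's comma-separated transformer attribute, alongside Transformer subclasses and script: entries.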
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Evaluator.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Evaluator.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Evaluator.java
deleted file mode 100644
index 22282b9..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/Evaluator.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * <p>
- * Pluggable functions for resolving variables
- * </p>
- * <p>
- * Implementations of this abstract class must provide a public no-arg constructor.
- * </p>
- * <p>
- * Refer to <a
- * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
- * for more details.
- * </p>
- * <b>This API is experimental and may change in the future.</b>
- *
- * @since solr 1.3
- */
-public abstract class Evaluator {
-
- /**
- * Return a String after processing an expression and a {@link VariableResolver}
- *
- * @see VariableResolver
- * @param expression string to be evaluated
- * @param context instance
- * @return the value of the given expression evaluated using the resolver
- */
- public abstract String evaluate(String expression, Context context);
-
- /**
- * Parses an expression string into separate params. The values are separated by commas. Each value will be
- * translated into one of the following:
- * <ol>
- * <li>If it is in single quotes the value will be translated to a String</li>
- * <li>If it is not in quotes and is a number it will be translated into a Double</li>
- * <li>Otherwise it is treated as a variable which can be resolved, and it will be put in as an instance of VariableWrapper</li>
- * </ol>
- *
- * @param expression the expression to be parsed
- * @param vr the VariableResolver instance for resolving variables
- *
- * @return a List of objects which can either be a string, number or a variable wrapper
- */
- protected List<Object> parseParams(String expression, VariableResolver vr) {
- List<Object> result = new ArrayList<>();
- expression = expression.trim();
- String[] ss = expression.split(",");
- for (int i = 0; i < ss.length; i++) {
- ss[i] = ss[i].trim();
- if (ss[i].startsWith("'")) {//a string param has started
- StringBuilder sb = new StringBuilder();
- while (true) {
- sb.append(ss[i]);
- if (ss[i].endsWith("'")) break;
- i++;
- if (i >= ss.length)
- throw new DataImportHandlerException(SEVERE, "invalid string at " + ss[i - 1] + " in function params: " + expression);
- sb.append(",");
- }
- String s = sb.substring(1, sb.length() - 1);
- s = s.replaceAll("\\\\'", "'");
- result.add(s);
- } else {
- if (Character.isDigit(ss[i].charAt(0))) {
- try {
- Double doub = Double.parseDouble(ss[i]);
- result.add(doub);
- } catch (NumberFormatException e) {
- if (vr.resolve(ss[i]) == null) {
- wrapAndThrow(
- SEVERE, e, "Invalid number: " + ss[i] +
- " in parameters " + expression);
- }
- }
- } else {
- result.add(getVariableWrapper(ss[i], vr));
- }
- }
- }
- return result;
- }
-
- protected VariableWrapper getVariableWrapper(String s, VariableResolver vr) {
- return new VariableWrapper(s,vr);
- }
-
- protected static class VariableWrapper {
- public final String varName;
- public final VariableResolver vr;
-
- public VariableWrapper(String s, VariableResolver vr) {
- this.varName = s;
- this.vr = vr;
- }
-
- public Object resolve() {
- return vr.resolve(varName);
- }
-
- @Override
- public String toString() {
- Object o = vr.resolve(varName);
- return o == null ? null : o.toString();
- }
- }
-
- static Pattern IN_SINGLE_QUOTES = Pattern.compile("^'(.*?)'$");
-
- public static final String DATE_FORMAT_EVALUATOR = "formatDate";
-
- public static final String URL_ENCODE_EVALUATOR = "encodeUrl";
-
- public static final String ESCAPE_SOLR_QUERY_CHARS = "escapeQueryChars";
-
- public static final String SQL_ESCAPE_EVALUATOR = "escapeSql";
-}
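A minimal sketch of a concrete Evaluator, assuming it is registered via a <function> element in data-config.xml; the class name LowerCaseEvaluator and its behavior are illustrative, and the cast mirrors the usage in EntityProcessorWrapper above:

import java.util.List;
import java.util.Locale;
import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.Evaluator;
import org.apache.solr.handler.dataimport.VariableResolver;

// Hypothetical evaluator: lower-cases its first parameter. parseParams
// yields Strings, Doubles, or VariableWrappers; toString() on a
// VariableWrapper resolves the underlying variable.
public class LowerCaseEvaluator extends Evaluator {
  @Override
  public String evaluate(String expression, Context context) {
    VariableResolver vr = (VariableResolver) context.getVariableResolver();
    List<Object> params = parseParams(expression, vr);
    Object first = params.isEmpty() ? null : params.get(0);
    String s = (first == null) ? null : first.toString();
    return (s == null) ? null : s.toLowerCase(Locale.ROOT);
  }
}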
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EventListener.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EventListener.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EventListener.java
deleted file mode 100644
index 0c43a0b..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EventListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-/**
- * Event listener for DataImportHandler
- *
- * <b>This API is experimental and subject to change</b>
- *
- * @since solr 1.4
- */
-public interface EventListener {
-
- /**
- * Event callback
- *
- * @param ctx the Context in which this event was called
- */
- void onEvent(Context ctx);
-
-}
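The interface is deliberately small; a sketch of an implementation, assuming it is referenced from the onImportStart/onImportEnd attributes of the <document> element in data-config.xml (the class name and log line are illustrative):

import java.lang.invoke.MethodHandles;
import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.EventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs whenever the configured DIH event fires.
public class LoggingListener implements EventListener {
  private static final Logger log =
      LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Override
  public void onEvent(Context ctx) {
    log.info("DIH event fired, entity: {}", ctx.getEntityAttribute("name"));
  }
}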
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
deleted file mode 100644
index 571c280..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldReaderDataSource.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.SQLException;
-import java.util.Properties;
-
-/**
- * This can be useful for users who have a DB field containing XML and wish to use a nested {@link XPathEntityProcessor}
- * <p>
- * The datasource may be configured as follows
- * <p>
- * <datasource name="f1" type="FieldReaderDataSource" />
- * <p>
- * The entity which uses this datasource must set the url attribute to the name of the field holding the data: url="field-name"
- * <p>
- * The field name must be resolvable from {@link VariableResolver}
- * <p>
- * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}<{@link Reader}>, e.g. {@link XPathEntityProcessor}
- * <p>
- * Supports String, BLOB and CLOB data types; for BLOBs there is an extra entity attribute 'encoding'
- *
- * @since 1.4
- */
-public class FieldReaderDataSource extends DataSource<Reader> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected VariableResolver vr;
- protected String dataField;
- private String encoding;
- private EntityProcessorWrapper entityProcessor;
-
- @Override
- public void init(Context context, Properties initProps) {
- dataField = context.getEntityAttribute("dataField");
- encoding = context.getEntityAttribute("encoding");
- entityProcessor = (EntityProcessorWrapper) context.getEntityProcessor();
- /*no op*/
- }
-
- @Override
- public Reader getData(String query) {
- Object o = entityProcessor.getVariableResolver().resolve(dataField);
- if (o == null) {
- throw new DataImportHandlerException(SEVERE, "No field available for name: " + dataField);
- }
- if (o instanceof String) {
- return new StringReader((String) o);
- } else if (o instanceof Clob) {
- Clob clob = (Clob) o;
- try {
- // most JDBC drivers expose getCharacterStream as public, so just use it
- return readCharStream(clob);
- } catch (Exception e) {
- log.info("Unable to get data from CLOB", e);
- return null;
-
- }
-
- } else if (o instanceof Blob) {
- Blob blob = (Blob) o;
- try {
- return getReader(blob);
- } catch (Exception e) {
- log.info("Unable to get data from BLOB");
- return null;
-
- }
- } else {
- return new StringReader(o.toString());
- }
-
- }
-
- static Reader readCharStream(Clob clob) {
- try {
- return clob.getCharacterStream();
- } catch (Exception e) {
- wrapAndThrow(SEVERE, e,"Unable to get reader from clob");
- return null;//unreachable
- }
- }
-
- private Reader getReader(Blob blob)
- throws SQLException, UnsupportedEncodingException {
- if (encoding == null) {
- return new InputStreamReader(blob.getBinaryStream(), StandardCharsets.UTF_8);
- } else {
- return new InputStreamReader(blob.getBinaryStream(), encoding);
- }
- }
-
- @Override
- public void close() {
-
- }
-}
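The DataSource<Reader> contract used above is just init/getData/close; a minimal sketch of a custom implementation (EchoDataSource is a hypothetical name, returning the query itself as a Reader, purely as a shape reference):

import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;
import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.DataSource;

// Echoes the query string back as a Reader; useful only as a template
// for DataSource<Reader> implementations like FieldReaderDataSource.
public class EchoDataSource extends DataSource<Reader> {
  @Override
  public void init(Context context, Properties initProps) {
    // entity or datasource attributes would be read here, cf. dataField above
  }

  @Override
  public Reader getData(String query) {
    return new StringReader(query);
  }

  @Override
  public void close() {
    // nothing to release
  }
}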
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d7c03684/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
----------------------------------------------------------------------
diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
deleted file mode 100644
index ba7ca5d..0000000
--- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/FieldStreamDataSource.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.handler.dataimport;
-
-import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.sql.Blob;
-import java.sql.SQLException;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This can be useful for users who have a DB field containing BLOBs which may be rich documents
- * <p>
- * The datasource may be configured as follows
- * <p>
- * <dataSource name="f1" type="FieldStreamDataSource" />
- * <p>
- * The entity which uses this datasource must have an attribute dataField
- * <p>
- * The field name must be resolvable from {@link VariableResolver}
- * <p>
- * This may be used with any {@link EntityProcessor} which uses a {@link DataSource}<{@link InputStream}>, e.g. TikaEntityProcessor
- *
- * @since 3.1
- */
-public class FieldStreamDataSource extends DataSource<InputStream> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected VariableResolver vr;
- protected String dataField;
- private EntityProcessorWrapper wrapper;
-
- @Override
- public void init(Context context, Properties initProps) {
- dataField = context.getEntityAttribute("dataField");
- wrapper = (EntityProcessorWrapper) context.getEntityProcessor();
- /*no op*/
- }
-
- @Override
- public InputStream getData(String query) {
- Object o = wrapper.getVariableResolver().resolve(dataField);
- if (o == null) {
- throw new DataImportHandlerException(SEVERE, "No field available for name : " + dataField);
- } else if (o instanceof Blob) {
- Blob blob = (Blob) o;
- try {
- return blob.getBinaryStream();
- } catch (SQLException sqle) {
- log.info("Unable to get data from BLOB");
- return null;
- }
- } else if (o instanceof byte[]) {
- byte[] bytes = (byte[]) o;
- return new ByteArrayInputStream(bytes);
- } else {
- throw new RuntimeException("unsupported type: " + o.getClass());
- }
-
- }
-
- @Override
- public void close() {
- }
-}
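The binary counterpart of the previous sketch, specialized to DataSource<InputStream> (again hypothetical, for shape only):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.solr.handler.dataimport.Context;
import org.apache.solr.handler.dataimport.DataSource;

// Hands the query bytes back as an InputStream, mirroring the
// DataSource<InputStream> shape that FieldStreamDataSource implements.
public class EchoStreamDataSource extends DataSource<InputStream> {
  @Override
  public void init(Context context, Properties initProps) {
    // no attributes needed for this sketch
  }

  @Override
  public InputStream getData(String query) {
    return new ByteArrayInputStream(query.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public void close() {
    // nothing to release
  }
}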