Posted to dev@lucene.apache.org by "Dyer, James" <Ja...@ingrambook.com> on 2012/03/22 20:33:46 UTC
RE: svn commit: r1303792 [1/2] - in /lucene/dev/branches/branch_3x/solr: contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/...
I'm running "ant javadocs" now and will fix any errors it gives me related to my commit.
James Dyer
E-Commerce Systems
Ingram Content Group
(615) 213-4311
-----Original Message-----
From: Chris Hostetter [mailto:hossman_lucene@fucit.org]
Sent: Thursday, March 22, 2012 2:16 PM
To: dev@lucene.apache.org
Cc: jdyer@apache.org
Subject: Re: svn commit: r1303792 [1/2] - in /lucene/dev/branches/branch_3x/solr: contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/...
james: i'm seeing a bunch of errors from "ant javadocs" on the 3x branch
that seem to relate to methods/variables you've changed/removed here...
[javadoc] /home/hossman/lucene/3x_dev/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java:84: warning - @return tag has no arguments.
[javadoc] /home/hossman/lucene/3x_dev/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java:167: warning - Tag @link: reference not found: transformRow
[javadoc] /home/hossman/lucene/3x_dev/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java:167: warning - Tag @link: reference not found: rowcache
[javadoc] /home/hossman/lucene/3x_dev/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java:167: warning - Tag @link: reference not found: getFromRowCache
[javadoc] /home/hossman/lucene/3x_dev/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java:334: warning - Tag @link: reference not found: delegate
...etc...
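For reference, warnings of this kind usually come from two patterns: an @return tag with no description after it, and a {@link} whose target member has been renamed or removed. The sketch below shows comments that should avoid both. The class and its members are hypothetical and are not taken from the DIH sources.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical writer used only to illustrate javadoc tag usage; it is not
 * part of DataImportHandler.
 */
class JavadocTagSketch {
  private final List<String> docs = new ArrayList<String>();

  /**
   * Adds a document to the in-memory store.
   *
   * @param doc the document to add; null is rejected
   * @return true if the document was stored, false if it was null
   */
  public boolean upload(String doc) {
    if (doc == null) {
      return false;
    }
    return docs.add(doc);
  }

  /**
   * Returns how many documents were stored through {@link #upload(String)}.
   * The link target exists in this class, so the reference resolves.
   *
   * @return the number of stored documents
   */
  public int size() {
    return docs.size();
  }

  public static void main(String[] args) {
    JavadocTagSketch w = new JavadocTagSketch();
    System.out.println(w.upload("a") + " " + w.upload(null) + " " + w.size());
  }
}
```

When a linked member has been removed on purpose, dropping the {@link} or pointing it at the replacement member is typically enough to clear the "reference not found" warning.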
: Date: Thu, 22 Mar 2012 14:11:17 -0000
: From: jdyer@apache.org
: Reply-To: dev@lucene.apache.org
: To: commits@lucene.apache.org
: Subject: svn commit: r1303792 [1/2] - in /lucene/dev/branches/branch_3x/solr: contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ contrib/dataimporthandler/src/test...
:
: Author: jdyer
: Date: Thu Mar 22 14:11:16 2012
: New Revision: 1303792
:
: URL: http://svn.apache.org/viewvc?rev=1303792&view=rev
: Log:
: SOLR-2382: Framework for Pluggable caches
:
: Added:
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SortedMapBackedCache.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/AbstractDIHCacheTestCase.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/DestroyCountCache.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestEphemeralCache.java (with props)
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSortedMapBackedCache.java (with props)
: lucene/dev/branches/branch_3x/solr/webapp/web/WEB-INF/jboss-web.xml (with props)
: Modified:
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrWriter.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ThreadedContext.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test-files/dih/solr/conf/dataimport-schema.xml
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestCachedSqlEntityProcessor.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContextImpl.java
: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestDocBuilder.java
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler-extras/src/test/org/apache/solr/handler/dataimport/TestMailEntityProcessor.java Thu Mar 22 14:11:16 2012
: @@ -188,7 +188,7 @@ public class TestMailEntityProcessor ext
: Boolean commitCalled;
:
: public SolrWriterImpl() {
: - super(null, ".");
: + super(null);
: }
:
: @Override
: @@ -197,11 +197,6 @@ public class TestMailEntityProcessor ext
: }
:
: @Override
: - public void log(int event, String name, Object row) {
: - // Do nothing
: - }
: -
: - @Override
: public void doDeleteAll() {
: deleteAllCalled = Boolean.TRUE;
: }
:
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachePropertyUtil.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,32 @@
: +package org.apache.solr.handler.dataimport;
: +
: +public class CachePropertyUtil {
: + public static String getAttributeValueAsString(Context context, String attr) {
: + Object o = context.getSessionAttribute(attr, Context.SCOPE_ENTITY);
: + if (o == null) {
: + o = context.getResolvedEntityAttribute(attr);
: + }
: + if (o == null && context.getRequestParameters() != null) {
: + o = context.getRequestParameters().get(attr);
: + }
: + if (o == null) {
: + return null;
: + }
: + return o.toString();
: + }
: +
: + public static Object getAttributeValue(Context context, String attr) {
: + Object o = context.getSessionAttribute(attr, Context.SCOPE_ENTITY);
: + if (o == null) {
: + o = context.getResolvedEntityAttribute(attr);
: + }
: + if (o == null && context.getRequestParameters() != null) {
: + o = context.getRequestParameters().get(attr);
: + }
: + if (o == null) {
: + return null;
: + }
: + return o;
: + }
: +
: +}
:
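Both methods in CachePropertyUtil above resolve an attribute in the same order: entity-scoped session attribute first, then the resolved entity attribute, then the request parameters if any were supplied. The sketch below mirrors that order using three plain maps as stand-ins for the Context calls. The class name and the map-based signature are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup order in CachePropertyUtil. The three
// maps replace the Context calls and are assumptions made for illustration.
class AttributeLookupSketch {

  // Returns the first non-null value found in session, entity, then request.
  static Object lookup(String attr, Map<String,Object> session,
      Map<String,Object> entity, Map<String,Object> request) {
    Object o = session.get(attr);
    if (o == null) {
      o = entity.get(attr);
    }
    // The request parameter map may be absent, as in the committed code.
    if (o == null && request != null) {
      o = request.get(attr);
    }
    return o;
  }

  // String variant: delegates to lookup() instead of repeating the chain.
  static String lookupAsString(String attr, Map<String,Object> session,
      Map<String,Object> entity, Map<String,Object> request) {
    Object o = lookup(attr, session, entity, request);
    return o == null ? null : o.toString();
  }

  public static void main(String[] args) {
    Map<String,Object> session = new HashMap<String,Object>();
    Map<String,Object> entity = new HashMap<String,Object>();
    Map<String,Object> request = new HashMap<String,Object>();
    entity.put("cacheImpl", "SortedMapBackedCache");
    request.put("cacheImpl", "SomeOtherCache");
    request.put("cacheReadOnly", Boolean.TRUE);

    // The entity attribute wins over the request parameter of the same name.
    System.out.println(lookupAsString("cacheImpl", session, entity, request));
    // Only the request map has this one.
    System.out.println(lookupAsString("cacheReadOnly", session, entity, request));
    // Nothing defines it and there is no request map.
    System.out.println(lookupAsString("missing", session, entity, null));
  }
}
```

Having the String variant delegate to the Object variant is a choice of this sketch; the committed class repeats the lookup chain in both methods.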
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/CachedSqlEntityProcessor.java Thu Mar 22 14:11:16 2012
: @@ -16,67 +16,27 @@
: */
: package org.apache.solr.handler.dataimport;
:
: -import java.util.ArrayList;
: -import java.util.List;
: -import java.util.Map;
: -
: /**
: * This class enables caching of data obtained from the DB to avoid too many sql
: * queries
: * <p/>
: * <p>
: * Refer to <a
: - * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
: - * for more details.
: + * href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache
: + * .org/solr/DataImportHandler</a> for more details.
: * </p>
: * <p/>
: * <b>This API is experimental and subject to change</b>
: - *
: + *
: * @version $Id$
: * @since solr 1.3
: + * @deprecated - Use SqlEntityProcessor with cacheImpl parameter.
: */
: +@Deprecated
: public class CachedSqlEntityProcessor extends SqlEntityProcessor {
: - private boolean isFirst;
: -
: - @Override
: - @SuppressWarnings("unchecked")
: - public void init(Context context) {
: - super.init(context);
: - super.cacheInit();
: - isFirst = true;
: - }
: -
: - @Override
: - public Map<String, Object> nextRow() {
: - if (dataSourceRowCache != null)
: - return getFromRowCacheTransformed();
: - if (!isFirst)
: - return null;
: - String query = context.replaceTokens(context.getEntityAttribute("query"));
: - isFirst = false;
: - if (simpleCache != null) {
: - return getSimpleCacheData(query);
: - } else {
: - return getIdCacheData(query);
: + @Override
: + protected void initCache(Context context) {
: + cacheSupport = new DIHCacheSupport(context, "SortedMapBackedCache");
: }
:
: - }
: -
: - @Override
: - protected List<Map<String, Object>> getAllNonCachedRows() {
: - List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
: - String q = getQuery();
: - initQuery(context.replaceTokens(q));
: - if (rowIterator == null)
: - return rows;
: - while (rowIterator.hasNext()) {
: - Map<String, Object> arow = rowIterator.next();
: - if (arow == null) {
: - break;
: - } else {
: - rows.add(arow);
: - }
: - }
: - return rows;
: - }
: }
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ContextImpl.java Thu Mar 22 14:11:16 2012
: @@ -101,7 +101,7 @@ public class ContextImpl extends Context
: if (entity.dataSrc != null && docBuilder != null && docBuilder.verboseDebug &&
: Context.FULL_DUMP.equals(currentProcess())) {
: //debug is not yet implemented properly for deltas
: - entity.dataSrc = docBuilder.writer.getDebugLogger().wrapDs(entity.dataSrc);
: + entity.dataSrc = docBuilder.getDebugLogger().wrapDs(entity.dataSrc);
: }
: return entity.dataSrc;
: }
:
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCache.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,108 @@
: +package org.apache.solr.handler.dataimport;
: +
: +/*
: + * Licensed to the Apache Software Foundation (ASF) under one or more
: + * contributor license agreements. See the NOTICE file distributed with
: + * this work for additional information regarding copyright ownership.
: + * The ASF licenses this file to You under the Apache License, Version 2.0
: + * (the "License"); you may not use this file except in compliance with
: + * the License. You may obtain a copy of the License at
: + *
: + * http://www.apache.org/licenses/LICENSE-2.0
: + *
: + * Unless required by applicable law or agreed to in writing, software
: + * distributed under the License is distributed on an "AS IS" BASIS,
: + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
: + * See the License for the specific language governing permissions and
: + * limitations under the License.
: + */
: +
: +import java.util.Iterator;
: +import java.util.Map;
: +
: +/**
: + * <p>
: + * A cache that allows a DIH entity's data to persist locally prior being joined
: + * to other data and/or indexed.
: + * </p>
: + *
: + * @lucene.experimental
: + */
: +public interface DIHCache extends Iterable<Map<String,Object>> {
: +
: + /**
: + * <p>
: + * Opens the cache using the specified properties. The {@link Context}
: + * includes any parameters needed by the cache impl. This must be called
: + * before any read/write operations are permitted.
: + * <p>
: + */
: + public void open(Context context);
: +
: + /**
: + * <p>
: + * Releases resources used by this cache, if possible. The cache is flushed
: + * but not destroyed.
: + * </p>
: + */
: + public void close();
: +
: + /**
: + * <p>
: + * Persists any pending data to the cache
: + * </p>
: + */
: + public void flush();
: +
: + /**
: + * <p>
: + * Closes the cache, if open. Then removes all data, possibly removing the
: + * cache entirely from persistent storage.
: + * </p>
: + */
: + public void destroy();
: +
: + /**
: + * <p>
: + * Adds a document. If a document already exists with the same key, both
: + * documents will exist in the cache, as the cache allows duplicate keys. To
: + * update a key's documents, first call delete(Object key).
: + * </p>
: + *
: + * @param rec
: + */
: + public void add(Map<String,Object> rec);
: +
: + /**
: + * <p>
: + * Returns an iterator, allowing callers to iterate through the entire cache
: + * in key, then insertion, order.
: + * </p>
: + */
: + public Iterator<Map<String,Object>> iterator();
: +
: + /**
: + * <p>
: + * Returns an iterator, allowing callers to iterate through all documents that
: + * match the given key in insertion order.
: + * </p>
: + */
: + public Iterator<Map<String,Object>> iterator(Object key);
: +
: + /**
: + * <p>
: + * Delete all documents associated with the given key
: + * </p>
: + *
: + * @param key
: + */
: + public void delete(Object key);
: +
: + /**
: + * <p>
: + * Delete all data from the cache,leaving the empty cache intact.
: + * </p>
: + */
: + public void deleteAll();
: +
: +}
:
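The DIHCache javadoc above describes a cache that keeps duplicate keys, iterates in key order and then insertion order, and expects callers to delete a key before re-adding its documents. The sketch below shows one in-memory way to get those semantics. It is not the SortedMapBackedCache from this commit: the class name, the String keys, and the "id" and "value" field names are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory cache illustrating the contract described in the
// DIHCache javadoc. It does not implement the real interface.
class DuplicateKeyCacheSketch {
  private final String keyField;
  // TreeMap gives key order; each list keeps insertion order within a key.
  private final TreeMap<String,List<Map<String,Object>>> data =
      new TreeMap<String,List<Map<String,Object>>>();

  DuplicateKeyCacheSketch(String keyField) {
    this.keyField = keyField;
  }

  // Appends the record; an existing record with the same key is kept.
  void add(Map<String,Object> rec) {
    String key = String.valueOf(rec.get(keyField));
    List<Map<String,Object>> rows = data.get(key);
    if (rows == null) {
      rows = new ArrayList<Map<String,Object>>();
      data.put(key, rows);
    }
    rows.add(rec);
  }

  // Iterates the whole cache in key order, then insertion order.
  Iterator<Map<String,Object>> iterator() {
    List<Map<String,Object>> all = new ArrayList<Map<String,Object>>();
    for (List<Map<String,Object>> rows : data.values()) {
      all.addAll(rows);
    }
    return all.iterator();
  }

  // Iterates the records for one key; empty if the key is unknown
  // (returning an empty iterator rather than null is this sketch's choice).
  Iterator<Map<String,Object>> iterator(String key) {
    List<Map<String,Object>> rows = data.get(key);
    if (rows == null) {
      return Collections.<Map<String,Object>>emptyList().iterator();
    }
    return new ArrayList<Map<String,Object>>(rows).iterator();
  }

  void delete(String key) {
    data.remove(key);
  }

  void deleteAll() {
    data.clear();
  }

  // Convenience helper for building a two-field record.
  static Map<String,Object> row(String id, String value) {
    Map<String,Object> m = new HashMap<String,Object>();
    m.put("id", id);
    m.put("value", value);
    return m;
  }

  public static void main(String[] args) {
    DuplicateKeyCacheSketch cache = new DuplicateKeyCacheSketch("id");
    cache.add(row("b", "first"));
    cache.add(row("a", "second"));
    cache.add(row("b", "third"));
    for (Iterator<Map<String,Object>> it = cache.iterator(); it.hasNext();) {
      System.out.println(it.next().get("value"));
    }
    // Updating key "b": delete its documents first, then add the new one.
    cache.delete("b");
    cache.add(row("b", "replacement"));
  }
}
```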
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHCacheSupport.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,250 @@
: +package org.apache.solr.handler.dataimport;
: +
: +import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
: +
: +import java.lang.reflect.Constructor;
: +import java.util.ArrayList;
: +import java.util.HashMap;
: +import java.util.Iterator;
: +import java.util.List;
: +import java.util.Map;
: +
: +import org.apache.solr.common.SolrException;
: +import org.slf4j.Logger;
: +import org.slf4j.LoggerFactory;
: +
: +public class DIHCacheSupport {
: + private static final Logger log = LoggerFactory
: + .getLogger(DIHCacheSupport.class);
: + private String cacheForeignKey;
: + private String cacheImplName;
: + private Map<String,DIHCache> queryVsCache = new HashMap<String,DIHCache>();
: + private Map<String,Iterator<Map<String,Object>>> queryVsCacheIterator;
: + private Iterator<Map<String,Object>> dataSourceRowCache;
: + private boolean cacheDoKeyLookup;
: +
: + public DIHCacheSupport(Context context, String cacheImplName) {
: + this.cacheImplName = cacheImplName;
: +
: + String where = context.getEntityAttribute("where");
: + String cacheKey = context.getEntityAttribute(DIHCacheSupport.CACHE_PRIMARY_KEY);
: + String lookupKey = context.getEntityAttribute(DIHCacheSupport.CACHE_FOREIGN_KEY);
: + if (cacheKey != null && lookupKey == null) {
: + throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
: + "'cacheKey' is specified for the entity "
: + + context.getEntityAttribute("name")
: + + " but 'cacheLookup' is missing");
: +
: + }
: + if (where == null && cacheKey == null) {
: + cacheDoKeyLookup = false;
: + } else {
: + if (where != null) {
: + String[] splits = where.split("=");
: + cacheKey = splits[0];
: + cacheForeignKey = splits[1].trim();
: + } else {
: + cacheForeignKey = lookupKey;
: + }
: + cacheDoKeyLookup = true;
: + }
: + context.setSessionAttribute(DIHCacheSupport.CACHE_PRIMARY_KEY, cacheKey,
: + Context.SCOPE_ENTITY);
: + context.setSessionAttribute(DIHCacheSupport.CACHE_FOREIGN_KEY, cacheForeignKey,
: + Context.SCOPE_ENTITY);
: + context.setSessionAttribute(DIHCacheSupport.CACHE_DELETE_PRIOR_DATA,
: + "true", Context.SCOPE_ENTITY);
: + context.setSessionAttribute(DIHCacheSupport.CACHE_READ_ONLY, "false",
: + Context.SCOPE_ENTITY);
: + }
: +
: + private DIHCache instantiateCache(Context context) {
: + DIHCache cache = null;
: + try {
: + @SuppressWarnings("unchecked")
: + Class<DIHCache> cacheClass = DocBuilder.loadClass(cacheImplName, context
: + .getSolrCore());
: + Constructor<DIHCache> constr = cacheClass.getConstructor();
: + cache = constr.newInstance();
: + cache.open(context);
: + } catch (Exception e) {
: + throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
: + "Unable to load Cache implementation:" + cacheImplName, e);
: + }
: + return cache;
: + }
: +
: + public void initNewParent(Context context) {
: + queryVsCacheIterator = new HashMap<String,Iterator<Map<String,Object>>>();
: + for (Map.Entry<String,DIHCache> entry : queryVsCache.entrySet()) {
: + queryVsCacheIterator.put(entry.getKey(), entry.getValue().iterator());
: + }
: + }
: +
: + public void destroyAll() {
: + if (queryVsCache != null) {
: + for (DIHCache cache : queryVsCache.values()) {
: + cache.destroy();
: + }
: + }
: + queryVsCache = null;
: + dataSourceRowCache = null;
: + cacheForeignKey = null;
: + }
: +
: + /**
: + * <p>
: + * Get all the rows from the datasource for the given query and cache them
: + * </p>
: + */
: + public void populateCache(String query,
: + Iterator<Map<String,Object>> rowIterator) {
: + Map<String,Object> aRow = null;
: + DIHCache cache = queryVsCache.get(query);
: + while ((aRow = getNextFromCache(query, rowIterator)) != null) {
: + cache.add(aRow);
: + }
: + }
: +
: + private Map<String,Object> getNextFromCache(String query,
: + Iterator<Map<String,Object>> rowIterator) {
: + try {
: + if (rowIterator == null) return null;
: + if (rowIterator.hasNext()) return rowIterator.next();
: + return null;
: + } catch (Exception e) {
: + SolrException.log(log, "getNextFromCache() failed for query '" + query
: + + "'", e);
: + wrapAndThrow(DataImportHandlerException.WARN, e);
: + return null;
: + }
: + }
: +
: + public Map<String,Object> getCacheData(Context context, String query,
: + Iterator<Map<String,Object>> rowIterator) {
: + if (cacheDoKeyLookup) {
: + return getIdCacheData(context, query, rowIterator);
: + } else {
: + return getSimpleCacheData(context, query, rowIterator);
: + }
: + }
: +
: + /**
: + * If the where clause is present the cache is sql Vs Map of key Vs List of
: + * Rows.
: + *
: + * @param query
: + * the query string for which cached data is to be returned
: + *
: + * @return the cached row corresponding to the given query after all variables
: + * have been resolved
: + */
: + protected Map<String,Object> getIdCacheData(Context context, String query,
: + Iterator<Map<String,Object>> rowIterator) {
: + Object key = context.resolve(cacheForeignKey);
: + if (key == null) {
: + throw new DataImportHandlerException(DataImportHandlerException.WARN,
: + "The cache lookup value : " + cacheForeignKey
: + + " is resolved to be null in the entity :"
: + + context.getEntityAttribute("name"));
: +
: + }
: + DIHCache cache = queryVsCache.get(query);
: + if (cache == null) {
: + cache = instantiateCache(context);
: + queryVsCache.put(query, cache);
: + populateCache(query, rowIterator);
: + }
: + if (dataSourceRowCache == null) {
: + dataSourceRowCache = cache.iterator(key);
: + }
: + if (dataSourceRowCache == null) {
: + return null;
: + }
: + return getFromRowCacheTransformed();
: + }
: +
: + /**
: + * If where clause is not present the cache is a Map of query vs List of Rows.
: + *
: + * @param query
: + * string for which cached row is to be returned
: + *
: + * @return the cached row corresponding to the given query
: + */
: + protected Map<String,Object> getSimpleCacheData(Context context,
: + String query, Iterator<Map<String,Object>> rowIterator) {
: + DIHCache cache = queryVsCache.get(query);
: + if (cache == null) {
: + cache = instantiateCache(context);
: + queryVsCache.put(query, cache);
: + populateCache(query, rowIterator);
: + queryVsCacheIterator.put(query, cache.iterator());
: + }
: + if (dataSourceRowCache == null || !dataSourceRowCache.hasNext()) {
: + dataSourceRowCache = null;
: + Iterator<Map<String,Object>> cacheIter = queryVsCacheIterator.get(query);
: + if (cacheIter.hasNext()) {
: + List<Map<String,Object>> dsrcl = new ArrayList<Map<String,Object>>(1);
: + dsrcl.add(cacheIter.next());
: + dataSourceRowCache = dsrcl.iterator();
: + }
: + }
: + if (dataSourceRowCache == null) {
: + return null;
: + }
: + return getFromRowCacheTransformed();
: + }
: +
: + protected Map<String,Object> getFromRowCacheTransformed() {
: + if (dataSourceRowCache == null || !dataSourceRowCache.hasNext()) {
: + dataSourceRowCache = null;
: + return null;
: + }
: + Map<String,Object> r = dataSourceRowCache.next();
: + return r;
: + }
: +
: + /**
: + * <p>
: + * Specify the class for the cache implementation
: + * </p>
: + */
: + public static final String CACHE_IMPL = "cacheImpl";
: +
: + /**
: + * <p>
: + * If the cache supports persistent data, set to "true" to delete any prior
: + * persisted data before running the entity.
: + * </p>
: + */
: +
: + public static final String CACHE_DELETE_PRIOR_DATA = "cacheDeletePriorData";
: + /**
: + * <p>
: + * Specify the Foreign Key from the parent entity to join on. Use if the cache
: + * is on a child entity.
: + * </p>
: + */
: + public static final String CACHE_FOREIGN_KEY = "cacheLookup";
: +
: +
: +
: + /**
: + * <p>
: + * Specify the Primary Key field from this Entity to map the input records
: + * with
: + * </p>
: + */
: + public static final String CACHE_PRIMARY_KEY = "cachePk";
: + /**
: + * <p>
: + * If true, a pre-existing cache is re-opened for read-only access.
: + * </p>
: + */
: + public static final String CACHE_READ_ONLY = "cacheReadOnly";
: +
: +
: +
: +
: +}
:
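In the DIHCacheSupport constructor above, a "where" attribute such as xid=x.id is split on "=": the left half becomes the cache key and the right half the lookup (foreign key) expression. The committed code trims only the right half and does not check how many parts the split produced. The sketch below is a more defensive variant under those observations, not the committed behavior; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating how a "where" attribute of the form
// key=lookup could be split. It is not part of DataImportHandler.
class WhereClauseSketch {

  // Returns {cacheKey, lookupExpression}, both trimmed.
  static String[] parse(String where) {
    String[] splits = where.split("=");
    if (splits.length != 2) {
      throw new IllegalArgumentException(
          "expected 'key=lookup' but got: " + where);
    }
    String cacheKey = splits[0].trim();
    String lookup = splits[1].trim();
    if (cacheKey.isEmpty() || lookup.isEmpty()) {
      throw new IllegalArgumentException(
          "expected 'key=lookup' but got: " + where);
    }
    return new String[] { cacheKey, lookup };
  }

  public static void main(String[] args) {
    String[] parts = parse(" xid = x.id ");
    System.out.println(parts[0] + " -> " + parts[1]);
  }
}
```

Trimming both halves means a value written with spaces around the "=" still yields a usable cache key; failing fast on a malformed value replaces the ArrayIndexOutOfBoundsException that an unchecked splits[1] would raise.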
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHLogLevels.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,5 @@
: +package org.apache.solr.handler.dataimport;
: +
: +public enum DIHLogLevels {
: + START_ENTITY, END_ENTITY, TRANSFORMED_ROW, ENTITY_META, PRE_TRANSFORMER_ROW, START_DOC, END_DOC, ENTITY_OUT, ROW_END, TRANSFORMER_EXCEPTION, ENTITY_EXCEPTION, DISABLE_LOGGING, ENABLE_LOGGING, NONE
: +}
:
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHPropertiesWriter.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,34 @@
: +package org.apache.solr.handler.dataimport;
: +/**
: + * Licensed to the Apache Software Foundation (ASF) under one or more
: + * contributor license agreements. See the NOTICE file distributed with
: + * this work for additional information regarding copyright ownership.
: + * The ASF licenses this file to You under the Apache License, Version 2.0
: + * (the "License"); you may not use this file except in compliance with
: + * the License. You may obtain a copy of the License at
: + *
: + * http://www.apache.org/licenses/LICENSE-2.0
: + *
: + * Unless required by applicable law or agreed to in writing, software
: + * distributed under the License is distributed on an "AS IS" BASIS,
: + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
: + * See the License for the specific language governing permissions and
: + * limitations under the License.
: + */
: +import java.io.File;
: +import java.util.Properties;
: +
: +/**
: + *
: + */
: +public interface DIHPropertiesWriter {
: +
: + public void init(DataImporter dataImporter);
: +
: + public boolean isWritable();
: +
: + public void persist(Properties props);
: +
: + public Properties readIndexerProperties();
: +
: +}
:
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriter.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,105 @@
: +package org.apache.solr.handler.dataimport;
: +/**
: + * Licensed to the Apache Software Foundation (ASF) under one or more
: + * contributor license agreements. See the NOTICE file distributed with
: + * this work for additional information regarding copyright ownership.
: + * The ASF licenses this file to You under the Apache License, Version 2.0
: + * (the "License"); you may not use this file except in compliance with
: + * the License. You may obtain a copy of the License at
: + *
: + * http://www.apache.org/licenses/LICENSE-2.0
: + *
: + * Unless required by applicable law or agreed to in writing, software
: + * distributed under the License is distributed on an "AS IS" BASIS,
: + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
: + * See the License for the specific language governing permissions and
: + * limitations under the License.
: + */
: +import java.util.Map;
: +import java.util.Set;
: +
: +import org.apache.solr.common.SolrInputDocument;
: +
: +/**
: + * @solr.experimental
: + *
: + */
: +public interface DIHWriter {
: +
: + /**
: + * <p>
: + * If this writer supports transactions or commit points, then commit any changes,
: + * optionally optimizing the data for read/write performance
: + * </p>
: + * @param optimize
: + */
: + public void commit(boolean optimize);
: +
: + /**
: + * <p>
: + * Release resources used by this writer. After calling close, reads & updates will throw exceptions.
: + * </p>
: + */
: + public void close();
: +
: + /**
: + * <p>
: + * If this writer supports transactions or commit points, then roll back any uncommitted changes.
: + * </p>
: + */
: + public void rollback();
: +
: + /**
: + * <p>
: + * Delete from the writer's underlying data store based the passed-in writer-specific query. (Optional Operation)
: + * </p>
: + * @param q
: + */
: + public void deleteByQuery(String q);
: +
: + /**
: + * <p>
: + * Delete everything from the writer's underlying data store
: + * </p>
: + */
: + public void doDeleteAll();
: +
: + /**
: + * <p>
: + * Delete from the writer's underlying data store based on the passed-in Primary Key
: + * </p>
: + * @param key
: + */
: + public void deleteDoc(Object key);
: +
: +
: +
: + /**
: + * <p>
: + * Add a document to this writer's underlying data store.
: + * </p>
: + * @param doc
: + * @return
: + */
: + public boolean upload(SolrInputDocument doc);
: +
: +
: +
: + /**
: + * <p>
: + * Provide context information for this writer. init() should be called before using the writer.
: + * </p>
: + * @param context
: + */
: + public void init(Context context) ;
: +
: +
: + /**
: + * <p>
: + * Specify the keys to be modified by a delta update (required by writers that can store duplicate keys)
: + * </p>
: + * @param deltaKeys
: + */
: + public void setDeltaKeys(Set<Map<String, Object>> deltaKeys) ;
: +
: +}
:
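To make the contract in the DIHWriter javadoc concrete, here is a minimal in-memory sketch of a writer with commit points. It is not the DIH API: the class name, the use of a plain Map as the document type, and the keyField constructor argument are all assumptions made so the example compiles on its own, and commit() drops the optimize flag. It only illustrates the documented behavior: uploads stay pending until commit, rollback discards uncommitted changes, and calls after close throw.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the writer contract; not the DIH API. */
public class InMemoryWriterSketch {
  // Committed state, keyed by primary key.
  private final Map<Object, Map<String, Object>> committed =
      new LinkedHashMap<Object, Map<String, Object>>();
  // Working copy that receives uploads and deletes until commit() or rollback().
  private Map<Object, Map<String, Object>> pending =
      new LinkedHashMap<Object, Map<String, Object>>();
  private final String keyField; // assumed name of the primary-key field
  private boolean closed = false;

  public InMemoryWriterSketch(String keyField) {
    this.keyField = keyField;
  }

  /** Adds a document to the pending state; returns false if it has no key. */
  public boolean upload(Map<String, Object> doc) {
    ensureOpen();
    Object key = doc.get(keyField);
    if (key == null) {
      return false;
    }
    pending.put(key, doc);
    return true;
  }

  public void deleteDoc(Object key) {
    ensureOpen();
    pending.remove(key);
  }

  public void doDeleteAll() {
    ensureOpen();
    pending.clear();
  }

  /** Makes the pending state the committed state. */
  public void commit() {
    ensureOpen();
    committed.clear();
    committed.putAll(pending);
  }

  /** Discards uncommitted changes by resetting pending to the committed state. */
  public void rollback() {
    ensureOpen();
    pending = new LinkedHashMap<Object, Map<String, Object>>(committed);
  }

  public void close() {
    closed = true;
  }

  public int committedSize() {
    return committed.size();
  }

  private void ensureOpen() {
    if (closed) {
      throw new IllegalStateException("writer is closed");
    }
  }
}
```

A real implementation would take a SolrInputDocument and a Context, as the interface in the diff does.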
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DIHWriterBase.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,27 @@
: +package org.apache.solr.handler.dataimport;
: +
: +import java.util.HashSet;
: +import java.util.Map;
: +import java.util.Set;
: +
: +public abstract class DIHWriterBase implements DIHWriter {
: + protected String keyFieldName;
: + protected Set<Object> deltaKeys = null;
: +
: + public void setDeltaKeys(Set<Map<String,Object>> passedInDeltaKeys) {
: + deltaKeys = new HashSet<Object>();
: + for (Map<String,Object> aMap : passedInDeltaKeys) {
: + if (aMap.size() > 0) {
: + Object key = null;
: + if (keyFieldName != null) {
: + key = aMap.get(keyFieldName);
: + } else {
: + key = aMap.entrySet().iterator().next();
: + }
: + if (key != null) {
: + deltaKeys.add(key);
: + }
: + }
: + }
: + }
: +}
:
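One thing worth checking in setDeltaKeys is the fallback branch. The standalone sketch below mirrors the loop from the diff (the class name and the static helper are mine, not part of the commit). When keyFieldName is null, entrySet().iterator().next() returns a Map.Entry, so the set ends up holding entry objects rather than key values. Whether that is intended is not clear from the commit.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical standalone copy of the key-extraction loop, for illustration only. */
public class DeltaKeysSketch {

  /** Mirrors the loop in setDeltaKeys; keyFieldName may be null. */
  static Set<Object> extractKeys(Set<Map<String, Object>> rows, String keyFieldName) {
    Set<Object> keys = new HashSet<Object>();
    for (Map<String, Object> row : rows) {
      if (row.isEmpty()) {
        continue;
      }
      Object key;
      if (keyFieldName != null) {
        key = row.get(keyFieldName);
      } else {
        // Same call as in the diff: this yields a Map.Entry, not the entry's value.
        key = row.entrySet().iterator().next();
      }
      if (key != null) {
        keys.add(key);
      }
    }
    return keys;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<String, Object>();
    row.put("id", 42);
    Set<Map<String, Object>> rows = new HashSet<Map<String, Object>>();
    rows.add(row);

    // With a key field name, the value of that field is collected.
    System.out.println(extractKeys(rows, "id"));

    // Without one, the collected object is a Map.Entry.
    Object fallback = extractKeys(rows, null).iterator().next();
    System.out.println(fallback instanceof Map.Entry);
  }
}
```

If the value was what was wanted, the fallback would need a getValue() on the entry.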
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataConfig.java Thu Mar 22 14:11:16 2012
: @@ -110,6 +110,8 @@ public class DataConfig {
: public DataSource dataSrc;
:
: public Map<String, List<Field>> colNameVsField = new HashMap<String, List<Field>>();
: +
: + public boolean initalized = false;
:
: public Entity() {
: }
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java Thu Mar 22 14:11:16 2012
: @@ -22,7 +22,6 @@ import org.apache.solr.common.SolrInputD
: import org.apache.solr.common.params.CommonParams;
: import org.apache.solr.common.params.ModifiableSolrParams;
: import org.apache.solr.common.params.SolrParams;
: -import org.apache.solr.common.params.UpdateParams;
: import org.apache.solr.common.util.ContentStreamBase;
: import org.apache.solr.common.util.NamedList;
: import org.apache.solr.common.util.ContentStream;
: @@ -115,7 +114,7 @@ public class DataImportHandler extends R
: final InputSource is = new InputSource(core.getResourceLoader().openConfig(configLoc));
: is.setSystemId(SystemIdResolver.createSystemIdFromResourceName(configLoc));
: importer = new DataImporter(is, core,
: - dataSources, coreScopeSession);
: + dataSources, coreScopeSession, myName);
: }
: }
: } catch (Throwable e) {
: @@ -167,7 +166,7 @@ public class DataImportHandler extends R
: try {
: processConfiguration((NamedList) initArgs.get("defaults"));
: importer = new DataImporter(new InputSource(new StringReader(requestParams.dataConfig)), req.getCore()
: - , dataSources, coreScopeSession);
: + , dataSources, coreScopeSession, myName);
: } catch (RuntimeException e) {
: rsp.add("exception", DebugLogger.getStacktraceString(e));
: importer = null;
: @@ -199,16 +198,18 @@ public class DataImportHandler extends R
: UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
: SolrResourceLoader loader = req.getCore().getResourceLoader();
: SolrWriter sw = getSolrWriter(processor, loader, requestParams);
: -
: +
: if (requestParams.debug) {
: if (debugEnabled) {
: // Synchronous request for the debug mode
: importer.runCmd(requestParams, sw);
: rsp.add("mode", "debug");
: rsp.add("documents", debugDocuments);
: - if (sw.debugLogger != null)
: - rsp.add("verbose-output", sw.debugLogger.output);
: + if (requestParams.debugVerboseOutput != null) {
: + rsp.add("verbose-output", requestParams.debugVerboseOutput);
: + }
: debugDocuments.clear();
: + requestParams.debugVerboseOutput = null;
: } else {
: message = DataImporter.MSG.DEBUG_NOT_ENABLED;
: }
: @@ -217,7 +218,7 @@ public class DataImportHandler extends R
: if(requestParams.contentStream == null && !requestParams.syncMode){
: importer.runAsync(requestParams, sw);
: } else {
: - importer.runCmd(requestParams, sw);
: + importer.runCmd(requestParams, sw);
: }
: }
: } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
: @@ -282,9 +283,8 @@ public class DataImportHandler extends R
: private SolrWriter getSolrWriter(final UpdateRequestProcessor processor,
: final SolrResourceLoader loader, final DataImporter.RequestParams requestParams) {
:
: - return new SolrWriter(processor, loader.getConfigDir(), myName) {
: + return new SolrWriter(processor) {
:
: - @Override
: public boolean upload(SolrInputDocument document) {
: try {
: if (requestParams.debug) {
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImporter.java Thu Mar 22 14:11:16 2012
: @@ -18,11 +18,13 @@
: package org.apache.solr.handler.dataimport;
:
: import org.apache.solr.common.SolrException;
: +import org.apache.solr.common.SolrInputDocument;
: import org.apache.solr.core.SolrConfig;
: import org.apache.solr.core.SolrCore;
: import org.apache.solr.schema.IndexSchema;
: import org.apache.solr.schema.SchemaField;
: import org.apache.solr.common.util.ContentStream;
: +import org.apache.solr.common.util.NamedList;
: import org.apache.solr.common.util.StrUtils;
: import org.apache.solr.common.util.SystemIdResolver;
: import org.apache.solr.common.util.XMLErrorLogger;
: @@ -39,7 +41,6 @@ import org.apache.commons.io.IOUtils;
:
: import javax.xml.parsers.DocumentBuilder;
: import javax.xml.parsers.DocumentBuilderFactory;
: -import java.io.File;
: import java.io.StringReader;
: import java.text.SimpleDateFormat;
: import java.util.*;
: @@ -81,26 +82,35 @@ public class DataImporter {
: public DocBuilder.Statistics cumulativeStatistics = new DocBuilder.Statistics();
:
: private SolrCore core;
: +
: + private DIHPropertiesWriter propWriter;
:
: private ReentrantLock importLock = new ReentrantLock();
:
: private final Map<String , Object> coreScopeSession;
:
: private boolean isDeltaImportSupported = false;
: + private final String handlerName;
:
: /**
: * Only for testing purposes
: */
: DataImporter() {
: coreScopeSession = new ConcurrentHashMap<String, Object>();
: + this.propWriter = new SimplePropertiesWriter();
: + propWriter.init(this);
: + this.handlerName = "dataimport" ;
: }
:
: - DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session) {
: + DataImporter(InputSource dataConfig, SolrCore core, Map<String, Properties> ds, Map<String, Object> session, String handlerName) {
: + this.handlerName = handlerName;
: if (dataConfig == null)
: throw new DataImportHandlerException(SEVERE,
: "Configuration not found");
: this.core = core;
: this.schema = core.getSchema();
: + this.propWriter = new SimplePropertiesWriter();
: + propWriter.init(this);
: dataSourceProps = ds;
: if (session == null)
: session = new HashMap<String, Object>();
: @@ -121,7 +131,11 @@ public class DataImporter {
: }
: }
:
: - private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
: + public String getHandlerName() {
: + return handlerName;
: + }
: +
: + private void verifyWithSchema(Map<String, DataConfig.Field> fields) {
: Map<String, SchemaField> schemaFields = schema.getFields();
: for (Map.Entry<String, SchemaField> entry : schemaFields.entrySet()) {
: SchemaField sf = entry.getValue();
: @@ -354,7 +368,7 @@ public class DataImporter {
: setIndexStartTime(new Date());
:
: try {
: - docBuilder = new DocBuilder(this, writer, requestParams);
: + docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
: checkWritablePersistFile(writer);
: docBuilder.execute();
: if (!requestParams.debug)
: @@ -371,11 +385,11 @@ public class DataImporter {
: }
:
: private void checkWritablePersistFile(SolrWriter writer) {
: - File persistFile = writer.getPersistFile();
: - boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
: - if (isDeltaImportSupported && !isWritable) {
: - throw new DataImportHandlerException(SEVERE, persistFile.getAbsolutePath() +
: - " is not writable. Delta imports are supported by data config but will not work.");
: +// File persistFile = propWriter.getPersistFile();
: +// boolean isWritable = persistFile.exists() ? persistFile.canWrite() : persistFile.getParentFile().canWrite();
: + if (isDeltaImportSupported && !propWriter.isWritable()) {
: + throw new DataImportHandlerException(SEVERE,
: + "Properties is not writable. Delta imports are supported by data config but will not work.");
: }
: }
:
: @@ -385,7 +399,7 @@ public class DataImporter {
:
: try {
: setIndexStartTime(new Date());
: - docBuilder = new DocBuilder(this, writer, requestParams);
: + docBuilder = new DocBuilder(this, writer, propWriter, requestParams);
: checkWritablePersistFile(writer);
: docBuilder.execute();
: if (!requestParams.debug)
: @@ -504,7 +518,7 @@ public class DataImporter {
: public String command = null;
:
: public boolean debug = false;
: -
: +
: public boolean verbose = false;
:
: public boolean syncMode = false;
: @@ -526,6 +540,10 @@ public class DataImporter {
: public String dataConfig;
:
: public ContentStream contentStream;
: +
: + public List<SolrInputDocument> debugDocuments = new ArrayList<SolrInputDocument>(0);
: +
: + public NamedList debugVerboseOutput = null;
:
: public RequestParams() {
: }
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DebugLogger.java Thu Mar 22 14:11:16 2012
: @@ -46,7 +46,7 @@ class DebugLogger {
: private Stack<DebugInfo> debugStack;
:
: NamedList output;
: - private final SolrWriter writer;
: +// private final SolrWriter writer1;
:
: private static final String LINE = "---------------------------------------------";
:
: @@ -55,8 +55,8 @@ class DebugLogger {
:
: boolean enabled = true;
:
: - public DebugLogger(SolrWriter solrWriter) {
: - writer = solrWriter;
: + public DebugLogger() {
: +// writer = solrWriter;
: output = new NamedList();
: debugStack = new Stack<DebugInfo>() {
:
: @@ -68,7 +68,7 @@ class DebugLogger {
: return super.pop();
: }
: };
: - debugStack.push(new DebugInfo(null, -1, null));
: + debugStack.push(new DebugInfo(null, DIHLogLevels.NONE, null));
: output = debugStack.peek().lst;
: }
:
: @@ -76,47 +76,47 @@ class DebugLogger {
: return debugStack.isEmpty() ? null : debugStack.peek();
: }
:
: - public void log(int event, String name, Object row) {
: - if (event == SolrWriter.DISABLE_LOGGING) {
: + public void log(DIHLogLevels event, String name, Object row) {
: + if (event == DIHLogLevels.DISABLE_LOGGING) {
: enabled = false;
: return;
: - } else if (event == SolrWriter.ENABLE_LOGGING) {
: + } else if (event == DIHLogLevels.ENABLE_LOGGING) {
: enabled = true;
: return;
: }
:
: - if (!enabled && event != SolrWriter.START_ENTITY
: - && event != SolrWriter.END_ENTITY) {
: + if (!enabled && event != DIHLogLevels.START_ENTITY
: + && event != DIHLogLevels.END_ENTITY) {
: return;
: }
:
: - if (event == SolrWriter.START_DOC) {
: - debugStack.push(new DebugInfo(null, SolrWriter.START_DOC, peekStack()));
: - } else if (SolrWriter.START_ENTITY == event) {
: + if (event == DIHLogLevels.START_DOC) {
: + debugStack.push(new DebugInfo(null, DIHLogLevels.START_DOC, peekStack()));
: + } else if (DIHLogLevels.START_ENTITY == event) {
: debugStack
: - .push(new DebugInfo(name, SolrWriter.START_ENTITY, peekStack()));
: - } else if (SolrWriter.ENTITY_OUT == event
: - || SolrWriter.PRE_TRANSFORMER_ROW == event) {
: - if (debugStack.peek().type == SolrWriter.START_ENTITY
: - || debugStack.peek().type == SolrWriter.START_DOC) {
: + .push(new DebugInfo(name, DIHLogLevels.START_ENTITY, peekStack()));
: + } else if (DIHLogLevels.ENTITY_OUT == event
: + || DIHLogLevels.PRE_TRANSFORMER_ROW == event) {
: + if (debugStack.peek().type == DIHLogLevels.START_ENTITY
: + || debugStack.peek().type == DIHLogLevels.START_DOC) {
: debugStack.peek().lst.add(null, fmt.format(new Object[]{++debugStack
: .peek().rowCount}));
: addToNamedList(debugStack.peek().lst, row);
: debugStack.peek().lst.add(null, LINE);
: }
: - } else if (event == SolrWriter.ROW_END) {
: + } else if (event == DIHLogLevels.ROW_END) {
: popAllTransformers();
: - } else if (SolrWriter.END_ENTITY == event) {
: - while (debugStack.pop().type != SolrWriter.START_ENTITY)
: + } else if (DIHLogLevels.END_ENTITY == event) {
: + while (debugStack.pop().type != DIHLogLevels.START_ENTITY)
: ;
: - } else if (SolrWriter.END_DOC == event) {
: - while (debugStack.pop().type != SolrWriter.START_DOC)
: + } else if (DIHLogLevels.END_DOC == event) {
: + while (debugStack.pop().type != DIHLogLevels.START_DOC)
: ;
: - } else if (event == SolrWriter.TRANSFORMER_EXCEPTION) {
: + } else if (event == DIHLogLevels.TRANSFORMER_EXCEPTION) {
: debugStack.push(new DebugInfo(name, event, peekStack()));
: debugStack.peek().lst.add("EXCEPTION",
: getStacktraceString((Exception) row));
: - } else if (SolrWriter.TRANSFORMED_ROW == event) {
: + } else if (DIHLogLevels.TRANSFORMED_ROW == event) {
: debugStack.push(new DebugInfo(name, event, peekStack()));
: debugStack.peek().lst.add(null, LINE);
: addToNamedList(debugStack.peek().lst, row);
: @@ -125,10 +125,10 @@ class DebugLogger {
: DataImportHandlerException dataImportHandlerException = (DataImportHandlerException) row;
: dataImportHandlerException.debugged = true;
: }
: - } else if (SolrWriter.ENTITY_META == event) {
: + } else if (DIHLogLevels.ENTITY_META == event) {
: popAllTransformers();
: debugStack.peek().lst.add(name, row);
: - } else if (SolrWriter.ENTITY_EXCEPTION == event) {
: + } else if (DIHLogLevels.ENTITY_EXCEPTION == event) {
: if (row instanceof DataImportHandlerException) {
: DataImportHandlerException dihe = (DataImportHandlerException) row;
: if (dihe.debugged)
: @@ -144,8 +144,8 @@ class DebugLogger {
:
: private void popAllTransformers() {
: while (true) {
: - int type = debugStack.peek().type;
: - if (type == SolrWriter.START_DOC || type == SolrWriter.START_ENTITY)
: + DIHLogLevels type = debugStack.peek().type;
: + if (type == DIHLogLevels.START_DOC || type == DIHLogLevels.START_ENTITY)
: break;
: debugStack.pop();
: }
: @@ -182,23 +182,23 @@ class DebugLogger {
:
: @Override
: public Object getData(String query) {
: - writer.log(SolrWriter.ENTITY_META, "query", query);
: + log(DIHLogLevels.ENTITY_META, "query", query);
: long start = System.currentTimeMillis();
: try {
: return ds.getData(query);
: } catch (DataImportHandlerException de) {
: - writer.log(SolrWriter.ENTITY_EXCEPTION,
: + log(DIHLogLevels.ENTITY_EXCEPTION,
: null, de);
: throw de;
: } catch (Exception e) {
: - writer.log(SolrWriter.ENTITY_EXCEPTION,
: + log(DIHLogLevels.ENTITY_EXCEPTION,
: null, e);
: DataImportHandlerException de = new DataImportHandlerException(
: DataImportHandlerException.SEVERE, "", e);
: de.debugged = true;
: throw de;
: } finally {
: - writer.log(SolrWriter.ENTITY_META, "time-taken", DocBuilder
: + log(DIHLogLevels.ENTITY_META, "time-taken", DocBuilder
: .getTimeElapsedSince(start));
: }
: }
: @@ -209,18 +209,18 @@ class DebugLogger {
: return new Transformer() {
: @Override
: public Object transformRow(Map<String, Object> row, Context context) {
: - writer.log(SolrWriter.PRE_TRANSFORMER_ROW, null, row);
: + log(DIHLogLevels.PRE_TRANSFORMER_ROW, null, row);
: String tName = getTransformerName(t);
: Object result = null;
: try {
: result = t.transformRow(row, context);
: - writer.log(SolrWriter.TRANSFORMED_ROW, tName, result);
: + log(DIHLogLevels.TRANSFORMED_ROW, tName, result);
: } catch (DataImportHandlerException de) {
: - writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, de);
: + log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, de);
: de.debugged = true;
: throw de;
: } catch (Exception e) {
: - writer.log(SolrWriter.TRANSFORMER_EXCEPTION, tName, e);
: + log(DIHLogLevels.TRANSFORMER_EXCEPTION, tName, e);
: DataImportHandlerException de = new DataImportHandlerException(DataImportHandlerException.SEVERE, "", e);
: de.debugged = true;
: throw de;
: @@ -259,23 +259,23 @@ class DebugLogger {
:
: NamedList lst;
:
: - int type;
: + DIHLogLevels type;
:
: DebugInfo parent;
:
: - public DebugInfo(String name, int type, DebugInfo parent) {
: + public DebugInfo(String name, DIHLogLevels type, DebugInfo parent) {
: this.name = name;
: this.type = type;
: this.parent = parent;
: lst = new NamedList();
: if (parent != null) {
: String displayName = null;
: - if (type == SolrWriter.START_ENTITY) {
: + if (type == DIHLogLevels.START_ENTITY) {
: displayName = "entity:" + name;
: - } else if (type == SolrWriter.TRANSFORMED_ROW
: - || type == SolrWriter.TRANSFORMER_EXCEPTION) {
: + } else if (type == DIHLogLevels.TRANSFORMED_ROW
: + || type == DIHLogLevels.TRANSFORMER_EXCEPTION) {
: displayName = "transformer:" + name;
: - } else if (type == SolrWriter.START_DOC) {
: + } else if (type == DIHLogLevels.START_DOC) {
: this.name = displayName = "document#" + SolrWriter.getDocCount();
: }
: parent.lst.add(displayName, lst);
:
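The DebugLogger change swaps the int event constants on SolrWriter for a DIHLogLevels type, so a stray literal such as the old -1 sentinel no longer matches the DebugInfo constructor's signature. The sketch below shows the same pattern in isolation. The enum here is a hypothetical subset named after the constants in the diff, and it uses a Deque with an emptiness guard, where the original uses a Stack and loops until it finds a START_DOC or START_ENTITY frame.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration of enum-typed debug events; not the DIH classes. */
public class LogLevelSketch {

  /** Assumed subset of event types, named after constants used in the diff. */
  enum Level { NONE, START_DOC, START_ENTITY, TRANSFORMED_ROW, TRANSFORMER_EXCEPTION }

  /**
   * Pops frames until a START_DOC or START_ENTITY frame is on top,
   * following the shape of popAllTransformers. Returns the number popped.
   */
  static int popAllTransformers(Deque<Level> stack) {
    int popped = 0;
    while (!stack.isEmpty()) {
      Level type = stack.peek();
      // Enum constants are singletons, so == is a safe comparison.
      if (type == Level.START_DOC || type == Level.START_ENTITY) {
        break;
      }
      stack.pop();
      popped++;
    }
    return popped;
  }

  public static void main(String[] args) {
    Deque<Level> stack = new ArrayDeque<Level>();
    stack.push(Level.START_DOC);
    stack.push(Level.START_ENTITY);
    stack.push(Level.TRANSFORMED_ROW);
    stack.push(Level.TRANSFORMER_EXCEPTION);

    int popped = popAllTransformers(stack);
    System.out.println(popped + " transformer frames popped, top is " + stack.peek());
  }
}
```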
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DocBuilder.java Thu Mar 22 14:11:16 2012
: @@ -57,27 +57,62 @@ public class DocBuilder {
:
: public Statistics importStatistics = new Statistics();
:
: - SolrWriter writer;
: + DIHWriter writer;
:
: DataImporter.RequestParams requestParameters;
:
: boolean verboseDebug = false;
:
: - Map<String, Object> session = new ConcurrentHashMap<String, Object>();
: + Map<String, Object> session = new ConcurrentHashMap<String, Object>();
:
: static final ThreadLocal<DocBuilder> INSTANCE = new ThreadLocal<DocBuilder>();
: Map<String, Object> functionsNamespace;
: private Properties persistedProperties;
: -
: - public DocBuilder(DataImporter dataImporter, SolrWriter writer, DataImporter.RequestParams reqParams) {
: +
: + private DIHPropertiesWriter propWriter;
: + private static final String PARAM_WRITER_IMPL = "writerImpl";
: + private static final String DEFAULT_WRITER_NAME = "SolrWriter";
: + private DebugLogger debugLogger;
: + private DataImporter.RequestParams reqParams;
: +
: + @SuppressWarnings("unchecked")
: + public DocBuilder(DataImporter dataImporter, SolrWriter solrWriter, DIHPropertiesWriter propWriter, DataImporter.RequestParams reqParams) {
: INSTANCE.set(this);
: this.dataImporter = dataImporter;
: - this.writer = writer;
: + this.reqParams = reqParams;
: + this.propWriter = propWriter;
: DataImporter.QUERY_COUNT.set(importStatistics.queryCount);
: requestParameters = reqParams;
: verboseDebug = requestParameters.debug && requestParameters.verbose;
: functionsNamespace = EvaluatorBag.getFunctionsNamespace(this.dataImporter.getConfig().functions, this);
: - persistedProperties = writer.readIndexerProperties();
: + persistedProperties = propWriter.readIndexerProperties();
: +
: + String writerClassStr = null;
: + if(reqParams!=null && reqParams.requestParams != null) {
: + writerClassStr = (String) reqParams.requestParams.get(PARAM_WRITER_IMPL);
: + }
: + if(writerClassStr != null && !writerClassStr.equals(DEFAULT_WRITER_NAME) && !writerClassStr.equals(DocBuilder.class.getPackage().getName() + "." + DEFAULT_WRITER_NAME)) {
: + try {
: + Class<DIHWriter> writerClass = loadClass(writerClassStr, dataImporter.getCore());
: + this.writer = writerClass.newInstance();
: + } catch (Exception e) {
: + throw new DataImportHandlerException(DataImportHandlerException.SEVERE, "Unable to load Writer implementation:" + writerClassStr, e);
: + }
: + } else {
: + writer = solrWriter;
: + }
: + ContextImpl ctx = new ContextImpl(null, null, null, null, reqParams.requestParams, null, this);
: + writer.init(ctx);
: + }
: +
: +
: +
: +
: + DebugLogger getDebugLogger(){
: + if (debugLogger == null) {
: + debugLogger = new DebugLogger();
: + }
: + return debugLogger;
: }
:
: public VariableResolverImpl getVariableResolver() {
: @@ -137,94 +172,103 @@ public class DocBuilder {
:
: @SuppressWarnings("unchecked")
: public void execute() {
: - dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
: - document = dataImporter.getConfig().document;
: - final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
: - statusMessages.put(TIME_ELAPSED, new Object() {
: - @Override
: - public String toString() {
: - return getTimeElapsedSince(startTime.get());
: - }
: - });
: -
: - statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
: - importStatistics.queryCount);
: - statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
: - importStatistics.rowsCount);
: - statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
: - importStatistics.docCount);
: - statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
: - importStatistics.skipDocCount);
: -
: - List<String> entities = requestParameters.entities;
: -
: - // Trigger onImportStart
: - if (document.onImportStart != null) {
: - invokeEventListener(document.onImportStart);
: - }
: - AtomicBoolean fullCleanDone = new AtomicBoolean(false);
: - //we must not do a delete of *:* multiple times if there are multiple root entities to be run
: - Properties lastIndexTimeProps = new Properties();
: - lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
: - DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
: - for (DataConfig.Entity e : document.entities) {
: - if (entities != null && !entities.contains(e.name))
: - continue;
: - lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
: - DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
: - root = e;
: - String delQuery = e.allAttributes.get("preImportDeleteQuery");
: - if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
: - cleanByQuery(delQuery, fullCleanDone);
: - doDelta();
: - delQuery = e.allAttributes.get("postImportDeleteQuery");
: - if (delQuery != null) {
: - fullCleanDone.set(false);
: - cleanByQuery(delQuery, fullCleanDone);
: - }
: - } else {
: - cleanByQuery(delQuery, fullCleanDone);
: - doFullDump();
: - delQuery = e.allAttributes.get("postImportDeleteQuery");
: - if (delQuery != null) {
: - fullCleanDone.set(false);
: - cleanByQuery(delQuery, fullCleanDone);
: - }
: - }
: - statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
: - }
: -
: - if (stop.get()) {
: - // Dont commit if aborted using command=abort
: - statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
: - rollback();
: - } else {
: - // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
: - if (!requestParameters.clean) {
: - if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
: - finish(lastIndexTimeProps);
: - }
: - } else {
: - // Finished operation normally, commit now
: - finish(lastIndexTimeProps);
: - }
: -
: - if (writer != null) {
: - writer.finish();
: - }
: -
: - if (document.onImportEnd != null) {
: - invokeEventListener(document.onImportEnd);
: - }
: - }
: -
: - statusMessages.remove(TIME_ELAPSED);
: - statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
: - if(importStatistics.failedDocCount.get() > 0)
: - statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
: -
: - statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
: - LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
: + try {
: + dataImporter.store(DataImporter.STATUS_MSGS, statusMessages);
: + document = dataImporter.getConfig().document;
: + final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
: + statusMessages.put(TIME_ELAPSED, new Object() {
: + @Override
: + public String toString() {
: + return getTimeElapsedSince(startTime.get());
: + }
: + });
: +
: + statusMessages.put(DataImporter.MSG.TOTAL_QUERIES_EXECUTED,
: + importStatistics.queryCount);
: + statusMessages.put(DataImporter.MSG.TOTAL_ROWS_EXECUTED,
: + importStatistics.rowsCount);
: + statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED,
: + importStatistics.docCount);
: + statusMessages.put(DataImporter.MSG.TOTAL_DOCS_SKIPPED,
: + importStatistics.skipDocCount);
: +
: + List<String> entities = requestParameters.entities;
: +
: + // Trigger onImportStart
: + if (document.onImportStart != null) {
: + invokeEventListener(document.onImportStart);
: + }
: + AtomicBoolean fullCleanDone = new AtomicBoolean(false);
: + //we must not do a delete of *:* multiple times if there are multiple root entities to be run
: + Properties lastIndexTimeProps = new Properties();
: + lastIndexTimeProps.setProperty(LAST_INDEX_KEY,
: + DataImporter.DATE_TIME_FORMAT.get().format(dataImporter.getIndexStartTime()));
: + for (DataConfig.Entity e : document.entities) {
: + if (entities != null && !entities.contains(e.name))
: + continue;
: + lastIndexTimeProps.setProperty(e.name + "." + LAST_INDEX_KEY,
: + DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
: + root = e;
: + String delQuery = e.allAttributes.get("preImportDeleteQuery");
: + if (dataImporter.getStatus() == DataImporter.Status.RUNNING_DELTA_DUMP) {
: + cleanByQuery(delQuery, fullCleanDone);
: + doDelta();
: + delQuery = e.allAttributes.get("postImportDeleteQuery");
: + if (delQuery != null) {
: + fullCleanDone.set(false);
: + cleanByQuery(delQuery, fullCleanDone);
: + }
: + } else {
: + cleanByQuery(delQuery, fullCleanDone);
: + doFullDump();
: + delQuery = e.allAttributes.get("postImportDeleteQuery");
: + if (delQuery != null) {
: + fullCleanDone.set(false);
: + cleanByQuery(delQuery, fullCleanDone);
: + }
: + }
: + statusMessages.remove(DataImporter.MSG.TOTAL_DOC_PROCESSED);
: + }
: +
: + if (stop.get()) {
: + // Dont commit if aborted using command=abort
: + statusMessages.put("Aborted", DataImporter.DATE_TIME_FORMAT.get().format(new Date()));
: + rollback();
: + } else {
: + // Do not commit unnecessarily if this is a delta-import and no documents were created or deleted
: + if (!requestParameters.clean) {
: + if (importStatistics.docCount.get() > 0 || importStatistics.deletedDocCount.get() > 0) {
: + finish(lastIndexTimeProps);
: + }
: + } else {
: + // Finished operation normally, commit now
: + finish(lastIndexTimeProps);
: + }
: +
: + if (document.onImportEnd != null) {
: + invokeEventListener(document.onImportEnd);
: + }
: + }
: +
: + statusMessages.remove(TIME_ELAPSED);
: + statusMessages.put(DataImporter.MSG.TOTAL_DOC_PROCESSED, ""+ importStatistics.docCount.get());
: + if(importStatistics.failedDocCount.get() > 0)
: + statusMessages.put(DataImporter.MSG.TOTAL_FAILED_DOCS, ""+ importStatistics.failedDocCount.get());
: +
: + statusMessages.put("Time taken ", getTimeElapsedSince(startTime.get()));
: + LOG.info("Time taken = " + getTimeElapsedSince(startTime.get()));
: + } catch(Exception e)
: + {
: + throw new RuntimeException(e);
: + } finally
: + {
: + if (writer != null) {
: + writer.close();
: + }
: + if(requestParameters.debug) {
: + requestParameters.debugVerboseOutput = getDebugLogger().output;
: + }
: + }
: }
:
: @SuppressWarnings("unchecked")
: @@ -240,7 +284,7 @@ public class DocBuilder {
: addStatusMessage("Optimized");
: }
: try {
: - writer.persist(lastIndexTimeProps);
: + propWriter.persist(lastIndexTimeProps);
: } catch (Exception e) {
: LOG.error("Could not write property file", e);
: statusMessages.put("error", "Could not write property file. Delta imports will not work. " +
: @@ -254,20 +298,32 @@ public class DocBuilder {
: addStatusMessage("Rolledback");
: }
:
: - @SuppressWarnings("unchecked")
: private void doFullDump() {
: addStatusMessage("Full Dump Started");
: - if(dataImporter.getConfig().isMultiThreaded && !verboseDebug){
: + if (dataImporter.getConfig().isMultiThreaded && !verboseDebug) {
: + EntityRunner entityRunner = null;
: try {
: LOG.info("running multithreaded full-import");
: - new EntityRunner(root,null).run(null,Context.FULL_DUMP,null);
: + entityRunner = new EntityRunner(root, null);
: + entityRunner.run(null, Context.FULL_DUMP, null);
: } catch (Exception e) {
: throw new RuntimeException("Error in multi-threaded import", e);
: + } finally {
: + if (entityRunner != null) {
: + List<EntityRunner> closure = new ArrayList<EntityRunner>();
: + closure.add(entityRunner);
: + for (int i = 0; i < closure.size(); i++) {
: + assert(!closure.get(i).entityProcessorWrapper.isEmpty());
: + closure.addAll(closure.get(i).entityProcessorWrapper.iterator().next().children.values());
: + }
: + for (EntityRunner er : closure) {
: + er.entityProcessor.destroy();
: + }
: + }
: }
: } else {
: buildDocument(getVariableResolver(), null, null, root, true, null);
: - }
: -
: + }
: }
:
: @SuppressWarnings("unchecked")
: @@ -293,6 +349,7 @@ public class DocBuilder {
: // Make sure that documents are not re-created
: }
: deletedKeys = null;
: + writer.setDeltaKeys(allPks);
:
: statusMessages.put("Total Changed Documents", allPks.size());
: VariableResolverImpl vri = getVariableResolver();
: @@ -385,7 +442,7 @@ public class DocBuilder {
: for (int i = 0; i < threads; i++) {
: entityProcessorWrapper.add(new ThreadedEntityProcessorWrapper(entityProcessor, DocBuilder.this, this, getVariableResolver()));
: }
: - context = new ThreadedContext(this, DocBuilder.this);
: + context = new ThreadedContext(this, DocBuilder.this, getVariableResolver());
: }
:
:
: @@ -426,7 +483,6 @@ public class DocBuilder {
: }
: }
: } finally {
: - entityProcessor.destroy();
: }
:
:
: @@ -476,6 +532,9 @@ public class DocBuilder {
: LOG.debug("adding a doc "+docWrapper);
: }
: boolean result = writer.upload(docWrapper);
: + if(reqParams.debug) {
: + reqParams.debugDocuments.add(docWrapper);
: + }
: docWrapper = null;
: if (result){
: importStatistics.docCount.incrementAndGet();
: @@ -511,7 +570,6 @@ public class DocBuilder {
: }
: }
: } finally {
: - epw.destroy();
: currentEntityProcWrapper.remove();
: Context.CURRENT_CONTEXT.remove();
: }
: @@ -526,7 +584,7 @@ public class DocBuilder {
: }
: }
: }
: - }
: + }
: }
:
: /**A reverse linked list .
: @@ -544,10 +602,35 @@ public class DocBuilder {
: }
: }
:
: + private void resetEntity(DataConfig.Entity entity) {
: + entity.initalized = false;
: + if (entity.entities != null) {
: + for (DataConfig.Entity child : entity.entities) {
: + resetEntity(child);
: + }
: + }
: + }
: +
: + private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
: + Map<String,Object> pk, DataConfig.Entity entity, boolean isRoot,
: + ContextImpl parentCtx) {
: + List<EntityProcessorWrapper> entitiesToDestroy = new ArrayList<EntityProcessorWrapper>();
: + try {
: + buildDocument(vr, doc, pk, entity, isRoot, parentCtx, entitiesToDestroy);
: + } catch (Exception e) {
: + throw new RuntimeException(e);
: + } finally {
: + for (EntityProcessorWrapper entityWrapper : entitiesToDestroy) {
: + entityWrapper.destroy();
: + }
: + resetEntity(entity);
: + }
: + }
: +
: @SuppressWarnings("unchecked")
: private void buildDocument(VariableResolverImpl vr, DocWrapper doc,
: Map<String, Object> pk, DataConfig.Entity entity, boolean isRoot,
: - ContextImpl parentCtx) {
: + ContextImpl parentCtx, List<EntityProcessorWrapper> entitiesToDestroy) {
:
: EntityProcessorWrapper entityProcessor = getEntityProcessor(entity);
:
: @@ -556,13 +639,17 @@ public class DocBuilder {
: session, parentCtx, this);
: entityProcessor.init(ctx);
: Context.CURRENT_CONTEXT.set(ctx);
: + if (!entity.initalized) {
: + entitiesToDestroy.add(entityProcessor);
: + entity.initalized = true;
: + }
:
: if (requestParameters.start > 0) {
: - writer.log(SolrWriter.DISABLE_LOGGING, null, null);
: + getDebugLogger().log(DIHLogLevels.DISABLE_LOGGING, null, null);
: }
:
: if (verboseDebug) {
: - writer.log(SolrWriter.START_ENTITY, entity.name, null);
: + getDebugLogger().log(DIHLogLevels.START_ENTITY, entity.name, null);
: }
:
: int seenDocCount = 0;
: @@ -576,11 +663,11 @@ public class DocBuilder {
: seenDocCount++;
:
: if (seenDocCount > requestParameters.start) {
: - writer.log(SolrWriter.ENABLE_LOGGING, null, null);
: + getDebugLogger().log(DIHLogLevels.ENABLE_LOGGING, null, null);
: }
:
: if (verboseDebug && entity.isDocRoot) {
: - writer.log(SolrWriter.START_DOC, entity.name, null);
: + getDebugLogger().log(DIHLogLevels.START_DOC, entity.name, null);
: }
: if (doc == null && entity.isDocRoot) {
: doc = new DocWrapper();
: @@ -609,7 +696,7 @@ public class DocBuilder {
: }
:
: if (verboseDebug) {
: - writer.log(SolrWriter.ENTITY_OUT, entity.name, arow);
: + getDebugLogger().log(DIHLogLevels.ENTITY_OUT, entity.name, arow);
: }
: importStatistics.rowsCount.incrementAndGet();
: if (doc != null) {
: @@ -620,7 +707,7 @@ public class DocBuilder {
: vr.addNamespace(entity.name, arow);
: for (DataConfig.Entity child : entity.entities) {
: buildDocument(vr, doc,
: - child.isDocRoot ? pk : null, child, false, ctx);
: + child.isDocRoot ? pk : null, child, false, ctx, entitiesToDestroy);
: }
: vr.removeNamespace(entity.name);
: }
: @@ -634,6 +721,9 @@ public class DocBuilder {
: return;
: if (!doc.isEmpty()) {
: boolean result = writer.upload(doc);
: + if(reqParams.debug) {
: + reqParams.debugDocuments.add(doc);
: + }
: doc = null;
: if (result){
: importStatistics.docCount.incrementAndGet();
: @@ -645,7 +735,7 @@ public class DocBuilder {
:
: } catch (DataImportHandlerException e) {
: if (verboseDebug) {
: - writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, e);
: + getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, e);
: }
: if(e.getErrCode() == DataImportHandlerException.SKIP_ROW){
: continue;
: @@ -664,23 +754,22 @@ public class DocBuilder {
: throw e;
: } catch (Throwable t) {
: if (verboseDebug) {
: - writer.log(SolrWriter.ENTITY_EXCEPTION, entity.name, t);
: + getDebugLogger().log(DIHLogLevels.ENTITY_EXCEPTION, entity.name, t);
: }
: throw new DataImportHandlerException(DataImportHandlerException.SEVERE, t);
: } finally {
: if (verboseDebug) {
: - writer.log(SolrWriter.ROW_END, entity.name, null);
: + getDebugLogger().log(DIHLogLevels.ROW_END, entity.name, null);
: if (entity.isDocRoot)
: - writer.log(SolrWriter.END_DOC, null, null);
: + getDebugLogger().log(DIHLogLevels.END_DOC, null, null);
: Context.CURRENT_CONTEXT.remove();
: }
: }
: }
: } finally {
: if (verboseDebug) {
: - writer.log(SolrWriter.END_ENTITY, null, null);
: + getDebugLogger().log(DIHLogLevels.END_ENTITY, null, null);
: }
: - entityProcessor.destroy();
: }
: }
:
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorBase.java Thu Mar 22 14:11:16 2012
: @@ -17,6 +17,7 @@
: package org.apache.solr.handler.dataimport;
:
: import org.apache.solr.common.SolrException;
: +
: import static org.apache.solr.handler.dataimport.DataImportHandlerException.*;
: import org.slf4j.Logger;
: import org.slf4j.LoggerFactory;
: @@ -43,21 +44,25 @@ public class EntityProcessorBase extends
:
: protected Iterator<Map<String, Object>> rowIterator;
:
: - protected List<Transformer> transformers;
: -
: - protected String query;
: -
: - protected String onError = ABORT;
: + protected String query;
: +
: + protected String onError = ABORT;
: +
: + protected DIHCacheSupport cacheSupport = null;
:
:
: @Override
: public void init(Context context) {
: - rowIterator = null;
: this.context = context;
: if (isFirstInit) {
: firstInit(context);
: }
: - query = null;
: + if(cacheSupport!=null) {
: + rowIterator = null;
: + query = null;
: + cacheSupport.initNewParent(context);
: + }
: +
: }
:
: /**first time init call. do one-time operations here
: @@ -66,29 +71,20 @@ public class EntityProcessorBase extends
: entityName = context.getEntityAttribute("name");
: String s = context.getEntityAttribute(ON_ERROR);
: if (s != null) onError = s;
: + initCache(context);
: isFirstInit = false;
: }
:
: + protected void initCache(Context context) {
: + String cacheImplName = context
: + .getResolvedEntityAttribute(DIHCacheSupport.CACHE_IMPL);
:
: - protected Map<String, Object> getNext() {
: - try {
: - if (rowIterator == null)
: - return null;
: - if (rowIterator.hasNext())
: - return rowIterator.next();
: - query = null;
: - rowIterator = null;
: - return null;
: - } catch (Exception e) {
: - SolrException.log(log, "getNext() failed for query '" + query + "'", e);
: - query = null;
: - rowIterator = null;
: - wrapAndThrow(DataImportHandlerException.WARN, e);
: - return null;
: + if (cacheImplName != null ) {
: + cacheSupport = new DIHCacheSupport(context, cacheImplName);
: + }
: }
: - }
:
: - @Override
: + @Override
: public Map<String, Object> nextModifiedRowKey() {
: return null;
: }
: @@ -114,165 +110,40 @@ public class EntityProcessorBase extends
: public Map<String, Object> nextRow() {
: return null;// do not do anything
: }
: -
: -
: - @Override
: - public void destroy() {
: - /*no op*/
: - }
: -
: - /**
: - * Only used by cache implementations
: - */
: - protected String cachePk;
: -
: - /**
: - * Only used by cache implementations
: - */
: - protected String cacheVariableName;
: -
: - /**
: - * Only used by cache implementations
: - */
: - protected Map<String, List<Map<String, Object>>> simpleCache;
: -
: - /**
: - * Only used by cache implementations
: - */
: - protected Map<String, Map<Object, List<Map<String, Object>>>> cacheWithWhereClause;
: -
: - protected List<Map<String, Object>> dataSourceRowCache;
: -
: - /**
: - * Only used by cache implementations
: - */
: - protected void cacheInit() {
: - if (simpleCache != null || cacheWithWhereClause != null)
: - return;
: - String where = context.getEntityAttribute("where");
: -
: - String cacheKey = context.getEntityAttribute(CACHE_KEY);
: - String lookupKey = context.getEntityAttribute(CACHE_LOOKUP);
: - if(cacheKey != null && lookupKey == null){
: - throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
: - "'cacheKey' is specified for the entity "+ entityName+" but 'cacheLookup' is missing" );
: -
: - }
: - if (where == null && cacheKey == null) {
: - simpleCache = new HashMap<String, List<Map<String, Object>>>();
: - } else {
: - if (where != null) {
: - String[] splits = where.split("=");
: - cachePk = splits[0];
: - cacheVariableName = splits[1].trim();
: - } else {
: - cachePk = cacheKey;
: - cacheVariableName = lookupKey;
: - }
: - cacheWithWhereClause = new HashMap<String, Map<Object, List<Map<String, Object>>>>();
: - }
: - }
: -
: - /**
: - * If the where clause is present the cache is sql Vs Map of key Vs List of Rows. Only used by cache implementations.
: - *
: - * @param query the query string for which cached data is to be returned
: - *
: - * @return the cached row corresponding to the given query after all variables have been resolved
: - */
: - protected Map<String, Object> getIdCacheData(String query) {
: - Map<Object, List<Map<String, Object>>> rowIdVsRows = cacheWithWhereClause
: - .get(query);
: - List<Map<String, Object>> rows = null;
: - Object key = context.resolve(cacheVariableName);
: - if (key == null) {
: - throw new DataImportHandlerException(DataImportHandlerException.WARN,
: - "The cache lookup value : " + cacheVariableName + " is resolved to be null in the entity :" +
: - context.getEntityAttribute("name"));
: -
: - }
: - if (rowIdVsRows != null) {
: - rows = rowIdVsRows.get(key);
: - if (rows == null)
: +
: + protected Map<String, Object> getNext() {
: + if(cacheSupport==null) {
: + try {
: + if (rowIterator == null)
: + return null;
: + if (rowIterator.hasNext())
: + return rowIterator.next();
: + query = null;
: + rowIterator = null;
: return null;
: - dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
: - return getFromRowCacheTransformed();
: - } else {
: - rows = getAllNonCachedRows();
: - if (rows.isEmpty()) {
: + } catch (Exception e) {
: + SolrException.log(log, "getNext() failed for query '" + query + "'", e);
: + query = null;
: + rowIterator = null;
: + wrapAndThrow(DataImportHandlerException.WARN, e);
: return null;
: - } else {
: - rowIdVsRows = new HashMap<Object, List<Map<String, Object>>>();
: - for (Map<String, Object> row : rows) {
: - Object k = row.get(cachePk);
: - if (k == null) {
: - throw new DataImportHandlerException(DataImportHandlerException.WARN,
: - "No value available for the cache key : " + cachePk + " in the entity : " +
: - context.getEntityAttribute("name"));
: - }
: - if (!k.getClass().equals(key.getClass())) {
: - throw new DataImportHandlerException(DataImportHandlerException.WARN,
: - "The key in the cache type : " + k.getClass().getName() +
: - "is not same as the lookup value type " + key.getClass().getName() + " in the entity " +
: - context.getEntityAttribute("name"));
: - }
: - if (rowIdVsRows.get(k) == null)
: - rowIdVsRows.put(k, new ArrayList<Map<String, Object>>());
: - rowIdVsRows.get(k).add(row);
: - }
: - cacheWithWhereClause.put(query, rowIdVsRows);
: - if (!rowIdVsRows.containsKey(key))
: - return null;
: - dataSourceRowCache = new ArrayList<Map<String, Object>>(rowIdVsRows.get(key));
: - if (dataSourceRowCache.isEmpty()) {
: - dataSourceRowCache = null;
: - return null;
: - }
: - return getFromRowCacheTransformed();
: }
: - }
: + } else {
: + return cacheSupport.getCacheData(context, query, rowIterator);
: + }
: }
:
: - /**
: - * <p> Get all the rows from the the datasource for the given query. Only used by cache implementations. </p> This
: - * <b>must</b> be implemented by sub-classes which intend to provide a cached implementation
: - *
: - * @return the list of all rows fetched from the datasource.
: - */
: - protected List<Map<String, Object>> getAllNonCachedRows() {
: - return Collections.EMPTY_LIST;
: - }
:
: - /**
: - * If where clause is not present the cache is a Map of query vs List of Rows. Only used by cache implementations.
: - *
: - * @param query string for which cached row is to be returned
: - *
: - * @return the cached row corresponding to the given query
: - */
: - protected Map<String, Object> getSimpleCacheData(String query) {
: - List<Map<String, Object>> rows = simpleCache.get(query);
: - if (rows != null) {
: - dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
: - return getFromRowCacheTransformed();
: - } else {
: - rows = getAllNonCachedRows();
: - if (rows.isEmpty()) {
: - return null;
: - } else {
: - dataSourceRowCache = new ArrayList<Map<String, Object>>(rows);
: - simpleCache.put(query, rows);
: - return getFromRowCacheTransformed();
: - }
: - }
: + @Override
: + public void destroy() {
: + query = null;
: + if(cacheSupport!=null){
: + cacheSupport.destroyAll();
: + }
: + cacheSupport = null;
: }
:
: - protected Map<String, Object> getFromRowCacheTransformed() {
: - Map<String, Object> r = dataSourceRowCache.remove(0);
: - if (dataSourceRowCache.isEmpty())
: - dataSourceRowCache = null;
: - return r;
: - }
: +
:
: public static final String TRANSFORMER = "transformer";
:
: @@ -288,8 +159,4 @@ public class EntityProcessorBase extends
:
: public static final String SKIP_DOC = "$skipDoc";
:
: - public static final String CACHE_KEY = "cacheKey";
: -
: - public static final String CACHE_LOOKUP = "cacheLookup";
: -
: }
:
: Modified: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java?rev=1303792&r1=1303791&r2=1303792&view=diff
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java (original)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/EntityProcessorWrapper.java Thu Mar 22 14:11:16 2012
: @@ -84,7 +84,7 @@ public class EntityProcessorWrapper exte
: @Override
: public boolean add(Transformer transformer) {
: if (docBuilder != null && docBuilder.verboseDebug) {
: - transformer = docBuilder.writer.getDebugLogger().wrapTransformer(transformer);
: + transformer = docBuilder.getDebugLogger().wrapTransformer(transformer);
: }
: return super.add(transformer);
: }
:
: Added: lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java
: URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java?rev=1303792&view=auto
: ==============================================================================
: --- lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java (added)
: +++ lucene/dev/branches/branch_3x/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SimplePropertiesWriter.java Thu Mar 22 14:11:16 2012
: @@ -0,0 +1,117 @@
: +package org.apache.solr.handler.dataimport;
: +
: +/**
: + * Licensed to the Apache Software Foundation (ASF) under one or more
: + * contributor license agreements. See the NOTICE file distributed with
: + * this work for additional information regarding copyright ownership.
: + * The ASF licenses this file to You under the Apache License, Version 2.0
: + * (the "License"); you may not use this file except in compliance with
: + * the License. You may obtain a copy of the License at
: + *
: + * http://www.apache.org/licenses/LICENSE-2.0
: + *
: + * Unless required by applicable law or agreed to in writing, software
: + * distributed under the License is distributed on an "AS IS" BASIS,
: + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
: + * See the License for the specific language governing permissions and
: + * limitations under the License.
: + */
: +
: +import java.io.File;
: +import java.io.FileInputStream;
: +import java.io.FileNotFoundException;
: +import java.io.FileOutputStream;
: +import java.io.IOException;
: +import java.io.InputStream;
: +import java.io.OutputStream;
: +import java.util.Properties;
: +
: +import org.apache.solr.core.SolrCore;
: +import org.slf4j.Logger;
: +import org.slf4j.LoggerFactory;
: +
: +public class SimplePropertiesWriter implements DIHPropertiesWriter {
: + private static final Logger log = LoggerFactory
: + .getLogger(SimplePropertiesWriter.class);
: +
: + static final String IMPORTER_PROPERTIES = "dataimport.properties";
: +
: + static final String LAST_INDEX_KEY = "last_index_time";
: +
: + private String persistFilename = IMPORTER_PROPERTIES;
: +
: + private String configDir = null;
: +
: + public void init(DataImporter dataImporter) {
: + SolrCore core = dataImporter.getCore();
: + String configDir = core == null ? "." : core.getResourceLoader()
: + .getConfigDir();
: + String persistFileName = dataImporter.getHandlerName();
: +
: + this.configDir = configDir;
: + if (persistFileName != null) {
: + persistFilename = persistFileName + ".properties";
: + }
: + }
: +
: + private File getPersistFile() {
: + String filePath = configDir;
: + if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
: + filePath += persistFilename;
: + return new File(filePath);
: + }
: +
: + public boolean isWritable() {
: + File persistFile = getPersistFile();
: + return persistFile.exists() ? persistFile.canWrite() : persistFile
: + .getParentFile().canWrite();
: +
: + }
: +
: + public void persist(Properties p) {
: + OutputStream propOutput = null;
: +
: + Properties props = readIndexerProperties();
: +
: + try {
: + props.putAll(p);
: + String filePath = configDir;
: + if (configDir != null && !configDir.endsWith(File.separator)) filePath += File.separator;
: + filePath += persistFilename;
: + propOutput = new FileOutputStream(filePath);
: + props.store(propOutput, null);
: + log.info("Wrote last indexed time to " + persistFilename);
: + } catch (Exception e) {
: + throw new DataImportHandlerException(DataImportHandlerException.SEVERE,
: + "Unable to persist Index Start Time", e);
: + } finally {
: + try {
: + if (propOutput != null) propOutput.close();
: + } catch (IOException e) {
: + propOutput = null;
: + }
: + }
: + }
: +
: + public Properties readIndexerProperties() {
: + Properties props = new Properties();
: + InputStream propInput = null;
: +
: + try {
: + propInput = new FileInputStream(configDir + persistFilename);
: + props.load(propInput);
: + log.info("Read " + persistFilename);
: + } catch (Exception e) {
: + log.warn("Unable to read: " + persistFilename);
: + } finally {
: + try {
: + if (propInput != null) propInput.close();
: + } catch (IOException e) {
: + propInput = null;
: + }
: + }
: +
: + return props;
: + }
: +
: +}
:
:
:
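One thing that stands out in SimplePropertiesWriter above: persist() and getPersistFile() append File.separator to configDir when it is missing, but readIndexerProperties() opens configDir + persistFilename directly. If configDir has no trailing separator (for example the "." fallback used when core is null), the read and write paths would seem to differ. Below is a minimal sketch of the difference, assuming nothing beyond java.io.File; the class and method names are hypothetical and not part of the commit.

```java
import java.io.File;

// Hypothetical helper, not part of r1303792: contrasts plain string
// concatenation with File(parent, child) when locating the properties file.
final class PersistPathSketch {
  private PersistPathSketch() {}

  // Mirrors the expression used in readIndexerProperties(): no separator is added.
  static String concatenated(String configDir, String fileName) {
    return configDir + fileName;
  }

  // File(parent, child) inserts a separator only when one is needed,
  // so "conf" and "conf/" resolve to the same location.
  static File joined(String configDir, String fileName) {
    return new File(configDir, fileName);
  }

  public static void main(String[] args) {
    // With the "." fallback the two approaches name different files.
    System.out.println(concatenated(".", "dataimport.properties"));
    System.out.println(joined(".", "dataimport.properties").getPath());
  }
}
```

Routing readIndexerProperties() through the existing getPersistFile() would presumably have the same effect inside the class, since that method already adds the separator.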
-Hoss
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org