You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/04/25 07:12:15 UTC
svn commit: r1330107 [1/2] - in /incubator/stanbol/trunk: demos/ehealth/
entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/
entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/
entityh...
Author: rwesten
Date: Wed Apr 25 05:12:14 2012
New Revision: 1330107
URL: http://svn.apache.org/viewvc?rev=1330107&view=rev
Log:
Implementation of STANBOL-590, STANBOL-591, STANBOL-592 and STANBOL-593
In addition:
* added the new components in the documentation of the genericrdf indexer
* added Musicbrainz related namespaces to the Entityhub NamespaceEnum
* The Entityhub LDPath default configuration is now public so that it can also be used by non-Entityhub LDPath instances (the namespace configurations are generally useful)
* Urify: A new command line utility that converts bNodes to URNs
* This is important when importing datasets with millions of bNodes, because during import those need to be kept in memory, which may prevent importing such datasets into Jena TDB
Added:
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java (with props)
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java (with props)
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java (with props)
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/YardEntityDataProvider.java (with props)
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/AbstractTdbBackend.java (with props)
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/Constants.java (with props)
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/ResourceFilterIterator.java (with props)
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/Utils.java (with props)
Modified:
incubator/stanbol/trunk/demos/ehealth/README.md
incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java
incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
incubator/stanbol/trunk/entityhub/indexing/destination/solryard/src/main/java/org/apache/stanbol/entityhub/indexing/destination/solryard/SolrYardIndexingDestination.java
incubator/stanbol/trunk/entityhub/indexing/genericrdf/README.md
incubator/stanbol/trunk/entityhub/indexing/genericrdf/src/main/resources/indexing/config/entityTypes.properties
incubator/stanbol/trunk/entityhub/indexing/genericrdf/src/main/resources/indexing/config/indexing.properties
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/pom.xml
incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/RdfIndexingSource.java
incubator/stanbol/trunk/entityhub/ldpath/src/main/java/org/apache/stanbol/entityhub/ldpath/EntityhubLDPath.java
incubator/stanbol/trunk/entityhub/ldpath/src/main/java/org/apache/stanbol/entityhub/ldpath/backend/SingleRepresentationBackend.java
incubator/stanbol/trunk/parent/pom.xml
Modified: incubator/stanbol/trunk/demos/ehealth/README.md
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/demos/ehealth/README.md?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/demos/ehealth/README.md (original)
+++ incubator/stanbol/trunk/demos/ehealth/README.md Wed Apr 25 05:12:14 2012
@@ -260,7 +260,7 @@ Sider, Drugbank and Dailymed are interli
sideEffect = (owl:sameAs)+/sider:sideEffect/rdfs:label;
genericName = (owl:sameAs)+/drugbank:genericName;
- key = (owl:sameAs)+/drugbank:inchiKey;
+ inchiKey = (owl:sameAs)+/drugbank:inchiKey;
indication = (owl:sameAs)+/drugbank:indication;
foodInteraction = (owl:sameAs)+/drugbank:foodInteraction;
toxicity = (owl:sameAs)+/drugbank:toxicity;
Modified: incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java (original)
+++ incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java Wed Apr 25 05:12:14 2012
@@ -123,7 +123,28 @@ public enum NamespaceEnum {
/**
* The Music Ontology (http://musicontology.com/)
*/
- mo("http://purl.org/ontology/mo/")
+ mo("http://purl.org/ontology/mo/"),
+ /**
+ * The Time ontology (http://www.w3.org/TR/owl-time/)
+ */
+ owlTime("owl-time","http://www.w3.org/2006/time#"),
+ /**
+ * The Event ontology (http://purl.org/NET/c4dm/event.owl#)
+ */
+ event("http://purl.org/NET/c4dm/event.owl#"),
+ /**
+ * The Timeline ontology (http://purl.org/NET/c4dm/timeline.owl#)
+ */
+ timeline("http://purl.org/NET/c4dm/timeline.owl#"),
+ /**
+ * Relationship: A vocabulary for describing relationships between people
+ * (http://purl.org/vocab/relationship/)
+ */
+ rel("http://purl.org/vocab/relationship/"),
+ /**
+ * Expression of Core FRBR Concepts in RDF (http://vocab.org/frbr/core)
+ */
+ frbr("http://purl.org/vocab/frbr/core#")
;
/**
* The logger
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,302 @@
+package org.apache.stanbol.entityhub.indexing;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implemented to allow importing the Musicbrainz dump to Jena TDB.<p>
+ *
+ * The problem is that RDF dumps with blank nodes do require to store a
+ * lookup table for the black nodes IDs during import.<p> In case a dump
+ * contains millions of such nodes this table does no longer fit into
+ * memory. This makes importing Dumps to with the Jena RDF parser impossible.
+ * <p>
+ * This Utility can replaces nodes that start with "_:{id}" with
+ * "<{prefix}{id}>. The prefix must be set with the "-p" parameter.
+ * <p>
+ * This tool supports "gz" and "bz2" compressed files. If the output will use
+ * the same compression as the input. It uses two threads for "reading/processing"
+ * and "writing". It could also process multiple files in parallel. However this
+ * feature is not yet activated as the Musicbrainz dump comes in a single file.
+ *
+ * @author Rupert Westenthaler
+ *
+ */
+public class Urify implements Runnable{
+
+ private static final String EOF_INDICATOR = "__EOF_INDICATOR";
+
+ private static Logger log = LoggerFactory.getLogger(Urify.class);
+
+ private static final Options options;
+ static {
+ options = new Options();
+ options.addOption("h", "help", false, "display this help and exit");
+ options.addOption("p","prefix",true,
+ "The URI prefix used for wrapping the bNode Id");
+ options.addOption("e","encoding",true, "the char encodinf (default: UTF-8)");
+ }
+ /**
+ * @param args
+ * @throws ParseException
+ */
+ public static void main(String[] args) throws IOException, ParseException {
+ CommandLineParser parser = new PosixParser();
+ CommandLine line = parser.parse(options, args);
+ args = line.getArgs();
+ if(!line.hasOption('p')){
+ log.error("Missing parameter 'prefix' ('p)!");
+ System.exit(1);
+ }
+ String prefix = "<"+line.getOptionValue('p');
+ log.info("prfix: {} ",line.getOptionValue('p'));
+ Charset charset;
+ if(line.hasOption('e')){
+ charset = Charset.forName(line.getOptionValue('e'));
+ if(charset == null){
+ log.error("Unsupported encoding '{}'!",line.getOptionValue('e'));
+ System.exit(1);
+ }
+ } else {
+ charset = Charset.forName("UTF-8");
+ }
+ log.info("charset: {} ",charset.name());
+ Urify urify = new Urify(Arrays.asList(args), charset, prefix);
+ urify.run(); //TODO: this could support processing multiple files in parallel
+ }
+
+ private final Charset charset;
+ private final String prefix;
+ protected long start = System.currentTimeMillis();
+ protected long uf_count = 0;
+
+ private List<String> resources;
+
+ public Urify(List<String> resources, Charset charset, String prefix) throws IOException {
+ this.charset = charset;
+ this.prefix = prefix;
+ this.resources = Collections.synchronizedList(new ArrayList<String>(resources));
+ }
+
+ public void run() {
+ String source;
+ do {
+ synchronized (resources) {
+ if(resources.isEmpty()){
+ source = null;
+ } else {
+ source = resources.remove(0);
+ try {
+ urify(source);
+ } catch (Exception e) {
+ log.error("Unable to Urify "+resources,e);
+ }
+ }
+ }
+ } while (source != null);
+ }
+ private void urify(String resource) throws IOException {
+ File source = new File(resource);
+ if(source.isFile()){
+ String path = FilenameUtils.getFullPathNoEndSeparator(resource);
+ String name = FilenameUtils.getName(resource);
+ File target = new File(path,"uf_"+name);
+ int i=0;
+ while(target.exists()){
+ i++;
+ target = new File(path,"uf"+i+"_"+name);
+ }
+ InputStream is = new FileInputStream(source);
+ OutputStream os = new FileOutputStream(target);
+ log.info("Resource: {}",resource);
+ log.info("Target : {}",target);
+ if ("gz".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+ is = new GZIPInputStream(is);
+ os = new GZIPOutputStream(os);
+ name = FilenameUtils.removeExtension(name);
+ log.debug(" - from GZIP Archive");
+ } else if ("bz2".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+ is = new BZip2CompressorInputStream(is);
+ os = new BZip2CompressorOutputStream(os);
+ name = FilenameUtils.removeExtension(name);
+ log.debug(" - from BZip2 Archive");
+ }// TODO: No Zip File support
+ //else no complression
+ BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
+ ReaderDaemon reader = new ReaderDaemon(new BufferedReader(new InputStreamReader(is, charset)), queue);
+ WriterDaemon writer = new WriterDaemon(
+ new BufferedWriter(new OutputStreamWriter(os, charset)), queue);
+ Thread readerDaemon = new Thread(reader,name+" reader");
+ Thread writerDaemon = new Thread(writer,name+" writer");
+ readerDaemon.setDaemon(true);
+ writerDaemon.setDaemon(true);
+ writerDaemon.start();
+ readerDaemon.start();
+ Object notifier = writer.getNotifier();
+ synchronized (notifier) { //wait until processed
+ if(!writer.completed()){
+ try {
+ notifier.wait();
+ } catch (InterruptedException e) {/*ignore*/}
+ }
+ }
+ if(reader.getError() != null){
+ throw new IOException("Error while reading source "+source,reader.getError());
+ }
+ if(writer.getError() != null){
+ throw new IOException("Error while writing resource "+target,writer.getError());
+ }
+ log.info(" ... completed resource {}",resource);
+ } else {
+ throw new FileNotFoundException("Parsed File "+resource+" does not exist or is not a File!");
+ }
+ }
+
+ private class ReaderDaemon implements Runnable {
+ private final BufferedReader reader;
+ private final BlockingQueue<String> queue;
+
+ private Exception error;
+
+ protected ReaderDaemon(BufferedReader reader, BlockingQueue<String> queue){
+ this.reader = reader;
+ this.queue = queue;
+ }
+
+
+ public Exception getError() {
+ return error;
+ }
+
+
+ @Override
+ public void run() {
+ String triple;
+ try {
+ while((triple = reader.readLine()) != null){
+ StringBuilder sb = new StringBuilder();
+ for(String node : triple.split(" ")){
+ if(node.startsWith("_:")){ //convert to uri
+ sb.append(prefix);
+ sb.append(node.substring(2,node.length()));
+ sb.append("> ");
+ uf_count++;
+ } else {
+ sb.append(node);
+ if(node.length()>1){
+ //the final '.' is also a node
+ sb.append(" ");
+ }
+ }
+ }
+ queue.put(sb.toString());
+ }
+ } catch (IOException e) {
+ error = e;
+ } catch (InterruptedException e) {
+ error = e;
+ } finally {
+ IOUtils.closeQuietly(reader);
+ try {
+ queue.put(EOF_INDICATOR); //indicates finished
+ } catch (InterruptedException e) {
+ log.error("Unable to put EOF to queue!",e);
+ }
+ }
+ }
+ }
+
+ private class WriterDaemon implements Runnable {
+ private final BufferedWriter writer;
+ private final BlockingQueue<String> queue;
+ private final Object notifier = new Object();
+ private boolean completed = false;
+
+ private Exception error;
+
+ protected WriterDaemon(BufferedWriter writer, BlockingQueue<String> queue){
+ this.writer = writer;
+ this.queue = queue;
+ }
+
+ public Exception getError() {
+ return error;
+ }
+
+ public Object getNotifier() {
+ return notifier;
+ }
+
+ public boolean completed() {
+ return completed;
+ }
+
+ @Override
+ public void run() {
+ String triple;
+ long count = 0;
+ boolean first = true;
+ try {
+ while(!EOF_INDICATOR.equals((triple = queue.take()))){
+ if(count % 1000000 == 0){
+ //NOTE: urified will not be correct as it is counted
+ // by an other thread, but for logging ...
+ long end = System.currentTimeMillis();
+ log.info("processed {} | urified: {} (batch: {}sec)",
+ new Object[]{count,uf_count,((double)(end-start))/1000});
+ start = end;
+ }
+ count++;
+ if(first){
+ first = false;
+ } else {
+ writer.write("\n");
+ }
+ writer.write(triple);
+
+ }
+ } catch (InterruptedException e) {
+ this.error = e;
+ } catch (IOException e) {
+ this.error = e;
+ } finally {
+ IOUtils.closeQuietly(writer);
+ this.completed = true;
+ synchronized (notifier) {
+ notifier.notifyAll();
+ }
+ }
+ }
+ }
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java Wed Apr 25 05:12:14 2012
@@ -16,6 +16,7 @@
*/
package org.apache.stanbol.entityhub.indexing.core;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
/**
@@ -56,6 +57,14 @@ public interface Indexer {
*/
INDEXED,
/**
+ * while post-processing of the indexed entities is performed
+ */
+ POSTPROCESSING,
+ /**
+ * after the post-processing of the entities has finished
+ */
+ POSTPROCESSED,
+ /**
* While the {@link IndexingTarget} is finalising the indexing process.
*/
FINALISING,
@@ -101,7 +110,7 @@ public interface Indexer {
* This Method is intended to be used by caller that need more control over
* the indexing process as simple to call {@link #index()}.
*/
- void initialiseIndexingSources();
+ void initialiseIndexing();
/**
* Brings this indexer in the {@link State#INDEXED} by indexing all Entities
* provided by the {@link IndexingComponent}s. This method blocks until the
@@ -113,7 +122,19 @@ public interface Indexer {
* @throws IllegalStateException if {@link #getState()} <
* {@link State#INITIALISED}
*/
- void indexAllEntities() throws IllegalStateException;
+ void indexEntities() throws IllegalStateException;
+ /**
+ * Performs {@link State#POSTPROCESSING} after all entities are
+ * {@link State#INDEXED} ({@link #indexEntities()} completed).<p>
+ * Post-processing will use the {@link IndexingDestination} as source and
+ * target. It will retrieve the {@link Representation} of each indexed
+ * entity and sent it to the configured post-processing
+ * {@link EntityProcessor}s<p>
+ * The resulting {@link Representation} will be stored to the
+ * {@link IndexingDestination}
+ * @throws IllegalStateException if the state is < {@link State#INDEXED}
+ */
+ void postProcessEntities() throws IllegalStateException;
/**
* {@link State#FINISHED Finalises} the indexing process by calling finalise
* on the {@link IndexingDestination}. This method blocks until the
@@ -125,13 +146,13 @@ public interface Indexer {
* @throws IllegalStateException if {@link #getState()} <
* {@link State#INDEXED}
*/
- void finaliseIndexingTarget() throws IllegalStateException;
+ void finaliseIndexing() throws IllegalStateException;
/**
* Initialise the {@link IndexingComponent}s, indexes all entities and
* finalises the {@link IndexingDestination}. <p>
* Calls to this method do have the same result as subsequent calls to
- * {@link #initialiseIndexingSources()}, {@link #indexAllEntities()},
- * {@link #finaliseIndexingTarget()}. This method can also be used if any of
+ * {@link #initialiseIndexing()}, {@link #indexEntities()},
+ * {@link #finaliseIndexing()}. This method can also be used if any of
* the mentioned three methods was already called to this indexer instance.
* <p>
* This method blocks until the whole process is completed. Ideal if the
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java Wed Apr 25 05:12:14 2012
@@ -16,6 +16,10 @@
*/
package org.apache.stanbol.entityhub.indexing.core;
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig.DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME;
+
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
@@ -100,12 +104,16 @@ public class IndexerFactory {
"'{}' in the indexing.properties within the directory {}",
IndexingConstants.KEY_ENTITY_PROCESSOR,config.getConfigFolder());
}
+ List<EntityProcessor> postProcessors = config.getEntityPostProcessors();
log.info("Present Source Configuration:");
log.info(" - EntityDataIterable: {}",dataIterable);
log.info(" - EntityIterator: {}",idIterator);
log.info(" - EntityDataProvider: {}",dataProvider);
log.info(" - EntityScoreProvider: {}",scoreProvider);
log.info(" - EntityProcessors ({}):",processors.size());
+ if(postProcessors != null){
+ log.info(" - EntityPostProcessors ({}):",postProcessors.size());
+ }
int i=0;
for(EntityProcessor processor : processors){
i++;
@@ -114,11 +122,13 @@ public class IndexerFactory {
if(dataIterable != null && scoreProvider != null){
// iterate over data and lookup scores
indexer = new IndexerImpl(dataIterable, scoreProvider,
- config.getNormaliser(),destination, processors);
+ config.getNormaliser(),destination, processors,
+ config.getIndexedEntitiesIdsFile(),postProcessors);
} else if(idIterator != null && dataProvider != null){
// iterate over id and lookup data
indexer = new IndexerImpl(idIterator,dataProvider,
- config.getNormaliser(),destination, processors);
+ config.getNormaliser(),destination, processors,
+ config.getIndexedEntitiesIdsFile(),postProcessors);
} else if(dataIterable != null && idIterator != null){
// create an EntityIterator to EntityScoreProvider adapter
log.info(
@@ -128,7 +138,8 @@ public class IndexerFactory {
idIterator.getClass(), dataIterable.getClass());
indexer = new IndexerImpl(dataIterable,
new EntityIneratorToScoreProviderAdapter(idIterator),
- config.getNormaliser(),destination, processors);
+ config.getNormaliser(),destination, processors,
+ config.getIndexedEntitiesIdsFile(),postProcessors);
} else {
log.error("Invalid Indexing Source configuration: ");
log.error(" - To iterate over the data and lookup scores one need to " +
@@ -143,12 +154,42 @@ public class IndexerFactory {
public Indexer create(EntityIterator idIterator, EntityDataProvider dataProvider,
ScoreNormaliser normaliser,
List<EntityProcessor> processors, IndexingDestination destination){
- return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors);
+ return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors,null,null);
+    }
+    /**
+     * Creates an {@link Indexer} that iterates over the IDs of the passed
+     * {@link EntityIterator}, retrieves the data from the
+     * {@link EntityDataProvider} and post-processes the indexed entities
+     * with the passed post-processors.<p>
+     * The IDs of indexed entities are stored in a temporary file that is
+     * deleted when the JVM exits.
+     * @throws IllegalStateException if the temporary file can not be created
+     */
+    public Indexer create(EntityIterator idIterator, EntityDataProvider dataProvider,
+            ScoreNormaliser normaliser,
+            List<EntityProcessor> processors, List<EntityProcessor> postProcessors,
+            IndexingDestination destination){
+        return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors,
+            createIndexedEntitiesIdsTempFile(),postProcessors);
}
public Indexer create(EntityDataIterable dataIterable,EntityScoreProvider scoreProvider,
ScoreNormaliser normaliser,
List<EntityProcessor> processors, IndexingDestination destination){
- return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors);
+ return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors,null,null);
+    }
+    /**
+     * Creates an {@link Indexer} that iterates over the passed
+     * {@link EntityDataIterable}, looks up scores with the
+     * {@link EntityScoreProvider} and post-processes the indexed entities
+     * with the passed post-processors.<p>
+     * The IDs of indexed entities are stored in a temporary file that is
+     * deleted when the JVM exits.
+     * @throws IllegalStateException if the temporary file can not be created
+     */
+    public Indexer create(EntityDataIterable dataIterable,EntityScoreProvider scoreProvider,
+            ScoreNormaliser normaliser,
+            List<EntityProcessor> processors, List<EntityProcessor> postProcessors,
+            IndexingDestination destination){
+        return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors,
+            createIndexedEntitiesIdsTempFile(),postProcessors);
+    }
+    /**
+     * Creates the temporary ".zip" file used to store the IDs of indexed
+     * entities. The file is deleted when the JVM exits.
+     * @return the created file
+     * @throws IllegalStateException if the file can not be created
+     */
+    private static File createIndexedEntitiesIdsTempFile(){
+        try {
+            //NOTE: the suffix must include the '.' (createTempFile appends it as is)
+            File tmp = File.createTempFile("ind-ent-ids", ".zip");
+            tmp.deleteOnExit();
+            return tmp;
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create temporary file for storing the " +
+                "indexed Entity IDs",e);
+        }
}
}
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java Wed Apr 25 05:12:14 2012
@@ -20,6 +20,7 @@ import static org.apache.stanbol.entityh
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_DATA_ITERABLE;
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_DATA_PROVIDER;
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_ID_ITERATOR;
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_POST_PROCESSOR;
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_PROCESSOR;
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_SCORE_PROVIDER;
import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_INDEXING_DESTINATION;
@@ -84,6 +85,8 @@ public class IndexingConfig {
private static final Logger log = LoggerFactory.getLogger(IndexingConfig.class);
private static final String DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME = "indexFieldConfig.txt";
+ public static final String DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME = "indexed-entities-ids.zip";
+
/**
* This stores the context within the classpath to initialise missing
* configurations and source based on the defaults in the classpath.
@@ -184,6 +187,14 @@ public class IndexingConfig {
*/
private List<EntityProcessor> entityProcessor = null;
/**
+ * The post-processing {@link EntityProcessor}s initialised based on the
+ * value of the {@link IndexingConstants#KEY_ENTITY_POST_PROCESSOR} key or
+ * <code>null</code> if not configured (or if the configuration is empty).
+ * This variable uses lazy initialisation
+ * @see #getEntityPostProcessors()
+ */
+ private List<EntityProcessor> entityPostProcessor = null;
+ /**
* The {@link IndexingDestination} instance initialised based on the value
* of the {@link IndexingConstants#KEY_INDEXING_DESTINATION} key or
* <code>null</code> if not configured.
@@ -837,6 +848,40 @@ public class IndexingConfig {
return null;
}
}
+    /**
+     * Lazily creates the {@link EntityProcessor}s configured for
+     * post-processing by the {@link IndexingConstants#KEY_ENTITY_POST_PROCESSOR}
+     * key. Each processor gets its component configuration plus the
+     * parameters directly provided in the configuration entry.
+     * @return the unmodifiable list of post-processors or <code>null</code>
+     * if the key is not present or does not configure any processor
+     * @throws IllegalArgumentException if a configured class can not be
+     * instantiated as {@link EntityProcessor}
+     */
+    public List<EntityProcessor> getEntityPostProcessors(){
+        if(entityPostProcessor == null && configuration.containsKey(KEY_ENTITY_POST_PROCESSOR)){
+            String value = configuration.get(KEY_ENTITY_POST_PROCESSOR).toString();
+            List<ConfigEntry> entries = parseConfigEntries(value);
+            List<EntityProcessor> created = new ArrayList<EntityProcessor>(entries.size());
+            for(ConfigEntry entry : entries){
+                final EntityProcessor processor;
+                try {
+                    processor = (EntityProcessor)Class.forName(entry.getClassName()).newInstance();
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Invalid EntityProcessor configuration '"+entry.getConfigString()+"' for post-processing!",e);
+                }
+                Map<String,Object> processorConfig = getComponentConfig(entry, processor.getClass().getSimpleName(), false);
+                processorConfig.putAll(entry.getParams()); //directly provided parameters
+                processor.setConfiguration(processorConfig);
+                created.add(processor);
+            }
+            //empty lists are not stored (null indicates "not configured")
+            if(!created.isEmpty()){
+                entityPostProcessor = Collections.unmodifiableList(created);
+            }
+        }
+        return entityPostProcessor;
+    }
+
public IndexingDestination getIndexingDestination() {
if(indexingDestination != null){
return indexingDestination;
@@ -857,6 +902,16 @@ public class IndexingConfig {
return null;
}
}
+    /**
+     * Getter for the file used to store the IDs of all indexed Entities.
+     * The file name is read from the
+     * {@link IndexingConstants#KEX_INDEXED_ENTITIES_FILE} key and is resolved
+     * relative to the destination folder. Leading and trailing whitespace is
+     * ignored, because property files keep trailing whitespace of values.
+     * @return the file, the {@link #DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME}
+     * in the destination folder if the key is not present, or
+     * <code>null</code> if the configured value is empty (which deactivates
+     * storing the IDs of indexed Entities)
+     */
+    public File getIndexedEntitiesIdsFile(){
+        Object value = configuration.get(IndexingConstants.KEX_INDEXED_ENTITIES_FILE);
+        if(value == null){
+            return new File(getDestinationFolder(),DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME);
+        }
+        String fileName = value.toString().trim();
+        if(fileName.isEmpty()){
+            return null; //deactivate this feature
+        }
+        return new File(getDestinationFolder(),fileName);
+    }
private void initNormaliser() {
Object value = configuration.get(IndexingConstants.KEY_SCORE_NORMALIZER);
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java Wed Apr 25 05:12:14 2012
@@ -34,6 +34,11 @@ public interface IndexingConstants {
String KEY_INDEXING_DESTINATION = "indexingDestination";
String KEY_INDEX_FIELD_CONFIG = "fieldConfiguration";
/**
+ * Name of the file relative to the destination folder used to store
+ * the IDs of indexed Entities.
+ */
+ String KEX_INDEXED_ENTITIES_FILE = "indexedEntitiesFile";
+ /**
* usage:<br>
* <pre>
* {class1},name:{name1};{class2},name:{name2};...
@@ -43,4 +48,5 @@ public interface IndexingConstants {
*/
String KEY_SCORE_NORMALIZER = "scoreNormalizer";
String KEY_ENTITY_PROCESSOR = "entityProcessor";
+ String KEY_ENTITY_POST_PROCESSOR = "entityPostProcessor";
}
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java Wed Apr 25 05:12:14 2012
@@ -21,8 +21,15 @@ import static org.apache.stanbol.entityh
import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
+import org.apache.commons.io.IOUtils;
import org.apache.stanbol.entityhub.servicesapi.model.Representation;
import org.slf4j.Logger;
@@ -57,11 +64,20 @@ public class FinishedEntityDaemon extend
private long countedAll;
private long countedMajor;
private long countedMinor;
+ /**
+ * Writes the IDs of finished entities to a stream, one ID per line (<code>null</code> if no stream was parsed).
+ */
+ private final BufferedWriter idWriter;
+ /**
+ * The charset used for the {@link #idWriter}
+ */
+ private static final Charset UTF8 = Charset.forName("UTF-8");
public FinishedEntityDaemon(BlockingQueue<QueueItem<Representation>> consume,
int majorInterval,
- Logger out) {
+ Logger out,
+ OutputStream idOut) {
super("Indexing: Finished Entity Logger Deamon",
IndexerConstants.SEQUENCE_NUMBER_FINISHED_DAEMON,
consume, null, null);
@@ -72,6 +88,11 @@ public class FinishedEntityDaemon extend
this.major = DEFAULT_MAJOR_INTERVAL;
}
this.minor = major/10;
+ if(idOut != null){
+ this.idWriter = new BufferedWriter(new OutputStreamWriter(idOut, UTF8));
+ } else {
+ this.idWriter = null;
+ }
}
@Override
@@ -85,6 +106,17 @@ public class FinishedEntityDaemon extend
while(!isQueueFinished()){
QueueItem<Representation> item = consume();
if(item != null){
+ if(idWriter != null && item.getItem() != null){
+ String id = item.getItem().getId();
+ try {
+ if(count != 0){
+ idWriter.newLine();
+ }
+ idWriter.write(id);
+ } catch (Exception e){
+ log.error("Exception while logging ID of indexed Entity '"+id+"'!",e);
+ }
+ }
current = System.currentTimeMillis();
if(count == 0){
start = System.currentTimeMillis(); //default for the start!
@@ -154,6 +186,7 @@ public class FinishedEntityDaemon extend
}
}
printSummary(current);
+ IOUtils.closeQuietly(idWriter);
setFinished();
}
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java Wed Apr 25 05:12:14 2012
@@ -18,18 +18,35 @@ package org.apache.stanbol.entityhub.ind
import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
@@ -48,6 +65,8 @@ import org.apache.stanbol.entityhub.inde
import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceInitialiserListener;
import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
import org.apache.stanbol.entityhub.indexing.core.processor.EmptyProcessor;
+import org.apache.stanbol.entityhub.indexing.core.source.LineBasedEntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.source.YardEntityDataProvider;
import org.apache.stanbol.entityhub.servicesapi.model.Representation;
import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
import org.slf4j.Logger;
@@ -80,23 +99,6 @@ public class IndexerImpl implements Inde
private int chunkSize;
public static final int MIN_QUEUE_SIZE = 500;
- /**
- * Queue used to add Entities read from the IndexingSource(s). This queue
- * is consumed by the {@link EntityProcessorRunnable}.
- */
- private BlockingQueue<QueueItem<Representation>> indexedEntityQueue;
- /**
- * Queue used to add processed Entities. This queue is consumed by the
- * {@link EntityPersisterRunnable}.
- */
- private BlockingQueue<QueueItem<Representation>> processedEntityQueue;
- /**
- * Queue used to add finished Entities. Mainly used for counting and
- * logging
- */
- private BlockingQueue<QueueItem<Representation>> finishedEntityQueue;
-
- private BlockingQueue<QueueItem<IndexingError>> errorEntityQueue;
private boolean indexAllEntitiesState = false;
@@ -114,13 +116,29 @@ public class IndexerImpl implements Inde
private State state = State.UNINITIALISED;
private final Object stateSync = new Object();
+ /**
+ * The file to store the IDs of indexed entities. Might be <code>null</code>
+ * if {@link #entityPostProcessors} is <code>null</code>.
+ */
+ private File indexedEntityIdFile;
+ /**
+ * The {@link EntityProcessor}s used for the post processing phase or
+ * <code>null</code> if none.<p>
+ * If NOT <code>null</code> {@link #indexedEntityIdFile} is also
+ * guaranteed to be NOT <code>null</code>!
+ */
+ private List<EntityProcessor> entityPostProcessors;
+
+ private OutputStream indexedEntityIdOutputStream;
public IndexerImpl(EntityIterator entityIterator,
EntityDataProvider dataProvider,
ScoreNormaliser normaliser,
IndexingDestination indexingDestination,
- List<EntityProcessor> entityProcessors){
- this(normaliser,indexingDestination,entityProcessors);
+ List<EntityProcessor> entityProcessors,
+ File indexedEntityIdFile,
+ List<EntityProcessor> entityPostProcessors){
+ this(normaliser,indexingDestination,entityProcessors,indexedEntityIdFile,entityPostProcessors);
//set entityMode interfaces
if(entityIterator == null){
throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
@@ -135,8 +153,10 @@ public class IndexerImpl implements Inde
EntityScoreProvider scoreProvider,
ScoreNormaliser normaliser,
IndexingDestination indexingDestination,
- List<EntityProcessor> entityProcessors){
- this(normaliser,indexingDestination,entityProcessors);
+ List<EntityProcessor> entityProcessors,
+ File indexedEntityIdFile,
+ List<EntityProcessor> entityPostProcessors){
+ this(normaliser,indexingDestination,entityProcessors,indexedEntityIdFile,entityPostProcessors);
//deactivate entityMode interfaces
this.entityIterator = null;
if(scoreProvider == null){
@@ -151,7 +171,9 @@ public class IndexerImpl implements Inde
protected IndexerImpl(ScoreNormaliser normaliser,
IndexingDestination indexingDestination,
- List<EntityProcessor> entityProcessors){
+ List<EntityProcessor> entityProcessors,
+ File indexedEntityIdFile,
+ List<EntityProcessor> entityPostProcessors){
if(indexingDestination == null){
throw new IllegalArgumentException("The Yard MUST NOT be NULL!");
}
@@ -167,6 +189,21 @@ public class IndexerImpl implements Inde
indexingComponents.add(indexingDestination);
indexingComponents.addAll(entityProcessors);
listeners = new HashSet<IndexingListener>();
+ this.indexedEntityIdFile = indexedEntityIdFile;
+ if(entityPostProcessors != null){
+ if(entityPostProcessors.isEmpty()){
+ this.entityPostProcessors = null;
+ } else {
+ this.entityPostProcessors = entityPostProcessors;
+ if(indexedEntityIdFile == null){
+ throw new IllegalArgumentException("The file used to store" +
+ "the IDs of indexed Entities MUST NOT be NULL if" +
+ "EntityPostProcessors are defined!");
+ }
+ }
+ } else {
+ this.entityPostProcessors = null;
+ }
}
public boolean addIndexListener(IndexingListener listener){
if(listener != null){
@@ -226,7 +263,7 @@ public class IndexerImpl implements Inde
return indexingDestination.getYard();
}
@Override
- public void initialiseIndexingSources() {
+ public void initialiseIndexing() {
synchronized (stateSync) { //ensure that two threads do not start the
//initialisation at the same time ...
if(getState() != State.UNINITIALISED){
@@ -279,6 +316,20 @@ public class IndexerImpl implements Inde
}
}
}
+ //initialise the stream used to write the ids of indexed entities
+ try {
+ indexedEntityIdOutputStream = getEntityIdFileOutputStream();
+ } catch (IOException e) {
+ if(entityPostProcessors != null){
+ throw new IllegalStateException("Unable to open stream for writing" +
+ "the IDs of indexed Entities as required for post-" +
+ "processing!",e);
+ } else {
+ log.warn("Unable to open stream for writing the Ids of indexed " +
+ "Entities -> indexes entity Ids will not be available!",e);
+ }
+ }
+
log.info("Initialisation completed");
setState(State.INITIALISED);
}
@@ -297,7 +348,9 @@ public class IndexerImpl implements Inde
"Calling this Method is not supported while in State %s! Supported States are ",
state,supportedStates));
}
- initialiseIndexingSources();
+ log.info("start initialisation ...");
+ initialiseIndexing();
+ log.info(" ... initialisation completed");
//if now the state is an unsupported one it indicates that
//initialiseIndexingSources() was called by an other thread before this one!
state = getState();
@@ -306,15 +359,18 @@ public class IndexerImpl implements Inde
"Calling this Method is not supported while in State %s! Supported States are ",
state,supportedStates));
}
- log.info("Start Indexing");
- indexAllEntities();
- log.info("Indexing completed ...");
+ log.info("start indexing ...");
+ indexEntities();
+ log.info(" ... indexing completed");
+ log.info("start post-processing ...");
+ postProcessEntities();
+ log.info(" ... post-processing finished ...");
log.info("start finalisation....");
- finaliseIndexingTarget();
- log.info("Indexing finished!");
+ finaliseIndexing();
+ log.info(" ...finalisation completed");
}
@Override
- public void finaliseIndexingTarget() {
+ public void postProcessEntities() {
synchronized (stateSync) { //ensure that two threads do not start the
//initialisation at the same time ...
State state = getState();
@@ -324,6 +380,153 @@ public class IndexerImpl implements Inde
if(state != State.INDEXED){ //if state > INITIALISED
return; // ignore this call
}
+ setState(State.POSTPROCESSING);
+ log.info("Indexing started ...");
+ }
+ if(entityPostProcessors == null || entityPostProcessors.isEmpty()){
+ setState(State.POSTPROCESSED);
+ return; //nothing to do
+ }
+ //init the post processing components
+ //use an EntityDataProvider based on the indexed data
+ EntityDataProvider dataProvider = new YardEntityDataProvider(indexingDestination.getYard());
+ //use an LineBasedEntityIterator to iterate over the indexed entity ids
+ EntityIterator entityIterator;
+ try {
+ entityIterator = new LineBasedEntityIterator(getEntityIdFileInputStream(),"UTF-8",null);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to open file containing the " +
+ "IDs of the indexed Entities!",e);
+ }
+ Map<String,Object> config = new HashMap<String,Object>();
+ config.put(LineBasedEntityIterator.PARAM_ID_POS, 1);
+ config.put(LineBasedEntityIterator.PARAM_SCORE_POS, Integer.MAX_VALUE);
+ entityIterator.setConfiguration(config);
+ //init the post-processors (this time not in an own thread as this
+ //does not really make sense for processors
+ for(EntityProcessor processor : entityPostProcessors){
+ if(processor.needsInitialisation()){
+ processor.initialise();
+ }
+ }
+ //NOTE the destination needs not to be initialised -> it will be the same
+ //as for indexing!
+
+ //initialisation complete ... now set up the post-processing
+ //init the queues
+ int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
+ BlockingQueue<QueueItem<Representation>> indexedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<Representation>> processedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<Representation>> finishedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<IndexingError>> errorEntityQueue =
+ new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+
+ //Set holding all active post processing deamons
+ final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons =
+ new TreeSet<IndexingDaemon<?,?>>();
+ //create the IndexingDaemos
+ //TODO: Here we would need to create multiple instances in case
+ // one would e.g. like to use several threads for processing entities
+ //(1) the daemon reading the indexed entities (IDs from the indexed entity id file, data from the Yard)
+ String entitySourceReaderName = "Post-processing: Entity Reader Deamon";
+ activeIndexingDeamons.add(
+ new EntityIdBasedIndexingDaemon(
+ entitySourceReaderName,
+ indexedEntityQueue, errorEntityQueue,
+ entityIterator,
+ dataProvider,
+ null, //no score normaliser
+ true)); //post-process all indexed entities
+ //(2) The daemon for post-processing the entities
+ activeIndexingDeamons.add(
+ new EntityProcessorRunnable(
+ "Post-processing: Entity Processor Deamon",
+ indexedEntityQueue, //it consumes indexed Entities
+ processedEntityQueue, //it produces processed Entities
+ errorEntityQueue,
+ entityPostProcessors,
+ //TODO: check that the score is not overriden by the NULL
+ // parsed by the used LineBasedEntityIterator!
+ Collections.singleton(SCORE_FIELD))); //ensure the score not changed
+ //(3) The daemon for persisting the entities
+ activeIndexingDeamons.add(
+ new EntityPersisterRunnable(
+ "Indexing: Entity Perstisting Deamon",
+ processedEntityQueue, //it consumes processed Entities
+ finishedEntityQueue, //it produces finished Entities
+ errorEntityQueue,
+ chunkSize, indexingDestination.getYard()));
+ //(4) The daemon for logging finished entities
+ activeIndexingDeamons.add(
+ new FinishedEntityDaemon(
+ finishedEntityQueue, -1, log,
+ null)); //we have already all entity ids!
+ //(5) The daemon for logging errors
+ activeIndexingDeamons.add(
+ new EntityErrorLoggerDaemon(
+ errorEntityQueue, log));
+ //start post-processing and wait until it has finished
+ startAndWait(activeIndexingDeamons);
+
+
+ setState(State.POSTPROCESSED);
+ }
+    /**
+     * Starts the parsed indexing/post-processing daemons, each in its own
+     * daemon {@link Thread}, and blocks until all of them have finished.<p>
+     * A listener registered on every daemon calls
+     * {@link #handleFinishedIndexingDaemon}, which is responsible for removing
+     * finished daemons from the parsed set (and is expected to notify threads
+     * waiting on the monitor of that set). This method waits on that monitor
+     * until the set is empty.<p>
+     * If the calling thread is interrupted while waiting, it keeps waiting
+     * (the daemons are still running), and the interrupt status is restored
+     * before this method returns.
+     * @param activeIndexingDeamons the daemons to start. Also used as the
+     * monitor to wait on; it is emptied as the daemons finish.
+     */
+    private void startAndWait(final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons) {
+        //We need a listener for the IndexingDaemons we are about to start!
+        final IndexingDaemonListener listener = new IndexingDaemonListener() {
+            @Override
+            public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
+                //looks like one has finished
+                IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource();
+                //handle the finished indexing daemon
+                handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon);
+                //finally remove the listener
+                indexingDaemon.removeIndexingDaemonListener(this);
+            }
+        };
+        //start the daemons based on a copy, because finished daemons are
+        //removed from activeIndexingDeamons by the listener
+        Set<IndexingDaemon<?,?>> deamonCopy =
+                new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
+        for(IndexingDaemon<?,?> deamon : deamonCopy){
+            deamon.addIndexingDaemonListener(listener); //add the listener
+            Thread thread = new Thread(deamon);// create the thread
+            thread.setDaemon(true); //ensure that the JVM can terminate
+            thread.setName(deamon.getName()); // set the name of the thread
+            thread.start(); //start the Thread
+        }
+        //Wait until all daemons have finished. The emptiness check MUST be
+        //done while holding the monitor. Otherwise the notification for the
+        //last finished daemon could happen between the check and wait(), and
+        //this thread would wait forever (lost wake-up).
+        boolean interrupted = false;
+        synchronized (activeIndexingDeamons) {
+            while(!activeIndexingDeamons.isEmpty()){
+                try {
+                    activeIndexingDeamons.wait();
+                } catch (InterruptedException e) {
+                    //the daemons are still running -> keep waiting, but
+                    //remember the interrupt so that it can be restored
+                    interrupted = true;
+                }
+            }
+        }
+        if(interrupted){
+            Thread.currentThread().interrupt();
+        }
+        //done!
+    }
+
+ @Override
+ public void finaliseIndexing() {
+ synchronized (stateSync) { //ensure that two threads do not start the
+ //initialisation at the same time ...
+ State state = getState();
+ if(state.ordinal() < State.POSTPROCESSED.ordinal()){
+ throw new IllegalStateException("The Indexer MUST BE already "+State.POSTPROCESSED+" when calling this Method!");
+ }
+ if(state != State.POSTPROCESSED){ //if state > POSTPROCESSED
+ return; // ignore this call
+ }
setState(State.FINALISING);
log.info("Indexing started ...");
}
@@ -331,7 +534,7 @@ public class IndexerImpl implements Inde
setState(State.FINISHED);
}
@Override
- public void indexAllEntities() {
+ public void indexEntities() {
synchronized (stateSync) { //ensure that two threads do not start the
//initialisation at the same time ...
State state = getState();
@@ -346,10 +549,14 @@ public class IndexerImpl implements Inde
}
//init the queues
int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
- indexedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
- processedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
- finishedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
- errorEntityQueue = new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+ BlockingQueue<QueueItem<Representation>> indexedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<Representation>> processedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<Representation>> finishedEntityQueue =
+ new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ BlockingQueue<QueueItem<IndexingError>> errorEntityQueue =
+ new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
//Set holding all active IndexingDaemons
final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons =
@@ -398,52 +605,21 @@ public class IndexerImpl implements Inde
//(4) The daemon for logging finished entities
activeIndexingDeamons.add(
new FinishedEntityDaemon(
- finishedEntityQueue, -1, log));
+ finishedEntityQueue, -1, log, indexedEntityIdOutputStream));
//(5) The daemon for logging errors
activeIndexingDeamons.add(
new EntityErrorLoggerDaemon(
errorEntityQueue, log));
- //We need an listener for the IndexingDaemons we are about to start!
- final IndexingDaemonListener listener = new IndexingDaemonListener() {
- @Override
- public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
- //looks like one has finished
- IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource();
- //handle the finished indexing daemon
- handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon);
- //finally remove the listener
- indexingDaemon.removeIndexingDaemonListener(this);
-
- }
- };
- //now start the IndexingDaemons in their own Threads
- Set<IndexingDaemon<?,?>> deamonCopy =
- new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
- for(IndexingDaemon<?,?> deamon : deamonCopy){
- deamon.addIndexingDaemonListener(listener); //add the listener
- Thread thread = new Thread(deamon);// create the thread
- thread.setDaemon(true); //ensure that the JVM can terminate
- thread.setName(deamon.getName()); // set the name of the thread
- thread.start(); //start the Thread
- }
- //now we need to wait until all Threads have finished ...
- while(!activeIndexingDeamons.isEmpty()){
- synchronized (activeIndexingDeamons) {
- try {
- activeIndexingDeamons.wait();
- } catch (InterruptedException e) {
- //year ... looks like we are done
- }
- }
- }
+ //start indexing and wait until it has finished
+ startAndWait(activeIndexingDeamons);
//set the new state to INDEXED
setState(State.INDEXED);
}
/**
* Handles the necessary actions if an {@link IndexingDaemon} used for the
- * work done within {@link #indexAllEntities()} completes its work (meaning
+ * work done within {@link #indexEntities()} completes its work (meaning
* it has executed all entities).<p>
- * The parsed SortedSet is created within {@link #indexAllEntities()} and
+ * The parsed SortedSet is created within {@link #indexEntities()} and
* contains all {@link IndexingDaemon}s that have not yet finished. It is
* the responsibility of this method to remove finished
* {@link IndexingDaemon}s from this set.<p>
@@ -543,5 +719,70 @@ public class IndexerImpl implements Inde
public State getState() {
return state;
}
+    /**
+     * Opens a stream for writing the IDs of indexed entities to the
+     * {@link #indexedEntityIdFile}. An existing file is replaced.<p>
+     * The file extension selects the compression:
+     * "zip" (a single entry named "entity-ids.txt"), "gz" (GZIP) or
+     * "bz2" (BZip2). Any other extension writes uncompressed data.<p>
+     * Can only be called while {@link #getState()} is {@link State#INITIALISED}
+     * or earlier.
+     * @return the stream, or <code>null</code> if no
+     * {@link #indexedEntityIdFile} is configured
+     * @throws IOException on any error while opening the stream. In that case
+     * the underlying file stream (if already opened) is closed.
+     * @throws IllegalStateException if {@link #getState()} is later than
+     * {@link State#INITIALISED}
+     */
+    protected OutputStream getEntityIdFileOutputStream() throws IOException {
+        if(indexedEntityIdFile == null){
+            return null;
+        }
+        State state = getState();
+        if(state.ordinal() > State.INITIALISED.ordinal()){
+            throw new IllegalStateException("Opening an OutputStream to the "
+                + "indexed entity id file '"+indexedEntityIdFile+"' is not "
+                + "allowed for states > "+State.INITIALISED +" (current: "+state+")!");
+        }
+        if(indexedEntityIdFile.isFile()){//exists
+            log.info(" ... delete existing IndexedEntityId file {}", indexedEntityIdFile);
+            if(!indexedEntityIdFile.delete()){
+                //not fatal: opening the FileOutputStream below truncates the file
+                log.warn(" ... unable to delete existing IndexedEntityId file {}",
+                    indexedEntityIdFile);
+            }
+        }
+        //support compression
+        String extension = FilenameUtils.getExtension(indexedEntityIdFile.getName());
+        OutputStream fileOut = new FileOutputStream(indexedEntityIdFile);
+        try {
+            if("zip".equalsIgnoreCase(extension)){
+                ZipOutputStream zipOut = new ZipOutputStream(fileOut);
+                zipOut.putNextEntry(new ZipEntry("entity-ids.txt"));
+                return zipOut;
+            } else if("gz".equalsIgnoreCase(extension)) {
+                return new GZIPOutputStream(fileOut); //writes the GZIP header
+            } else if("bz2".equalsIgnoreCase(extension)) {
+                return new BZip2CompressorOutputStream(fileOut);
+            } else {
+                return fileOut;
+            }
+        } catch (IOException e) {
+            //do not leak the file handle if the compressing stream fails
+            IOUtils.closeQuietly(fileOut);
+            throw e;
+        }
+    }
+    /**
+     * Opens a stream for reading the IDs of indexed entities from the
+     * {@link #indexedEntityIdFile}.<p>
+     * The file extension selects the decompression:
+     * "zip" (the first entry is read), "gz" (GZIP) or "bz2" (BZip2).
+     * Any other extension reads the data as is.<p>
+     * Can only be called while {@link #getState()} is {@link State#INDEXED}
+     * or later.
+     * @return the stream, or <code>null</code> if no
+     * {@link #indexedEntityIdFile} is configured
+     * @throws IOException on any error while creating the stream, including a
+     * zip file that contains no entry. In that case the underlying file
+     * stream (if already opened) is closed.
+     * @throws IllegalStateException if {@link #getState()} is earlier than
+     * {@link State#INDEXED}
+     */
+    protected InputStream getEntityIdFileInputStream() throws IOException {
+        if(indexedEntityIdFile == null){
+            return null;
+        }
+        State state = getState();
+        if(state.ordinal() < State.INDEXED.ordinal()){
+            throw new IllegalStateException("The indexed entity id data is not " +
+                "available for states < "+State.INDEXED +" (current: "+state+")!");
+        }
+        //support compression
+        String extension = FilenameUtils.getExtension(indexedEntityIdFile.getName());
+        InputStream fileIn = new FileInputStream(indexedEntityIdFile);
+        try {
+            if("zip".equalsIgnoreCase(extension)){
+                ZipInputStream zipIn = new ZipInputStream(fileIn);
+                if(zipIn.getNextEntry() == null){
+                    //the zip files written by this class always contain one entry
+                    throw new IOException("The indexed entity id file '"
+                        + indexedEntityIdFile + "' does not contain any zip entry!");
+                }
+                return zipIn;
+            } else if("gz".equalsIgnoreCase(extension)) {
+                return new GZIPInputStream(fileIn); //reads the GZIP header
+            } else if("bz2".equalsIgnoreCase(extension)) {
+                return new BZip2CompressorInputStream(fileIn);
+            } else {
+                return fileIn;
+            }
+        } catch (IOException e) {
+            //do not leak the file handle if the decompressing stream fails
+            IOUtils.closeQuietly(fileIn);
+            throw e;
+        }
+    }
}
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java Wed Apr 25 05:12:14 2012
@@ -94,10 +94,10 @@ public class FieldValueFilter implements
Object value = config.get(PARAM_FIELD);
if(value == null || value.toString().isEmpty()){
this.field = NamespaceEnum.getFullName(DEFAULT_FIELD);
- log.info("Using default Field %s",field);
+ log.info("Using default Field {}",field);
} else {
this.field = NamespaceEnum.getFullName(DEFAULT_FIELD);
- log.info("configured Field: %s",field);
+ log.info("configured Field: {}",field);
}
value = config.get(PARAM_VALUES);
if(value == null || value.toString().isEmpty()){
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.ldpath.EntityhubLDPath;
+import org.apache.stanbol.entityhub.ldpath.backend.YardBackend;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+
+import at.newmedialab.ldpath.api.backend.RDFBackend;
+
+/**
+ * Uses the {@link IndexingDestination#getYard()} as LDPath {@link RDFBackend}
+ * for the execution of configured LDPath statements.<p>
+ * <b>NOTE</b> in contrast to the {@link LdpathProcessor} this implementation
+ * is not limited to a subset of ldpath programs.<p>
+ * Typical use cases of this processor include:<ul>
+ * <li> indexing transitive closures (e.g. "
+ * <code>skos:broaderTransitive = (skos:broader)*</code>")
+ * <li> collecting labels of referenced entities to be used for disambiguation
+ * (e.g. use labels of linked concepts in a SKOS concept scheme:
+ * "{@code <urn:disambiguate.label> = *[rdf:type is skos:Concept]/(skos:prefLabel | skos:altLabel)}")
+ * <li> advanced indexing rules that need paths longer than one (e.g. adding
+ * labels of redirects pointing to an entity
+ * "<code>rdfs:label = rdfs:label | (^rdfs:seeAlso/rdfs:label)</code>")
+ * </ul>
+ * <p>
+ * Because this processor is meant for post-processing, configuration is
+ * easy: the data source does not need to be configured, but is retrieved
+ * directly from the {@link IndexingDestination}. This also means that if
+ * this processor is used outside the post-processing phase, the results
+ * are unpredictable, because they depend on the indexing order of the
+ * entities!
+ *
+ * @author Rupert Westenthaler
+ *
+ */
+public class LdpathPostProcessor extends LdpathProcessor implements EntityProcessor {
+
+    /**
+     * Delegates to {@link LdpathProcessor#setConfiguration(Map)}. That method
+     * also stores the indexing configuration, which {@link #initialise()}
+     * later uses to look up the {@link IndexingDestination}.
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        super.setConfiguration(config);
+    }
+
+    /**
+     * @return always <code>true</code>, because {@link #initialise()} must
+     * switch the LDPath backend to the Yard of the {@link IndexingDestination}
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return true;
+    }
+
+    /**
+     * Replaces the LDPath instance created by the {@link LdpathProcessor}
+     * with one that uses the {@link Yard} of the {@link IndexingDestination}
+     * as backend.
+     * @throws IllegalStateException if no indexing configuration or no
+     * {@link IndexingDestination} is available
+     */
+    @Override
+    public void initialise() {
+        //Overriding the ldpath instance used while parsing the configuration
+        //is OK, because parsing ldpath programs only needs the
+        //"value factory" role of the RDFBackend and does not actually access
+        //any data.
+        if(indexingConfig == null){
+            throw new IllegalStateException("The indexing configuration is not "
+                + "available! Was setConfiguration(..) called with the indexing "
+                + "configuration before initialise()?");
+        }
+        IndexingDestination destination = indexingConfig.getIndexingDestination();
+        if(destination == null){
+            throw new IllegalStateException("The indexing configuration does not "
+                + "define an IndexingDestination, but the LdpathPostProcessor "
+                + "requires its Yard as LDPath backend!");
+        }
+        Yard yard = destination.getYard();
+        YardBackend backend = new YardBackend(yard);
+        this.ldPath = new EntityhubLDPath(backend,yard.getValueFactory());
+    }
+
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java Wed Apr 25 05:12:14 2012
@@ -58,13 +58,14 @@ public class LdpathProcessor implements
* By default appending of LDPath results to the parsed Representation is
* activeted
*/
- private static final boolean DEFAULT_APPEND_MODE = true;
+ public static final boolean DEFAULT_APPEND_MODE = true;
private final ValueFactory vf;
- private final EntityhubLDPath ldPath;
+ protected EntityhubLDPath ldPath;
private final SingleRepresentationBackend backend;
private Program<Object> program;
private boolean appendMode;
+ protected IndexingConfig indexingConfig;
public LdpathProcessor(){
vf = InMemoryValueFactory.getInstance();
@@ -107,7 +108,7 @@ public class LdpathProcessor implements
@Override
public void setConfiguration(Map<String,Object> config) {
- IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+ indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
//parse the ldpath
final File ldpathFile;
Object value = config.get(PARAMETER_LD_PATH);
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,265 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.stanbol.entityhub.core.model.InMemoryValueFactory;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.ldpath.EntityhubLDPath.EntityhubConfiguration;
+import org.apache.stanbol.entityhub.ldpath.backend.SingleRepresentationBackend;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.ValueFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import at.newmedialab.ldpath.LDPath;
+import at.newmedialab.ldpath.api.backend.RDFBackend;
+import at.newmedialab.ldpath.api.transformers.NodeTransformer;
+import at.newmedialab.ldpath.exception.LDPathParseException;
+import at.newmedialab.ldpath.model.programs.Program;
+import at.newmedialab.ldpath.model.transformers.IdentityTransformer;
+import at.newmedialab.ldpath.parser.Configuration;
+
+/**
+ * LDpath based processor that tries to cast the
+ * @author westei
+ *
+ */
+public class LdpathSourceProcessor implements EntityProcessor {
+
+ private final Logger log = LoggerFactory.getLogger(LdpathProcessor.class);
+ /**
+ * @see LdpathProcessor#PARAMETER_LD_PATH
+ */
+ public static final String PARAMETER_LD_PATH = LdpathProcessor.PARAMETER_LD_PATH;
+ /**
+ * @see LdpathProcessor#PARAMETER_APPEND
+ */
+ public static final String PARAMETER_APPEND = LdpathProcessor.PARAMETER_APPEND;
+ /**
+ * @see LdpathProcessor#DEFAULT_APPEND_MODE
+ */
+ public static final boolean DEFAULT_APPEND_MODE = LdpathProcessor.DEFAULT_APPEND_MODE;
+
+ /**
+ * ValueFactory used to create Representation
+ */
+ private final ValueFactory vf = InMemoryValueFactory.getInstance();
+ /**
+ * {@link LDPath} instance of an unknown generic type (depends on the
+ * used Indexing source
+ */
+ @SuppressWarnings("rawtypes")
+ protected LDPath ldPath;
+ /**
+ * The RDF backend
+ */
+ @SuppressWarnings("rawtypes")
+ protected RDFBackend backend;
+ @SuppressWarnings("rawtypes")
+ protected Configuration configuration;
+ @SuppressWarnings("rawtypes")
+ private Map<String,NodeTransformer> transformer;
+ @SuppressWarnings("rawtypes")
+ private Program program;
+ /**
+ * If results are appended to the parsed Representation
+ */
+ private boolean appendMode;
+
+ /**
+ * The indexing configuration
+ */
+ protected IndexingConfig indexingConfig;
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void setConfiguration(Map<String,Object> config) {
+ indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+ Object indexingSource;
+ //we need to check for both EntityDataProvider and EntityDataIterator
+ indexingSource = indexingConfig.getEntityDataProvider();
+ if(indexingSource == null){
+ indexingSource = indexingConfig.getDataInterable();
+ }
+ if(indexingSource == null){
+ throw new IllegalStateException("Indexing Configuration does not contain" +
+ "neither an EntityDataProvider nor an EntityIdIterator!");
+ }
+ if(indexingSource instanceof RDFBackend<?>){
+ //NOTE we use the EntityhubConfiguration to have the same pre-registered
+ // namespaces as the other components.
+ this.backend = (RDFBackend)indexingSource;
+ this.configuration = new EntityhubConfiguration(vf);
+ this.transformer = configuration.getTransformers();
+ this.ldPath = new LDPath(backend,configuration);
+ } else {
+ throw new IllegalArgumentException("The configured IndexingSource '"
+ + indexingSource.getClass().getSimpleName()+"' does not support "
+ + "LDPath (does not implement RDFBackend)! This Processor "
+ + "can only be used with IndexingSources that support LDPath!");
+ }
+ Object value = config.get(PARAMETER_LD_PATH);
+ final File ldpathFile;
+ if(value != null && !value.toString().isEmpty()){
+ ldpathFile = indexingConfig.getConfigFile(value.toString());
+ if(ldpathFile == null || !ldpathFile.exists()){
+ throw new IllegalArgumentException("Configured '"
+ + PARAMETER_LD_PATH +"' file was not found!");
+ }
+ if(!ldpathFile.isFile()){
+ throw new IllegalArgumentException("Configured '"
+ + PARAMETER_LD_PATH +"' file exists but is not a File!");
+ }
+ } else {
+ throw new IllegalArgumentException("Missing required configuration '"
+ + PARAMETER_LD_PATH +"' - the file containing the LDPath program used by this "
+ + LdpathProcessor.class.getSimpleName()+"!");
+ }
+ //The backend needs not to be initialised to parse a program as
+ //parsing only requires the "value converter" methods that need also to
+ //work without initialising
+ //if this is a Problem one can also move parsing to the init method
+ parseLdPathProgram(ldpathFile);
+ value = config.get(PARAMETER_APPEND);
+ if(value instanceof Boolean){
+ this.appendMode = ((Boolean) value).booleanValue();
+ } else if(value != null && !value.toString().isEmpty()){
+ this.appendMode = Boolean.parseBoolean(value.toString());
+ } else {
+ this.appendMode = DEFAULT_APPEND_MODE;
+ }
+ }
+
+ /**
+ *
+ */
+ @SuppressWarnings("unchecked")
+ private void parseLdPathProgram(File ldpathFile) {
+ Reader in = null;
+ try {
+ in = new InputStreamReader(new FileInputStream(ldpathFile), Charset.forName("UTF-8"));
+ this.program = ldPath.parseProgram(in);
+ log.info("ldpath program: \n{}\n",program.getPathExpression(backend));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unabwle to read LDPath program from configured file '"
+ + ldpathFile +"'!",e);
+ } catch (LDPathParseException e) {
+ throw new IllegalStateException("Unable to parse LDPath program from configured file '"
+ + ldpathFile +"'!",e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ }
+
+ @Override
+ public boolean needsInitialisation() {
+ return false;
+ }
+
+ @Override
+ public void initialise() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @SuppressWarnings({"unchecked","rawtypes"})
+ @Override
+ public Representation process(Representation source) {
+ Object context = backend.createURI(source.getId());
+ Representation result = appendMode ? source : vf.createRepresentation(source.getId());
+ /*
+ * NOTE: LDPath will return Node instances of the RDFRepositroy if no
+ * transformation is defined for a statement (line) in the configured
+ * LDpath program (the ":: xsd:int" at the end). this Nodes need to be
+ * converted to valid Entityhub Representation values.
+ * As we can not know the generic type used by the RDFRepository
+ * implementation of the indexing source this is a little bit tricky.
+ * What this does is:
+ * - for URIs it creates References
+ * - for plain literal it adds natural texts
+ * - for typed literals it uses the NodeTransformer registered with
+ * the LDPath (or more precise the Configuration object parsed to
+ * the LDPath in the constructor) to transform the values to
+ * Java objects. If no transformer is found or an Exeption occurs
+ * than the lexical form is used and added as String to the
+ * Entityhub.
+ */
+ Map<String,Collection<Object>> resultMap = (Map<String,Collection<Object>>)program.execute(backend, context);
+ for(Entry<String,Collection<Object>> entry : resultMap.entrySet()){
+ NodeTransformer fieldTransformer = program.getField(entry.getKey()).getTransformer();
+ if(fieldTransformer == null || fieldTransformer instanceof IdentityTransformer<?>){
+ //we need to convert the RDFBackend Node to an Representation object
+ for(Object value : entry.getValue()){
+ if(backend.isURI(value)){
+ result.addReference(entry.getKey(), backend.stringValue(value));
+ } else if(backend.isLiteral(value)){ //literal
+ Locale lang = backend.getLiteralLanguage(value);
+ if(lang != null){ //text with language
+ result.addNaturalText(entry.getKey(), backend.stringValue(value), lang.getLanguage());
+ } else { // no language
+ URI type = backend.getLiteralType(value);
+ if(type != null){ //typed literal -> need to transform
+ NodeTransformer nt = transformer.get(type.toString());
+ if(nt != null){ //add typed literal
+ try {
+ result.add(entry.getKey(), nt.transform(backend, value));
+ } catch (RuntimeException e) {
+ log.info("Unable to transform {} to dataType {} -> will use lexical form",value,type);
+ result.add(entry.getKey(),backend.stringValue(value));
+ }
+ } else { //no transformer
+ log.info("No transformer for type {} -> will use lexical form",type);
+ result.add(entry.getKey(),backend.stringValue(value));
+
+ }
+ } else { //no langauge and no type -> literal with no language
+ result.addNaturalText(entry.getKey(), backend.stringValue(value));
+ }
+ }
+ } else { //bNode
+ log.info("Ignore bNode {} (class: {})",value,value.getClass());
+ }
+ } //end for all values
+ } else { //already a transformed values
+ result.add(entry.getKey(), entry.getValue()); //just add all values
+ }
+ }
+ return result;
+ }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java Wed Apr 25 05:12:14 2012
@@ -223,12 +223,14 @@ public class LineBasedEntityIterator imp
log.info("activate URL decoding of Entity IDs");
}
value = config.get(PARAM_ENTITY_SCORE_FILE);
- if(value == null || value.toString().isEmpty()){
- scoreFile = indexingConfig.getSourceFile(DEFAULT_ENTITY_SCORE_FILE);
- } else {
- scoreFile = indexingConfig.getSourceFile(value.toString());
- }
- log.info("Set Source File to '"+this.scoreFile+"'");
+ if(reader == null){
+ if(value == null || value.toString().isEmpty()){
+ scoreFile = indexingConfig.getSourceFile(DEFAULT_ENTITY_SCORE_FILE);
+ } else {
+ scoreFile = indexingConfig.getSourceFile(value.toString());
+ }
+ log.info("Set Source File to '"+this.scoreFile+"'");
+ } //else reader parsed in the constructor ... nothing todo
//now done in the initialise() method
// try {
// initReader(new FileInputStream(scoreFile));
@@ -341,7 +343,7 @@ public class LineBasedEntityIterator imp
try {
while((line = reader.readLine()) != null){
lineCounter++;
- log.debug("> line = {}");
+ log.debug("> line = {}",line);
EntityScore entity = parseEntityFormLine(line);
if(entity != null){
return entity;