You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/04/25 07:12:15 UTC

svn commit: r1330107 [1/2] - in /incubator/stanbol/trunk: demos/ehealth/ entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/ entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/ entityh...

Author: rwesten
Date: Wed Apr 25 05:12:14 2012
New Revision: 1330107

URL: http://svn.apache.org/viewvc?rev=1330107&view=rev
Log:
Implementation of STANBOL-590, STANBOL-591, STANBOL-592 and STANBOL-593

In addition:

* added the new components in the documentation of the genericrdf indexer
* added Musicbrainz related namespaces to the Entityhub NamespaceEnum
* Entityhub LDPath default configuration is now public to allow usage also for non-Entityhub LDPath instances (the namespace configurations are generally useful)
* Urify: A new command line utility that converts bNodes to URNs
    * This is important when importing datasets with millions of bNodes, because during import those need to be kept in memory, which may prevent importing such datasets into Jena TDB


Added:
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/YardEntityDataProvider.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/AbstractTdbBackend.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/Constants.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/ResourceFilterIterator.java   (with props)
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/Utils.java   (with props)
Modified:
    incubator/stanbol/trunk/demos/ehealth/README.md
    incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java
    incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
    incubator/stanbol/trunk/entityhub/indexing/destination/solryard/src/main/java/org/apache/stanbol/entityhub/indexing/destination/solryard/SolrYardIndexingDestination.java
    incubator/stanbol/trunk/entityhub/indexing/genericrdf/README.md
    incubator/stanbol/trunk/entityhub/indexing/genericrdf/src/main/resources/indexing/config/entityTypes.properties
    incubator/stanbol/trunk/entityhub/indexing/genericrdf/src/main/resources/indexing/config/indexing.properties
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/pom.xml
    incubator/stanbol/trunk/entityhub/indexing/source/jenatdb/src/main/java/org/apache/stanbol/entityhub/indexing/source/jenatdb/RdfIndexingSource.java
    incubator/stanbol/trunk/entityhub/ldpath/src/main/java/org/apache/stanbol/entityhub/ldpath/EntityhubLDPath.java
    incubator/stanbol/trunk/entityhub/ldpath/src/main/java/org/apache/stanbol/entityhub/ldpath/backend/SingleRepresentationBackend.java
    incubator/stanbol/trunk/parent/pom.xml

Modified: incubator/stanbol/trunk/demos/ehealth/README.md
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/demos/ehealth/README.md?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/demos/ehealth/README.md (original)
+++ incubator/stanbol/trunk/demos/ehealth/README.md Wed Apr 25 05:12:14 2012
@@ -260,7 +260,7 @@ Sider, Drugbank and Dailymed are interli
     sideEffect = (owl:sameAs)+/sider:sideEffect/rdfs:label;
           
     genericName = (owl:sameAs)+/drugbank:genericName;
-    key = (owl:sameAs)+/drugbank:inchiKey;
+    inchiKey = (owl:sameAs)+/drugbank:inchiKey;
     indication = (owl:sameAs)+/drugbank:indication;
     foodInteraction = (owl:sameAs)+/drugbank:foodInteraction;
     toxicity = (owl:sameAs)+/drugbank:toxicity;

Modified: incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java (original)
+++ incubator/stanbol/trunk/entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/defaults/NamespaceEnum.java Wed Apr 25 05:12:14 2012
@@ -123,7 +123,28 @@ public enum NamespaceEnum {
     /**
      * The Music Ontology (http://musicontology.com/)
      */
-    mo("http://purl.org/ontology/mo/")
+    mo("http://purl.org/ontology/mo/"),
+    /**
+     *  The Time ontology (http://www.w3.org/TR/owl-time/)
+     */
+    owlTime("owl-time","http://www.w3.org/2006/time#"),
+    /**
+     *  The Event ontology (http://purl.org/NET/c4dm/event.owl#)
+     */
+    event("http://purl.org/NET/c4dm/event.owl#"),
+    /**
+     *  The Timeline ontology (http://purl.org/NET/c4dm/timeline.owl#)
+     */
+    timeline("http://purl.org/NET/c4dm/timeline.owl#"),
+    /**
+     *  Relationship: A vocabulary for describing relationships between people
+     *  (http://purl.org/vocab/relationship/)
+     */
+    rel("http://purl.org/vocab/relationship/"),
+    /**
+     *  Expression of Core FRBR Concepts in RDF (http://vocab.org/frbr/core)
+     */
+    frbr("http://purl.org/vocab/frbr/core#")
     ;
     /**
      * The logger

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,302 @@
+package org.apache.stanbol.entityhub.indexing;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implemented to allow importing the Musicbrainz dump to Jena TDB.<p>
+ * 
+ * The problem is that RDF dumps with blank nodes do require to store a
+ * lookup table for the black nodes IDs during import.<p> In case a dump
+ * contains millions of such nodes this table does no longer fit into
+ * memory. This makes importing Dumps to with the Jena RDF parser impossible.
+ * <p>
+ * This Utility can replaces nodes that start with "_:{id}" with
+ * "<{prefix}{id}>. The prefix must be set with the "-p" parameter.
+ * <p>
+ * This tool supports "gz" and "bz2" compressed files. If the output will use
+ * the same compression as the input. It uses two threads for "reading/processing" 
+ * and "writing". It could also process multiple files in parallel. However this
+ * feature is not yet activated as the Musicbrainz dump comes in a single file.
+ * 
+ * @author Rupert Westenthaler
+ *
+ */
+public class Urify implements Runnable{
+
+    private static final String EOF_INDICATOR = "__EOF_INDICATOR";
+    
+    private static Logger log = LoggerFactory.getLogger(Urify.class);
+    
+    private static final Options options;
+    static {
+        options = new Options();
+        options.addOption("h", "help", false, "display this help and exit");
+        options.addOption("p","prefix",true, 
+            "The URI prefix used for wrapping the bNode Id");
+        options.addOption("e","encoding",true, "the char encodinf (default: UTF-8)");
+    }
+    /**
+     * @param args
+     * @throws ParseException 
+     */
+    public static void main(String[] args) throws IOException, ParseException {
+        CommandLineParser parser = new PosixParser();
+        CommandLine line = parser.parse(options, args);
+        args = line.getArgs();
+        if(!line.hasOption('p')){
+            log.error("Missing parameter 'prefix' ('p)!");
+            System.exit(1);
+        }
+        String prefix = "<"+line.getOptionValue('p');
+        log.info("prfix: {} ",line.getOptionValue('p'));
+        Charset charset;
+        if(line.hasOption('e')){
+            charset = Charset.forName(line.getOptionValue('e'));
+            if(charset == null){
+                log.error("Unsupported encoding '{}'!",line.getOptionValue('e'));
+                System.exit(1);
+            }
+        } else {
+            charset = Charset.forName("UTF-8");
+        }
+        log.info("charset: {} ",charset.name());
+        Urify urify = new Urify(Arrays.asList(args), charset, prefix);
+        urify.run(); //TODO: this could support processing multiple files in parallel
+    }
+
+    private final Charset charset;
+    private final String prefix;
+    protected long start = System.currentTimeMillis();
+    protected long uf_count = 0;
+
+    private List<String> resources;
+
+    public Urify(List<String> resources, Charset charset, String prefix) throws IOException {
+        this.charset = charset;
+        this.prefix = prefix;
+        this.resources = Collections.synchronizedList(new ArrayList<String>(resources));
+    }
+    
+    public void run() {
+        String source;
+        do {
+            synchronized (resources) {
+                if(resources.isEmpty()){
+                    source = null;
+                } else {
+                    source = resources.remove(0);
+                    try {
+                        urify(source);
+                    } catch (Exception e) {
+                        log.error("Unable to Urify "+resources,e);
+                    }
+                }
+            }
+        } while (source != null);
+    }
+    private void urify(String resource) throws IOException {
+        File source = new File(resource);
+        if(source.isFile()){
+            String path = FilenameUtils.getFullPathNoEndSeparator(resource);
+            String name = FilenameUtils.getName(resource);
+            File target = new File(path,"uf_"+name);
+            int i=0;
+            while(target.exists()){
+                i++;
+                target = new File(path,"uf"+i+"_"+name);
+            }
+            InputStream is = new FileInputStream(source);
+            OutputStream os = new FileOutputStream(target);
+            log.info("Resource: {}",resource);
+            log.info("Target  : {}",target);
+            if ("gz".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+                is = new GZIPInputStream(is);
+                os = new GZIPOutputStream(os);
+                name = FilenameUtils.removeExtension(name);
+                log.debug("   - from GZIP Archive");
+            } else if ("bz2".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+                is = new BZip2CompressorInputStream(is);
+                os = new BZip2CompressorOutputStream(os);
+                name = FilenameUtils.removeExtension(name);
+                log.debug("   - from BZip2 Archive");
+            }// TODO: No Zip File support
+            //else no complression
+            BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
+            ReaderDaemon reader = new ReaderDaemon(new BufferedReader(new InputStreamReader(is, charset)), queue);
+            WriterDaemon writer = new WriterDaemon(
+                new BufferedWriter(new OutputStreamWriter(os, charset)), queue);
+            Thread readerDaemon = new Thread(reader,name+" reader");
+            Thread writerDaemon = new Thread(writer,name+" writer");
+            readerDaemon.setDaemon(true);
+            writerDaemon.setDaemon(true);
+            writerDaemon.start();
+            readerDaemon.start();
+            Object notifier = writer.getNotifier();
+            synchronized (notifier) { //wait until processed
+               if(!writer.completed()){
+                   try {
+                       notifier.wait();
+                   } catch (InterruptedException e) {/*ignore*/}
+               }
+            }
+            if(reader.getError() != null){
+                throw new IOException("Error while reading source "+source,reader.getError());
+            }
+            if(writer.getError() != null){
+                throw new IOException("Error while writing resource "+target,writer.getError());
+            }
+            log.info(" ... completed resource {}",resource);
+        } else {
+            throw new FileNotFoundException("Parsed File "+resource+" does not exist or is not a File!");
+        }
+    }
+
+    private class ReaderDaemon implements Runnable {
+        private final BufferedReader reader;
+        private final BlockingQueue<String> queue;
+
+        private Exception error;
+        
+        protected ReaderDaemon(BufferedReader reader, BlockingQueue<String> queue){
+            this.reader = reader;
+            this.queue = queue;
+        }
+        
+        
+        public Exception getError() {
+            return error;
+        }
+
+
+        @Override
+        public void run() {
+            String triple;
+            try {
+                while((triple = reader.readLine()) != null){
+                    StringBuilder sb = new StringBuilder();
+                    for(String node : triple.split(" ")){
+                        if(node.startsWith("_:")){ //convert to uri
+                            sb.append(prefix);
+                            sb.append(node.substring(2,node.length()));
+                            sb.append("> ");
+                            uf_count++;
+                        } else {
+                            sb.append(node);
+                            if(node.length()>1){
+                                //the final '.' is also a node
+                                sb.append(" ");
+                            }
+                        }
+                    }
+                    queue.put(sb.toString());
+                }
+            } catch (IOException e) {
+                error = e;
+            } catch (InterruptedException e) {
+                error = e;
+            } finally {
+                IOUtils.closeQuietly(reader);
+                try {
+                    queue.put(EOF_INDICATOR); //indicates finished
+                } catch (InterruptedException e) {
+                    log.error("Unable to put EOF to queue!",e);
+                } 
+            }
+        }
+    }
+    
+    private class WriterDaemon implements Runnable {
+        private final BufferedWriter writer;
+        private final BlockingQueue<String> queue;
+        private final Object notifier = new Object();
+        private boolean completed = false;
+
+        private Exception error;
+        
+        protected WriterDaemon(BufferedWriter writer, BlockingQueue<String> queue){
+            this.writer = writer;
+            this.queue = queue;
+        }
+        
+        public Exception getError() {
+            return error;
+        }
+
+        public Object getNotifier() {
+            return notifier;
+        }
+
+        public boolean completed() {
+            return completed;
+        }
+
+        @Override
+        public void run() {
+            String triple;
+            long count = 0;
+            boolean first = true;
+            try {
+                while(!EOF_INDICATOR.equals((triple = queue.take()))){
+                    if(count % 1000000 == 0){
+                        //NOTE: urified will not be correct as it is counted
+                        //      by an other thread, but for logging ...
+                        long end = System.currentTimeMillis();
+                        log.info("processed {} | urified: {} (batch: {}sec)",
+                            new Object[]{count,uf_count,((double)(end-start))/1000});
+                        start = end;
+                    }
+                    count++;
+                    if(first){
+                        first = false;
+                    } else {
+                        writer.write("\n");
+                    }
+                    writer.write(triple);
+                    
+                }
+            } catch (InterruptedException e) {
+                this.error = e;
+            } catch (IOException e) {
+                this.error = e;
+            } finally {
+                IOUtils.closeQuietly(writer);
+                this.completed = true;
+                synchronized (notifier) {
+                    notifier.notifyAll();
+                }
+            }
+        }
+    }
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/Urify.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/Indexer.java Wed Apr 25 05:12:14 2012
@@ -16,6 +16,7 @@
 */
 package org.apache.stanbol.entityhub.indexing.core;
 
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
 import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
 
 /**
@@ -56,6 +57,14 @@ public interface Indexer {
          */
         INDEXED,
         /**
+         * while post-processing of the indexed entities is performed
+         */
+        POSTPROCESSING,
+        /**
+         * after the post-processing of the entities has finished
+         */
+        POSTPROCESSED,
+        /**
          * While the {@link IndexingTarget} is finalising the indexing process.
          */
         FINALISING,
@@ -101,7 +110,7 @@ public interface Indexer {
      * This Method is intended to be used by caller that need more control over 
      * the indexing process as simple to call {@link #index()}.
      */
-    void initialiseIndexingSources();
+    void initialiseIndexing();
     /**
      * Brings this indexer in the {@link State#INDEXED} by indexing all Entities
      * provided by the {@link IndexingComponent}s. This method blocks until the  
@@ -113,7 +122,19 @@ public interface Indexer {
      * @throws IllegalStateException if {@link #getState()} &lt; 
      * {@link State#INITIALISED}
      */
-    void indexAllEntities() throws IllegalStateException;
+    void indexEntities() throws IllegalStateException;
+    /**
+     * Performs {@link State#POSTPROCESSING} after all entities are
+     * {@link State#INDEXED} ({@link #indexEntities()} completed).<p>
+     * Post-processing uses the {@link IndexingDestination} as source and
+     * target. It retrieves the {@link Representation} of each indexed
+     * entity and sends it to the configured post-processing
+     * {@link EntityProcessor}s.<p>
+     * The resulting {@link Representation} is stored in the
+     * {@link IndexingDestination}.
+     * @throws IllegalStateException if the state is &lt; {@link State#INDEXED}
+     */
+    void postProcessEntities() throws IllegalStateException;
     /**
      * {@link State#FINISHED Finalises} the indexing process by calling finalise
      * on the {@link IndexingDestination}. This method blocks until the  
@@ -125,13 +146,13 @@ public interface Indexer {
      * @throws IllegalStateException if {@link #getState()} &lt; 
      * {@link State#INDEXED}
      */
-    void finaliseIndexingTarget() throws IllegalStateException;
+    void finaliseIndexing() throws IllegalStateException;
     /**
      * Initialise the {@link IndexingComponent}s, indexes all entities and 
      * finalises the {@link IndexingDestination}. <p>
      * Calls to this method do have the same result as subsequent calls to 
-     * {@link #initialiseIndexingSources()}, {@link #indexAllEntities()},
-     * {@link #finaliseIndexingTarget()}. This method can also be used if any of
+     * {@link #initialiseIndexing()}, {@link #indexEntities()},
+     * {@link #finaliseIndexing()}. This method can also be used if any of
      * the mentioned three methods was already called to this indexer instance.
      * <p>
      * This method blocks until the whole process is completed. Ideal if the 

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/IndexerFactory.java Wed Apr 25 05:12:14 2012
@@ -16,6 +16,10 @@
 */
 package org.apache.stanbol.entityhub.indexing.core;
 
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig.DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
@@ -100,12 +104,16 @@ public class IndexerFactory {
                 "'{}' in the indexing.properties within the directory {}",
                 IndexingConstants.KEY_ENTITY_PROCESSOR,config.getConfigFolder());
         }
+        List<EntityProcessor> postProcessors = config.getEntityPostProcessors();
         log.info("Present Source Configuration:");
         log.info(" - EntityDataIterable: {}",dataIterable);
         log.info(" - EntityIterator: {}",idIterator);
         log.info(" - EntityDataProvider: {}",dataProvider);
         log.info(" - EntityScoreProvider: {}",scoreProvider);
         log.info(" - EntityProcessors ({}):",processors.size());
+        if(postProcessors != null){
+            log.info(" - EntityPostProcessors ({}):",postProcessors.size());
+        }
         int i=0;
         for(EntityProcessor processor : processors){
             i++;
@@ -114,11 +122,13 @@ public class IndexerFactory {
         if(dataIterable != null && scoreProvider != null){
             // iterate over data and lookup scores
             indexer = new IndexerImpl(dataIterable, scoreProvider, 
-                config.getNormaliser(),destination, processors);
+                config.getNormaliser(),destination, processors,
+                config.getIndexedEntitiesIdsFile(),postProcessors);
         } else if(idIterator != null && dataProvider != null){
             // iterate over id and lookup data
             indexer = new IndexerImpl(idIterator,dataProvider,
-                config.getNormaliser(),destination, processors);
+                config.getNormaliser(),destination, processors,
+                config.getIndexedEntitiesIdsFile(),postProcessors);
         } else if(dataIterable != null && idIterator != null){
             // create an EntityIterator to EntityScoreProvider adapter
             log.info(
@@ -128,7 +138,8 @@ public class IndexerFactory {
             	idIterator.getClass(), dataIterable.getClass());
             indexer = new IndexerImpl(dataIterable,
                 new EntityIneratorToScoreProviderAdapter(idIterator),
-                config.getNormaliser(),destination, processors);
+                config.getNormaliser(),destination, processors,
+                config.getIndexedEntitiesIdsFile(),postProcessors);
         } else {
             log.error("Invalid Indexing Source configuration: ");
             log.error(" - To iterate over the data and lookup scores one need to " +
@@ -143,12 +154,42 @@ public class IndexerFactory {
     public Indexer create(EntityIterator idIterator, EntityDataProvider dataProvider,
                           ScoreNormaliser normaliser,
                           List<EntityProcessor> processors, IndexingDestination destination){
-        return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors);
+        //no file for storing the ids of indexed entities (null) and
+        //no post-processors (null) are configured by this variant
+        return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors,null,null);
+    }
+    /**
+     * Creates an {@link Indexer} that iterates over entity ids, looks up the
+     * data for each id and post-processes the indexed entities.<p>
+     * The ids of the indexed entities are stored in a temporary ".zip" file
+     * that is deleted when the JVM exits.
+     * @param postProcessors the {@link EntityProcessor}s used for
+     * post-processing
+     * @throws IllegalStateException if the temporary file can not be created
+     */
+    public Indexer create(EntityIterator idIterator, EntityDataProvider dataProvider,
+                          ScoreNormaliser normaliser,
+                          List<EntityProcessor> processors, List<EntityProcessor> postProcessors,
+                          IndexingDestination destination){
+        File tmp;
+        try {
+            //the suffix needs the leading '.' (otherwise the name ends with "...zip")
+            tmp = File.createTempFile("ind-ent-ids",".zip");
+            tmp.deleteOnExit();
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create temporary file for storing the " +
+                    "indexed Entity IDs",e);
+        }
+        return new IndexerImpl(idIterator, dataProvider, normaliser,destination, processors,
+            tmp,postProcessors);
     }
     
     public Indexer create(EntityDataIterable dataIterable,EntityScoreProvider scoreProvider,
                           ScoreNormaliser normaliser,
                           List<EntityProcessor> processors, IndexingDestination destination){
-        return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors);
+        //no file for storing the ids of indexed entities (null) and
+        //no post-processors (null) are configured by this variant
+        return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors,null,null);
+    }
+    /**
+     * Creates an {@link Indexer} that iterates over the entity data, looks up
+     * the score for each entity and post-processes the indexed entities.<p>
+     * The ids of the indexed entities are stored in a temporary ".zip" file
+     * that is deleted when the JVM exits.
+     * @param postProcessors the {@link EntityProcessor}s used for
+     * post-processing
+     * @throws IllegalStateException if the temporary file can not be created
+     */
+    public Indexer create(EntityDataIterable dataIterable,EntityScoreProvider scoreProvider,
+                          ScoreNormaliser normaliser,
+                          List<EntityProcessor> processors,  List<EntityProcessor> postProcessors,
+                          IndexingDestination destination){
+        File tmp;
+        try {
+            //the suffix needs the leading '.' (otherwise the name ends with "...zip")
+            tmp = File.createTempFile("ind-ent-ids",".zip");
+            tmp.deleteOnExit();
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to create temporary file for storing the " +
+                    "indexed Entity IDs",e);
+        }
+        return new IndexerImpl(dataIterable, scoreProvider, normaliser,destination, processors,
+            tmp,postProcessors);
     }
 }

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java Wed Apr 25 05:12:14 2012
@@ -20,6 +20,7 @@ import static org.apache.stanbol.entityh
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_DATA_ITERABLE;
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_DATA_PROVIDER;
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_ID_ITERATOR;
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_POST_PROCESSOR;
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_PROCESSOR;
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_ENTITY_SCORE_PROVIDER;
 import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.KEY_INDEXING_DESTINATION;
@@ -84,6 +85,8 @@ public class IndexingConfig {
     private static final Logger log = LoggerFactory.getLogger(IndexingConfig.class);
     private static final String DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME = "indexFieldConfig.txt";
     
+    public static final String DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME = "indexed-entities-ids.zip";
+    
     /**
      * This stores the context within the classpath to initialise missing
      * configurations and source based on the defaults in the classpath.
@@ -184,6 +187,14 @@ public class IndexingConfig {
      */
     private List<EntityProcessor> entityProcessor = null;
     /**
+     * The {@link EntityProcessor}s initialised based on the value
+     * of the {@link IndexingConstants#KEY_ENTITY_POST_PROCESSOR} key or
+     * <code>null</code> if not configured.
+     * This variable uses lazy initialisation
+     * @see #getEntityPostProcessors()
+     */
+    private List<EntityProcessor> entityPostProcessor = null;
+    /**
      * The {@link IndexingDestination} instance initialised based on the value
      * of the {@link IndexingConstants#KEY_INDEXING_DESTINATION} key or
      * <code>null</code> if not configured.
@@ -837,6 +848,40 @@ public class IndexingConfig {
             return null;
         }
     }
+    /**
+     * Getter for the {@link EntityProcessor}s configured (by the value of the
+     * {@link IndexingConstants#KEY_ENTITY_POST_PROCESSOR} key) for post-processing.
+     * @return unmodifiable list of the post-processors or <code>null</code> if none
+     */
+    public List<EntityProcessor> getEntityPostProcessors(){
+        if(entityPostProcessor != null){ //already initialised (lazy init)
+            return entityPostProcessor;
+        } else if(configuration.containsKey(KEY_ENTITY_POST_PROCESSOR)){
+            List<ConfigEntry> configs = parseConfigEntries(configuration.get(KEY_ENTITY_POST_PROCESSOR).toString());
+            List<EntityProcessor> postProcessorList = new ArrayList<EntityProcessor>(configs.size());
+            for(ConfigEntry config : configs){
+                EntityProcessor postProcessor;
+                try { //instantiated via reflection (public no-arg constructor needed)
+                    postProcessor = (EntityProcessor)Class.forName(config.getClassName()).newInstance();
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Invalid EntityProcessor configuration '"+config.getConfigString()+"' for post-processing!",e);
+                }
+                //add the configuration
+                Map<String,Object> configMap = getComponentConfig(config, postProcessor.getClass().getSimpleName(), false);
+                //add also the directly provided parameters
+                configMap.putAll(config.getParams());
+                postProcessor.setConfiguration(configMap);
+                postProcessorList.add(postProcessor);
+            }
+            if(!postProcessorList.isEmpty()){ //do not set empty lists
+                entityPostProcessor = Collections.unmodifiableList(postProcessorList);
+            }
+            return entityPostProcessor;
+        } else {
+            return null;
+        }
+    }
+    
     public IndexingDestination getIndexingDestination() {
         if(indexingDestination != null){
             return indexingDestination;
@@ -857,6 +902,16 @@ public class IndexingConfig {
             return null;
         }
     }
+    public File getIndexedEntitiesIdsFile(){ //file for the IDs of indexed Entities; null if deactivated
+        Object value = configuration.get(IndexingConstants.KEX_INDEXED_ENTITIES_FILE); //NOTE(review): "KEX_" looks like a typo of "KEY_"
+        if(value == null){ //not configured -> use the default file name
+            return new File(getDestinationFolder(),DEFAULT_INDEXED_ENTITIES_ID_FILE_NAME);
+        } else if (value.toString().isEmpty()){
+            return null; //deactivate this feature;
+        } else { //configured file name, relative to the destination folder
+            return new File(getDestinationFolder(),value.toString());
+        }
+    }
 
     private void initNormaliser() {
         Object value = configuration.get(IndexingConstants.KEY_SCORE_NORMALIZER);

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java Wed Apr 25 05:12:14 2012
@@ -34,6 +34,11 @@ public interface IndexingConstants {
     String KEY_INDEXING_DESTINATION = "indexingDestination";
     String KEY_INDEX_FIELD_CONFIG = "fieldConfiguration";
     /**
+     * Name of the file relative to the destination folder used to store
+     * the IDs of indexed Entities.
+     */
+    String KEX_INDEXED_ENTITIES_FILE = "indexedEntitiesFile";
+    /**
      * usage:<br>
      * <pre>
      * {class1},name:{name1};{class2},name:{name2};...
@@ -43,4 +48,5 @@ public interface IndexingConstants {
      */
     String KEY_SCORE_NORMALIZER      = "scoreNormalizer";
     String KEY_ENTITY_PROCESSOR      = "entityProcessor";
+    String KEY_ENTITY_POST_PROCESSOR = "entityPostProcessor";
 }

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java Wed Apr 25 05:12:14 2012
@@ -21,8 +21,15 @@ import static org.apache.stanbol.entityh
 import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
 import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.stanbol.entityhub.servicesapi.model.Representation;
 import org.slf4j.Logger;
 
@@ -57,11 +64,20 @@ public class FinishedEntityDaemon extend
     private long countedAll;
     private long countedMajor;
     private long countedMinor;
+    /**
+     * Writer for the IDs of finished Entities (one ID per line) or <code>null</code> if not used
+     */
+    private final BufferedWriter idWriter;
+    /**
+     * The charset used for the {@link #idWriter}
+     */
+    private static final Charset UTF8 = Charset.forName("UTF-8");
     
     
     public FinishedEntityDaemon(BlockingQueue<QueueItem<Representation>> consume,
                                   int majorInterval,
-                                  Logger out) {
+                                  Logger out,
+                                  OutputStream idOut) {
         super("Indexing: Finished Entity Logger Deamon",
             IndexerConstants.SEQUENCE_NUMBER_FINISHED_DAEMON,
             consume, null, null);
@@ -72,6 +88,11 @@ public class FinishedEntityDaemon extend
             this.major = DEFAULT_MAJOR_INTERVAL;
         }
         this.minor = major/10;
+        if(idOut != null){
+            this.idWriter = new BufferedWriter(new OutputStreamWriter(idOut, UTF8));
+        } else {
+            this.idWriter = null;
+        }
     }
 
     @Override
@@ -85,6 +106,17 @@ public class FinishedEntityDaemon extend
         while(!isQueueFinished()){
             QueueItem<Representation> item = consume();
             if(item != null){
+                if(idWriter != null && item.getItem() != null){
+                    String id = item.getItem().getId();
+                    try {
+                        if(count != 0){
+                            idWriter.newLine();
+                        }
+                        idWriter.write(id);
+                    } catch (Exception e){
+                        log.error("Exception while logging ID of indexed Entity '"+id+"'!",e);
+                    }
+                }
                 current = System.currentTimeMillis();
                 if(count == 0){
                     start = System.currentTimeMillis(); //default for the start!
@@ -154,6 +186,7 @@ public class FinishedEntityDaemon extend
             }
         }
         printSummary(current);
+        IOUtils.closeQuietly(idWriter);
         setFinished();
     }
 

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java Wed Apr 25 05:12:14 2012
@@ -18,18 +18,35 @@ package org.apache.stanbol.entityhub.ind
 import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
 import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
 import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
 import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
@@ -48,6 +65,8 @@ import org.apache.stanbol.entityhub.inde
 import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceInitialiserListener;
 import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
 import org.apache.stanbol.entityhub.indexing.core.processor.EmptyProcessor;
+import org.apache.stanbol.entityhub.indexing.core.source.LineBasedEntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.source.YardEntityDataProvider;
 import org.apache.stanbol.entityhub.servicesapi.model.Representation;
 import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
 import org.slf4j.Logger;
@@ -80,23 +99,6 @@ public class IndexerImpl implements Inde
 
     private int chunkSize;
     public static final int MIN_QUEUE_SIZE = 500;
-    /**
-     * Queue used to add Entities read from the IndexingSource(s). This queue
-     * is consumed by the {@link EntityProcessorRunnable}.
-     */
-    private BlockingQueue<QueueItem<Representation>> indexedEntityQueue;
-    /**
-     * Queue used to add processed Entities. This queue is consumed by the
-     * {@link EntityPersisterRunnable}.
-     */
-    private BlockingQueue<QueueItem<Representation>> processedEntityQueue;
-    /**
-     * Queue used to add finished Entities. Mainly used for counting and
-     * logging
-     */
-    private BlockingQueue<QueueItem<Representation>> finishedEntityQueue;
-
-    private BlockingQueue<QueueItem<IndexingError>> errorEntityQueue;
     
     private boolean indexAllEntitiesState = false;
     
@@ -114,13 +116,29 @@ public class IndexerImpl implements Inde
     
     private State state = State.UNINITIALISED;
     private final Object stateSync = new Object();
+    /**
+     * The file to store the IDs of indexed entities. Might be <code>null</code>
+     * if {@link #entityPostProcessors} is <code>null</code>.
+     */
+    private File indexedEntityIdFile;
+    /**
+     * The {@link EntityProcessor}s used for the post processing phase or
+     * <code>null</code> if none.<p>
+     * If NOT <code>null</code> {@link #indexedEntityIdFile} is also
+     * guaranteed to be NOT <code>null</code>!
+     */
+    private List<EntityProcessor> entityPostProcessors;
+
+    private OutputStream indexedEntityIdOutputStream;
     
     public IndexerImpl(EntityIterator entityIterator,
                        EntityDataProvider dataProvider,
                        ScoreNormaliser normaliser,
                        IndexingDestination indexingDestination, 
-                       List<EntityProcessor> entityProcessors){
-        this(normaliser,indexingDestination,entityProcessors);
+                       List<EntityProcessor> entityProcessors,
+                       File indexedEntityIdFile,
+                       List<EntityProcessor> entityPostProcessors){
+        this(normaliser,indexingDestination,entityProcessors,indexedEntityIdFile,entityPostProcessors);
         //set entityMode interfaces
         if(entityIterator == null){
             throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
@@ -135,8 +153,10 @@ public class IndexerImpl implements Inde
                        EntityScoreProvider scoreProvider, 
                        ScoreNormaliser normaliser,
                        IndexingDestination indexingDestination, 
-                       List<EntityProcessor> entityProcessors){
-        this(normaliser,indexingDestination,entityProcessors);
+                       List<EntityProcessor> entityProcessors,
+                       File indexedEntityIdFile,
+                       List<EntityProcessor> entityPostProcessors){
+        this(normaliser,indexingDestination,entityProcessors,indexedEntityIdFile,entityPostProcessors);
         //deactivate entityMode interfaces
         this.entityIterator = null;
         if(scoreProvider == null){
@@ -151,7 +171,9 @@ public class IndexerImpl implements Inde
     
     protected IndexerImpl(ScoreNormaliser normaliser,
                           IndexingDestination indexingDestination, 
-                          List<EntityProcessor> entityProcessors){
+                          List<EntityProcessor> entityProcessors,
+                          File indexedEntityIdFile,
+                          List<EntityProcessor> entityPostProcessors){
         if(indexingDestination == null){
             throw new IllegalArgumentException("The Yard MUST NOT be NULL!");
         }
@@ -167,6 +189,21 @@ public class IndexerImpl implements Inde
         indexingComponents.add(indexingDestination);
         indexingComponents.addAll(entityProcessors);
         listeners = new HashSet<IndexingListener>();
+        this.indexedEntityIdFile = indexedEntityIdFile;
+        if(entityPostProcessors != null){
+            if(entityPostProcessors.isEmpty()){
+                this.entityPostProcessors = null;
+            } else {
+                this.entityPostProcessors = entityPostProcessors;
+                if(indexedEntityIdFile == null){
+                    throw new IllegalArgumentException("The file used to store" +
+                    		"the IDs of indexed Entities MUST NOT be NULL if" +
+                    		"EntityPostProcessors are defined!");
+                }
+            }
+        } else {
+            this.entityPostProcessors = null;
+        }
     }
     public boolean addIndexListener(IndexingListener listener){
         if(listener != null){
@@ -226,7 +263,7 @@ public class IndexerImpl implements Inde
         return indexingDestination.getYard();
     }
     @Override
-    public void initialiseIndexingSources() {
+    public void initialiseIndexing() {
         synchronized (stateSync) { //ensure that two threads do not start the
             //initialisation at the same time ...
             if(getState() != State.UNINITIALISED){
@@ -279,6 +316,20 @@ public class IndexerImpl implements Inde
                 }
             }
         }
+        //initialise the stream used to write the ids of indexed entities
+        try {
+            indexedEntityIdOutputStream = getEntityIdFileOutputStream();
+        } catch (IOException e) {
+            if(entityPostProcessors != null){
+                throw new IllegalStateException("Unable to open stream for writing" +
+                        "the IDs of indexed Entities as required for post-" +
+                        "processing!",e);
+            } else {
+                log.warn("Unable to open stream for writing the Ids of indexed " +
+                        "Entities -> indexes entity Ids will not be available!",e);
+            }
+        }
+
         log.info("Initialisation completed");
         setState(State.INITIALISED);
     }
@@ -297,7 +348,9 @@ public class IndexerImpl implements Inde
                 "Calling this Method is not supported while in State %s! Supported States are ",
                 state,supportedStates));
         }
-        initialiseIndexingSources();
+        log.info("start initialisation ...");
+        initialiseIndexing();
+        log.info("  ... initialisation completed");
         //if now the state is an unsupported one it indicates that
         //initialiseIndexingSources() was called by an other thread before this one!
         state = getState(); 
@@ -306,15 +359,18 @@ public class IndexerImpl implements Inde
                 "Calling this Method is not supported while in State %s! Supported States are ",
                 state,supportedStates));
         }
-        log.info("Start Indexing");
-        indexAllEntities();
-        log.info("Indexing completed ...");
+        log.info("start indexing ...");
+        indexEntities();
+        log.info("  ... indexing completed");
+        log.info("start post-processing ...");
+        postProcessEntities();
+        log.info("  ... post-processing finished ...");
         log.info("start finalisation....");
-        finaliseIndexingTarget();
-        log.info("Indexing finished!");
+        finaliseIndexing();
+        log.info("  ...finalisation completed");
     }
     @Override
-    public void finaliseIndexingTarget() {
+    public void postProcessEntities() {
         synchronized (stateSync) { //ensure that two threads do not start the
             //initialisation at the same time ...
             State state = getState();
@@ -324,6 +380,153 @@ public class IndexerImpl implements Inde
             if(state != State.INDEXED){ //if state > INITIALISED
                 return; // ignore this call
             }
+            setState(State.POSTPROCESSING);
+            log.info("Indexing started ...");
+        }
+        if(entityPostProcessors == null || entityPostProcessors.isEmpty()){
+            setState(State.POSTPROCESSED);
+            return; //nothing to do
+        }
+        //init the post processing components
+        //use an EntityDataProvider based on the indexed data
+        EntityDataProvider dataProvider = new YardEntityDataProvider(indexingDestination.getYard());
+        //use an LineBasedEntityIterator to iterate over the indexed entity ids
+        EntityIterator entityIterator;
+        try {
+            entityIterator = new LineBasedEntityIterator(getEntityIdFileInputStream(),"UTF-8",null);
+        }  catch (IOException e) {
+            throw new IllegalStateException("Unable to open file containing the " +
+            		"IDs of the indexed Entities!",e);
+        }
+        Map<String,Object> config = new HashMap<String,Object>();
+        config.put(LineBasedEntityIterator.PARAM_ID_POS, 1);
+        config.put(LineBasedEntityIterator.PARAM_SCORE_POS, Integer.MAX_VALUE);
+        entityIterator.setConfiguration(config);
+        //init the post-processors (this time not in an own thread as this
+        //does not really make sense for processors)
+        for(EntityProcessor processor : entityPostProcessors){
+            if(processor.needsInitialisation()){
+                processor.initialise();
+            }
+        }
+        //NOTE the destination needs not to be initialised -> it will be the same
+        //as for indexing!
+
+        //initialisation complete ... now setup the post processing
+        //init the queues
+        int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
+        BlockingQueue<QueueItem<Representation>> indexedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<Representation>> processedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<Representation>> finishedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<IndexingError>> errorEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+
+        //Set holding all active post processing deamons
+        final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons = 
+            new TreeSet<IndexingDaemon<?,?>>();
+        //create the IndexingDaemons
+        //TODO: Here we would need to create multiple instances in case
+        //      one would e.g. like to use several threads for processing entities
+        //(1) the daemon reading the indexed Entities (IDs from file, data from the Yard)
+        String entitySourceReaderName = "Post-processing: Entity Reader Deamon";
+        activeIndexingDeamons.add(
+            new EntityIdBasedIndexingDaemon(
+                entitySourceReaderName,
+                indexedEntityQueue, errorEntityQueue, 
+                entityIterator, 
+                dataProvider, 
+                null, //no score normaliser
+                true)); //post-process all indexed entities
+        //(2) The daemon for post-processing the entities
+        activeIndexingDeamons.add(
+            new EntityProcessorRunnable(
+                "Post-processing: Entity Processor Deamon",
+                indexedEntityQueue, //it consumes indexed Entities
+                processedEntityQueue,  //it produces processed Entities
+                errorEntityQueue,
+                entityPostProcessors, 
+                //TODO: check that the score is not overriden by the NULL
+                //      parsed by the used LineBasedEntityIterator!
+                Collections.singleton(SCORE_FIELD))); //ensure the score not changed
+        //(3) The daemon for persisting the entities
+        activeIndexingDeamons.add(
+            new EntityPersisterRunnable(
+                "Indexing: Entity Perstisting Deamon",
+                processedEntityQueue, //it consumes processed Entities
+                finishedEntityQueue, //it produces finished Entities
+                errorEntityQueue,
+                chunkSize, indexingDestination.getYard()));
+        //(4) The daemon for logging finished entities
+        activeIndexingDeamons.add(
+            new FinishedEntityDaemon(
+                finishedEntityQueue, -1, log, 
+                null)); //we have already all entity ids!
+        //(5) The daemon for logging errors
+        activeIndexingDeamons.add(
+            new EntityErrorLoggerDaemon(
+            errorEntityQueue, log));
+        //start post-processing and wait until it has finished
+        startAndWait(activeIndexingDeamons);        
+        
+        
+        setState(State.POSTPROCESSED);
+    }
+    /**
+     * Internally used to start the indexing/post-processing daemons and wait
+     * until they have finished. Finished daemons are removed from the parsed
+     * set by {@link #handleFinishedIndexingDaemon}; this method returns as
+     * soon as the set is empty (assumes the set is notified on removal).
+     * @param activeIndexingDeamons the daemons to start
+     */
+    private void startAndWait(final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons) {
+        //We need an listener for the IndexingDaemons we are about to start!
+        final IndexingDaemonListener listener = new IndexingDaemonListener() {
+            @Override
+            public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
+                IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource(); //one has finished
+                handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon); //removes it from the set
+                indexingDaemon.removeIndexingDaemonListener(this); //finally remove the listener
+            }
+        };
+        //now start the IndexingDaemons in their own Threads
+        Set<IndexingDaemon<?,?>> deamonCopy = 
+            new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
+        for(IndexingDaemon<?,?> deamon : deamonCopy){
+            deamon.addIndexingDaemonListener(listener); //add the listener
+            Thread thread = new Thread(deamon);// create the thread
+            thread.setDaemon(true); //ensure that the JVM can terminate
+            thread.setName(deamon.getName()); // set the name of the thread
+            thread.start(); //start the Thread
+        }
+        //wait until all daemons have finished. The emptiness check MUST be done
+        //while holding the monitor: checking it outside would allow the
+        //notification for the last daemon to happen between the check and
+        //wait(), leaving this thread blocked forever.
+        synchronized (activeIndexingDeamons) {
+            while(!activeIndexingDeamons.isEmpty()){
+                try {
+                    activeIndexingDeamons.wait();
+                } catch (InterruptedException e) {
+                    //deliberately ignored: keep waiting until all daemons are done
+                }
+            }
+        }
+    }
+
+    @Override
+    public void finaliseIndexing() {
+        synchronized (stateSync) { //ensure that two threads do not start the
+            //initialisation at the same time ...
+            State state = getState();
+            if(state.ordinal() < State.POSTPROCESSED.ordinal()){
+                throw new IllegalStateException("The Indexer MUST BE already "+State.POSTPROCESSED+" when calling this Method!");
+            }
+            if(state != State.POSTPROCESSED){ //if state > POSTPROCESSED
+                return; // ignore this call
+            }
             setState(State.FINALISING);
             log.info("Indexing started ...");
         }
@@ -331,7 +534,7 @@ public class IndexerImpl implements Inde
         setState(State.FINISHED);
     }
     @Override
-    public void indexAllEntities() {
+    public void indexEntities() {
         synchronized (stateSync) { //ensure that two threads do not start the
             //initialisation at the same time ...
             State state = getState();
@@ -346,10 +549,14 @@ public class IndexerImpl implements Inde
         }
         //init the queues
         int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
-        indexedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
-        processedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
-        finishedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
-        errorEntityQueue = new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+        BlockingQueue<QueueItem<Representation>> indexedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<Representation>> processedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<Representation>> finishedEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        BlockingQueue<QueueItem<IndexingError>> errorEntityQueue = 
+                new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
         
         //Set holding all active IndexingDaemons
         final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons = 
@@ -398,52 +605,21 @@ public class IndexerImpl implements Inde
         //(4) The daemon for logging finished entities
         activeIndexingDeamons.add(
             new FinishedEntityDaemon(
-                finishedEntityQueue, -1, log));
+                finishedEntityQueue, -1, log, indexedEntityIdOutputStream));
         //(5) The daemon for logging errors
         activeIndexingDeamons.add(
             new EntityErrorLoggerDaemon(
             errorEntityQueue, log));
-        //We need an listener for the IndexingDaemons we are about to start!
-        final IndexingDaemonListener listener = new IndexingDaemonListener() {
-            @Override
-            public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
-                //looks like one has finished
-                IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource();
-                //handle the finished indexing daemon
-                handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon);
-                //finally remove the listener
-                indexingDaemon.removeIndexingDaemonListener(this);
-
-            }
-        };
-        //now start the IndexingDaemons in their own Threads
-        Set<IndexingDaemon<?,?>> deamonCopy = 
-            new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
-        for(IndexingDaemon<?,?> deamon : deamonCopy){
-            deamon.addIndexingDaemonListener(listener); //add the listener
-            Thread thread = new Thread(deamon);// create the thread
-            thread.setDaemon(true); //ensure that the JVM can terminate
-            thread.setName(deamon.getName()); // set the name of the thread
-            thread.start(); //start the Thread
-        }
-        //now we need to wait until all Threads have finished ...
-        while(!activeIndexingDeamons.isEmpty()){
-            synchronized (activeIndexingDeamons) {
-                try {
-                    activeIndexingDeamons.wait();
-                } catch (InterruptedException e) {
-                    //year ... looks like we are done
-                }
-            }
-        }
+        //start indexing and wait until it has finished
+        startAndWait(activeIndexingDeamons);
         //set the new state to INDEXED
         setState(State.INDEXED);
     }
     /**
      * Handles the necessary actions if an {@link IndexingDaemon} used for the
-     * work done within {@link #indexAllEntities()} completes its work (meaning
+     * work done within {@link #indexEntities()} completes its work (meaning
      * it has executed all entities).<p>
-     * The parsed SortedSet is created within  {@link #indexAllEntities()} and 
+     * The parsed SortedSet is created within  {@link #indexEntities()} and 
      * contains all {@link IndexingDaemon}s that have not yet finished. It is 
      * the responsibility of this method to remove finished 
      * {@link IndexingDaemon}s from this set.<p>
@@ -543,5 +719,70 @@ public class IndexerImpl implements Inde
     public State getState() {
         return state;
     }
+    /**
+     * Opens a stream to write data to the {@link #indexedEntityIdFile}.
+     * Can only be called in {@link State}s up to {@link State#INITIALISED}.
+     * @return the stream or <code>null</code> if no entity id file is configured
+     * @throws IOException on any error while opening the stream
+     * @throws IllegalStateException if {@link #getState()} is later than
+     * {@link State#INITIALISED}
+     */
+    protected OutputStream getEntityIdFileOutputStream() throws IOException {
+        if(indexedEntityIdFile == null){
+            return null;
+        }
+        State state = getState();
+        if(state.ordinal() > State.INITIALISED.ordinal()){
+            throw new IllegalStateException("Opening an OutputStream to the "
+                    + "indexed entity id file '"+indexedEntityIdFile+"' is not "
+                    + "allowed for states > "+State.INITIALISED +" (current: "+state+")!");
+        }
+        if(indexedEntityIdFile.isFile()){//exists
+            log.info(" ... delete existing IndexedEntityId file "+indexedEntityIdFile);
+            indexedEntityIdFile.delete(); //delete existing data
+        }
+        //support compression (selected by the file extension)
+        String extension = FilenameUtils.getExtension(indexedEntityIdFile.getName());
+        OutputStream out = new FileOutputStream(indexedEntityIdFile);
+        if("zip".equalsIgnoreCase(extension)){
+            out = new ZipOutputStream(out);
+            ((ZipOutputStream)out).putNextEntry(new ZipEntry("entity-ids.txt"));
+        } else if("gz".equalsIgnoreCase(extension)) {
+            out = new GZIPOutputStream(out);
+        } else if("bz2".equalsIgnoreCase(extension)) {
+            out = new BZip2CompressorOutputStream(out);
+        }
+        return out;
+    }
+    /**
+     * Opens a stream to read data from the {@link #indexedEntityIdFile}.
+     * Can only be called in {@link State#INDEXED} or any later {@link State}.
+     * @return the stream or <code>null</code> if no entity id file is configured
+     * @throws IOException on any error while creating the stream
+     * @throws IllegalStateException if {@link #getState()} is earlier than
+     * {@link State#INDEXED}
+     */
+    protected InputStream getEntityIdFileInputStream() throws IOException {
+        if(indexedEntityIdFile == null){
+            return null;
+        }
+        State state = getState();
+        if(state.ordinal() < State.INDEXED.ordinal()){
+            throw new IllegalStateException("The indexed entity id data is not " +
+                    "available for states < "+State.INDEXED +" (current: "+state+")!");
+        }
+        //support compression (selected by the file extension)
+        String extension = FilenameUtils.getExtension(indexedEntityIdFile.getName());
+        InputStream in = new FileInputStream(indexedEntityIdFile);
+        if("zip".equalsIgnoreCase(extension)){
+            in = new ZipInputStream(in);
+            ((ZipInputStream)in).getNextEntry();
+        } else if("gz".equalsIgnoreCase(extension)) {
+            in = new GZIPInputStream(in);
+        } else if("bz2".equalsIgnoreCase(extension)) {
+            in = new BZip2CompressorInputStream(in);
+        }
+        return in;
+    }
 
 }

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FieldValueFilter.java Wed Apr 25 05:12:14 2012
@@ -94,10 +94,10 @@ public class FieldValueFilter implements
         Object value = config.get(PARAM_FIELD);
         if(value == null || value.toString().isEmpty()){
             this.field = NamespaceEnum.getFullName(DEFAULT_FIELD);
-            log.info("Using default Field %s",field);
+            log.info("Using default Field {}",field);
         } else {
             this.field = NamespaceEnum.getFullName(DEFAULT_FIELD);
-            log.info("configured Field: %s",field);
+            log.info("configured Field: {}",field);
         }
         value = config.get(PARAM_VALUES);
         if(value == null || value.toString().isEmpty()){

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.ldpath.EntityhubLDPath;
+import org.apache.stanbol.entityhub.ldpath.backend.YardBackend;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+
+import at.newmedialab.ldpath.api.backend.RDFBackend;
+
+/**
+ * Uses the {@link IndexingDestination#getYard()} as LDPath {@link RDFBackend}
+ * for the execution of configured LDPath statements.<p>
+ * <b>NOTE</b> in contrast to the {@link LdpathProcessor} this implementation
+ * is not limited to a subset of ldpath programs.<p>
+ * Typical use cases of this processor include:<ul>
+ * <li> indexing transitive closures (e.g. "
+ *   <code>skos:broaderTransitive = (skos:broader)*</code>")
+ * <li> collecting labels of referenced entities to be used for disambiguation
+ *   (e.g. use labels of linked concepts in a SKOS concept scheme:
+ *   "<code>&lt;urn:disambiguate.label&gt; = *[rdf:type is skos:Concept]/(skos:prefLabel | skos:altLabel)</code>")
+ * <li> advanced indexing rules that need paths longer than one (e.g. adding
+ *   labels of redirects pointing to an entity
+ *   "<code>rdfs:label = rdfs:label | (^rdfs:seeAlso/rdfs:label)</code>")
+ * </ul>
+ * <p>
+ * The focus on post-processing allows an easy configuration as the
+ * data source does not need to be configured, but is directly retrieved from
+ * the {@link IndexingDestination}. Note that this also means that if this
+ * processor is not used in the post-processing state results are unpredictable
+ * as they will depend on the indexing order of the entities!
+ *
+ * @author Rupert Westenthaler
+ *
+ */
+public class LdpathPostProcessor extends LdpathProcessor implements EntityProcessor {
+
+    /**
+     * @return always <code>true</code> because {@link #initialise()} is
+     * needed to switch the LDPath backend to the {@link IndexingDestination}
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return true;
+    }
+
+    /**
+     * Replaces the LDPath instance created by the {@link LdpathProcessor} with
+     * one that uses the {@link Yard} of the {@link IndexingDestination} as
+     * {@link RDFBackend} (via a {@link YardBackend}).
+     */
+    @Override
+    public void initialise() {
+        //The program parsed in setConfiguration(..) stays valid: parsing only
+        //uses the "value factory" role of the RDFBackend and accesses no data.
+        Yard yard = indexingConfig.getIndexingDestination().getYard();
+        YardBackend backend = new YardBackend(yard);
+        this.ldPath = new EntityhubLDPath(backend,yard.getValueFactory());
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathPostProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathProcessor.java Wed Apr 25 05:12:14 2012
@@ -58,13 +58,14 @@ public class LdpathProcessor implements 
      * By default appending of LDPath results to the parsed Representation is
      * activeted
      */
-    private static final boolean DEFAULT_APPEND_MODE = true;
+    public static final boolean DEFAULT_APPEND_MODE = true;
 
     private final ValueFactory vf;
-    private final EntityhubLDPath ldPath;
+    protected EntityhubLDPath ldPath;
     private final SingleRepresentationBackend backend;
     private Program<Object> program;
     private boolean appendMode;
+    protected IndexingConfig indexingConfig;
     
     public LdpathProcessor(){
         vf = InMemoryValueFactory.getInstance();
@@ -107,7 +108,7 @@ public class LdpathProcessor implements 
     
     @Override
     public void setConfiguration(Map<String,Object> config) {
-        IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
         //parse the ldpath
         final File ldpathFile;
         Object value = config.get(PARAMETER_LD_PATH);

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java?rev=1330107&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java Wed Apr 25 05:12:14 2012
@@ -0,0 +1,314 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.stanbol.entityhub.core.model.InMemoryValueFactory;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.ldpath.EntityhubLDPath.EntityhubConfiguration;
+import org.apache.stanbol.entityhub.ldpath.backend.SingleRepresentationBackend;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.ValueFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import at.newmedialab.ldpath.LDPath;
+import at.newmedialab.ldpath.api.backend.RDFBackend;
+import at.newmedialab.ldpath.api.transformers.NodeTransformer;
+import at.newmedialab.ldpath.exception.LDPathParseException;
+import at.newmedialab.ldpath.model.programs.Program;
+import at.newmedialab.ldpath.model.transformers.IdentityTransformer;
+import at.newmedialab.ldpath.parser.Configuration;
+
+/**
+ * LDPath based {@link EntityProcessor} that executes the configured LDPath
+ * program directly against the indexing source of the {@link IndexingConfig}
+ * (the EntityDataProvider or, if none is configured, the data iterable).
+ * The indexing source MUST implement {@link RDFBackend}.<p>
+ * Values selected by LDPath fields without a transformer are converted to
+ * Entityhub values: URIs to references, literals with a language or without
+ * a datatype to natural language texts and typed literals by using the
+ * {@link NodeTransformer} registered for their datatype (falling back to the
+ * lexical form). Blank nodes are ignored.
+ * @author westei
+ *
+ */
+public class LdpathSourceProcessor implements EntityProcessor {
+
+    private static final Logger log = LoggerFactory.getLogger(LdpathSourceProcessor.class);
+    /**
+     * @see LdpathProcessor#PARAMETER_LD_PATH
+     */
+    public static final String PARAMETER_LD_PATH = LdpathProcessor.PARAMETER_LD_PATH;
+    /**
+     * @see LdpathProcessor#PARAMETER_APPEND
+     */
+    public static final String PARAMETER_APPEND = LdpathProcessor.PARAMETER_APPEND;
+    /**
+     * @see LdpathProcessor#DEFAULT_APPEND_MODE
+     */
+    public static final boolean DEFAULT_APPEND_MODE = LdpathProcessor.DEFAULT_APPEND_MODE;
+
+    /**
+     * ValueFactory used to create Representation
+     */
+    private final ValueFactory vf = InMemoryValueFactory.getInstance();
+    /**
+     * {@link LDPath} instance of an unknown generic type (depends on the
+     * used Indexing source)
+     */
+    @SuppressWarnings("rawtypes")
+    protected LDPath ldPath;
+    /**
+     * The RDF backend (the indexing source)
+     */
+    @SuppressWarnings("rawtypes")
+    protected RDFBackend backend;
+    /**
+     * The LDPath configuration used to create {@link #ldPath}
+     */
+    @SuppressWarnings("rawtypes")
+    protected Configuration configuration;
+    /**
+     * The {@link NodeTransformer}s of the {@link #configuration} (keys are
+     * datatype URIs)
+     */
+    @SuppressWarnings("rawtypes")
+    private Map<String,NodeTransformer> transformer;
+    /**
+     * The parsed LDPath program
+     */
+    @SuppressWarnings("rawtypes")
+    private Program program;
+    /**
+     * If results are appended to the parsed Representation
+     */
+    private boolean appendMode;
+
+    /**
+     * The indexing configuration
+     */
+    protected IndexingConfig indexingConfig;
+
+    /**
+     * Configures this processor.<p>
+     * Reads the indexing source from the {@link IndexingConfig}, parses the
+     * LDPath program from the file configured by {@link #PARAMETER_LD_PATH}
+     * and reads the append mode ({@link #PARAMETER_APPEND}, default:
+     * {@link #DEFAULT_APPEND_MODE}).
+     * @param config the configuration
+     * @throws IllegalStateException if the {@link IndexingConfig} or the
+     * indexing source is missing or the LDPath program can not be read or parsed
+     * @throws IllegalArgumentException if the indexing source does not
+     * implement {@link RDFBackend} or the LDPath file is missing or not a file
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void setConfiguration(Map<String,Object> config) {
+        indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        if(indexingConfig == null){
+            throw new IllegalStateException("The parsed configuration does not contain "
+                    + "the IndexingConfig (key: "+IndexingConfig.KEY_INDEXING_CONFIG+")!");
+        }
+        //we need to check for both EntityDataProvider and EntityDataIterable
+        Object indexingSource = indexingConfig.getEntityDataProvider();
+        if(indexingSource == null){
+            indexingSource = indexingConfig.getDataInterable();
+        }
+        if(indexingSource == null){
+            throw new IllegalStateException("Indexing Configuration contains " +
+                    "neither an EntityDataProvider nor an EntityDataIterable!");
+        }
+        if(indexingSource instanceof RDFBackend<?>){
+            //NOTE we use the EntityhubConfiguration to have the same pre-registered
+            //     namespaces as the other components.
+            this.backend = (RDFBackend)indexingSource;
+            this.configuration = new EntityhubConfiguration(vf);
+            this.transformer = configuration.getTransformers();
+            this.ldPath = new LDPath(backend,configuration);
+        } else {
+            throw new IllegalArgumentException("The configured IndexingSource '"
+                    + indexingSource.getClass().getSimpleName()+"' does not support "
+                    + "LDPath (does not implement RDFBackend)! This Processor "
+                    + "can only be used with IndexingSources that support LDPath!");
+        }
+        Object value = config.get(PARAMETER_LD_PATH);
+        final File ldpathFile;
+        if(value != null && !value.toString().isEmpty()){
+            ldpathFile = indexingConfig.getConfigFile(value.toString());
+            if(ldpathFile == null || !ldpathFile.exists()){
+                throw new IllegalArgumentException("Configured '"
+                        + PARAMETER_LD_PATH +"' file was not found!");
+            }
+            if(!ldpathFile.isFile()){
+                throw new IllegalArgumentException("Configured '"
+                        + PARAMETER_LD_PATH +"' file exists but is not a File!");
+            }
+        } else {
+            throw new IllegalArgumentException("Missing required configuration '"
+                + PARAMETER_LD_PATH +"' - the file containing the LDPath program used by this "
+                + getClass().getSimpleName()+"!");
+        }
+        //The backend does not need to be initialised to parse a program, as
+        //parsing only uses the "value converter" methods that also need to
+        //work without initialisation. If this becomes a problem, parsing can
+        //be moved to initialise()
+        parseLdPathProgram(ldpathFile);
+        value = config.get(PARAMETER_APPEND);
+        if(value instanceof Boolean){
+            this.appendMode = ((Boolean) value).booleanValue();
+        } else if(value != null && !value.toString().isEmpty()){
+            this.appendMode = Boolean.parseBoolean(value.toString());
+        } else {
+            this.appendMode = DEFAULT_APPEND_MODE;
+        }
+    }
+
+    /**
+     * Parses the LDPath program from the parsed UTF-8 encoded file and
+     * stores it in {@link #program}.
+     * @param ldpathFile the file containing the LDPath program
+     * @throws IllegalStateException if the file can not be read or the
+     * program can not be parsed
+     */
+    @SuppressWarnings("unchecked")
+    private void parseLdPathProgram(File ldpathFile) {
+        Reader in = null;
+        try {
+            in = new InputStreamReader(new FileInputStream(ldpathFile), Charset.forName("UTF-8"));
+            this.program = ldPath.parseProgram(in);
+            log.info("ldpath program: \n{}\n",program.getPathExpression(backend));
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to read LDPath program from configured file '"
+                + ldpathFile +"'!",e);
+        } catch (LDPathParseException e) {
+            throw new IllegalStateException("Unable to parse LDPath program from configured file '"
+                    + ldpathFile +"'!",e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    /**
+     * @return always <code>false</code>: all setup is done in
+     * {@link #setConfiguration(Map)}
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+
+    @Override
+    public void initialise() {
+        //nothing to do: all setup is done in setConfiguration(..)
+    }
+
+    @Override
+    public void close() {
+        //nothing to do: the RDFBackend is the indexing source and is not closed here
+    }
+
+    /**
+     * Executes the LDPath program for the parsed Representation with the
+     * indexing source as {@link RDFBackend}.
+     * @param source the Representation to process
+     * @return the parsed Representation with the results added (append mode)
+     * or a new Representation with the same id holding only the results
+     */
+    @SuppressWarnings({"unchecked","rawtypes"})
+    @Override
+    public Representation process(Representation source) {
+        Object context = backend.createURI(source.getId());
+        Representation result = appendMode ? source : vf.createRepresentation(source.getId());
+        /*
+         * NOTE: LDPath will return Node instances of the RDFRepository if no
+         * transformation is defined for a statement (line) in the configured
+         * LDpath program (the ":: xsd:int" at the end). These Nodes need to be
+         * converted to valid Entityhub Representation values.
+         * As we can not know the generic type used by the RDFRepository
+         * implementation of the indexing source this is a little bit tricky.
+         * What this does is:
+         *   - for URIs it creates References
+         *   - for plain literal it adds natural texts
+         *   - for typed literals it uses the NodeTransformer registered with
+         *     the LDPath (or more precisely the Configuration object passed to
+         *     the LDPath in setConfiguration(..)) to transform the values to
+         *     Java objects. If no transformer is found or an Exception occurs
+         *     then the lexical form is used and added as String to the
+         *     Entityhub.
+         */
+        Map<String,Collection<Object>> resultMap = (Map<String,Collection<Object>>)program.execute(backend, context);
+        for(Entry<String,Collection<Object>> entry : resultMap.entrySet()){
+            NodeTransformer fieldTransformer = program.getField(entry.getKey()).getTransformer();
+            if(fieldTransformer == null || fieldTransformer instanceof IdentityTransformer<?>){
+                //we need to convert the RDFBackend Node to an Representation object
+                for(Object value : entry.getValue()){
+                    if(backend.isURI(value)){
+                        result.addReference(entry.getKey(), backend.stringValue(value));
+                    } else if(backend.isLiteral(value)){ //literal
+                        Locale lang = backend.getLiteralLanguage(value);
+                        if(lang != null){ //text with language
+                            result.addNaturalText(entry.getKey(), backend.stringValue(value), lang.getLanguage());
+                        } else { // no language
+                            URI type = backend.getLiteralType(value);
+                            if(type != null){ //typed literal -> need to transform
+                                NodeTransformer nt = transformer.get(type.toString());
+                                if(nt != null){ //add typed literal
+                                    try {
+                                        result.add(entry.getKey(), nt.transform(backend, value));
+                                    } catch (RuntimeException e) {
+                                       log.info("Unable to transform {} to dataType {} -> will use lexical form",value,type);
+                                       result.add(entry.getKey(),backend.stringValue(value));
+                                    }
+                                } else { //no transformer
+                                    log.info("No transformer for type {} -> will use lexical form",type);
+                                    result.add(entry.getKey(),backend.stringValue(value));
+
+                                }
+                            } else { //no language and no type -> literal with no language
+                                result.addNaturalText(entry.getKey(), backend.stringValue(value));
+                            }
+                        }
+                    } else { //bNode
+                        log.info("Ignore bNode {} (class: {})",value,value.getClass());
+                    }
+                } //end for all values
+            } else { //values are already transformed
+                result.add(entry.getKey(), entry.getValue()); //just add all values
+            }
+        }
+        return result;
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/LdpathSourceProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java?rev=1330107&r1=1330106&r2=1330107&view=diff
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java (original)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java Wed Apr 25 05:12:14 2012
@@ -223,12 +223,14 @@ public class LineBasedEntityIterator imp
             log.info("activate URL decoding of Entity IDs");
         }
         value = config.get(PARAM_ENTITY_SCORE_FILE);
-        if(value == null || value.toString().isEmpty()){
-            scoreFile = indexingConfig.getSourceFile(DEFAULT_ENTITY_SCORE_FILE);
-        } else {
-            scoreFile = indexingConfig.getSourceFile(value.toString());
-        }
-        log.info("Set Source File to '"+this.scoreFile+"'");
+        if(reader == null){
+            if(value == null || value.toString().isEmpty()){
+                scoreFile = indexingConfig.getSourceFile(DEFAULT_ENTITY_SCORE_FILE);
+            } else {
+                scoreFile = indexingConfig.getSourceFile(value.toString());
+            }
+            log.info("Set Source File to '"+this.scoreFile+"'");
+        } //else reader parsed in the constructor ... nothing todo
         //now done in the initialise() method
 //        try {
 //            initReader(new FileInputStream(scoreFile));
@@ -341,7 +343,7 @@ public class LineBasedEntityIterator imp
         try {
             while((line = reader.readLine()) != null){
                 lineCounter++;
-                log.debug("> line = {}");
+                log.debug("> line = {}",line);
                 EntityScore entity = parseEntityFormLine(line);
                 if(entity != null){
                     return entity;