You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2011/04/29 11:20:38 UTC

svn commit: r1097740 [3/10] - in /incubator/stanbol/trunk: entityhub/ entityhub/generic/core/src/main/java/org/apache/stanbol/entityhub/core/mapping/ entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/mapping/ entityhu...

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,530 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.Indexer;
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.indexing.core.event.IndexingEvent;
+import org.apache.stanbol.entityhub.indexing.core.event.IndexingListener;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingDaemon.IndexingDaemonEventObject;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingDaemon.IndexingDaemonListener;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceEventObject;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceInitialiserListener;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.indexing.core.processor.EmptyProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Abstract Implementation of the Indexer. <p>
+ * Principally there are two ways how to index entities <ol>
+ * <li> Iterate over the entityIds/scores by using an {@link EntityIterator}
+ * and lookup the data by using an {@link EntityDataProvider}.
+ * <li> Iterate over the data by using an {@link EntityDataIterator} (provided
+ * by an {@link EntityDataIterable}) and lookup/calculate the scores by using an 
+ * {@link EntityScoreProvider}.
+ * </ol>
+ * This Implementation provides a public constructor for each of
+ * the two variants.<p>
+ * After the Entities are loaded from the source they are processed by using the
+ * configured {@link EntityProcessor}. Finally the processed entities are
+ * persisted in a {@link Yard}. 
+ * @author Rupert Westenthaler
+ *
+ */
+public class IndexerImpl implements Indexer {
+    //protected to allow internal classes direct access!
+    protected static final Logger log = LoggerFactory.getLogger(IndexerImpl.class);
+
+    /**
+     * Holds the indexing listener
+     */
+    private final Set<IndexingListener> listeners;
+
+    public static final int DEFAULT_CHUNK_SIZE = 10;
+    private int chunkSize;
+    public static final int MIN_QUEUE_SIZE = 500;
+    /**
+     * Queue used to add Entities read from the IndexingSource(s). This queue
+     * is consumed by the {@link EntityProcessorRunnable}.
+     */
+    private BlockingQueue<QueueItem<Representation>> indexedEntityQueue;
+    /**
+     * Queue used to add processed Entities. This queue is consumed by the
+     * {@link EntityPersisterRunnable}.
+     */
+    private BlockingQueue<QueueItem<Representation>> processedEntityQueue;
+    /**
+     * Queue used to add finished Entities. Mainly used for counting and
+     * logging
+     */
+    private BlockingQueue<QueueItem<Representation>> finishedEntityQueue;
+
+    private BlockingQueue<QueueItem<IndexingError>> errorEntityQueue;
+    
+    private boolean indexAllEntitiesState = false;
+    
+    //entityMode
+    private EntityIterator entityIterator;
+    private EntityDataProvider dataProvider;
+    //dataMode
+    private EntityDataIterable dataIterable;
+    private EntityScoreProvider scoreProvider;
+    private final Collection<IndexingComponent> indexingComponents;
+    
+    private final IndexingDestination indexingDestination;
+    private final EntityProcessor entityProcessor;
+    private final ScoreNormaliser scoreNormaliser;
+    
+    private State state = State.UNINITIALISED;
+    private final Object stateSync = new Object();
+    
+    /**
+     * Creates an Indexer that iterates over the entity ids/scores by using the
+     * parsed {@link EntityIterator} and looks up the data by using the parsed
+     * {@link EntityDataProvider}.
+     * @param entityIterator the iterator over the entities. MUST NOT be NULL
+     * @param dataProvider the provider for the entity data. MUST NOT be NULL
+     * @param normaliser the score normaliser (forwarded to the shared constructor)
+     * @param indexingDestination the destination. MUST NOT be NULL
+     * @param entityProcessor the processor or NULL to use an {@link EmptyProcessor}
+     * @throws IllegalArgumentException if the entityIterator, the dataProvider
+     * or the indexingDestination is NULL
+     */
+    public IndexerImpl(EntityIterator entityIterator,
+                       EntityDataProvider dataProvider,
+                       ScoreNormaliser normaliser,
+                       IndexingDestination indexingDestination, 
+                       EntityProcessor entityProcessor){
+        this(normaliser,indexingDestination,entityProcessor);
+        //set entityMode interfaces
+        if(entityIterator == null){
+            throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
+        }
+        //fail fast: a NULL provider would be registered as IndexingComponent
+        //and cause a NullPointerException when the components are initialised
+        if(dataProvider == null){
+            throw new IllegalArgumentException("The EntityDataProvider MUST NOT be NULL!");
+        }
+        this.entityIterator = entityIterator;
+        this.dataProvider = dataProvider;
+        //add the parsed indexingSources to the list
+        this.indexingComponents.add(entityIterator);
+        this.indexingComponents.add(dataProvider);
+    }
+    /**
+     * Creates an Indexer that iterates over the entity data by using the
+     * parsed {@link EntityDataIterable} and looks up the scores by using the
+     * parsed {@link EntityScoreProvider}.
+     * @param dataIterable the iterable over the entity data. MUST NOT be NULL
+     * @param scoreProvider the provider for the scores. MUST NOT be NULL
+     * @param normaliser the score normaliser (forwarded to the shared constructor)
+     * @param indexingDestination the destination. MUST NOT be NULL
+     * @param entityProcessor the processor or NULL to use an {@link EmptyProcessor}
+     * @throws IllegalArgumentException if the dataIterable, the scoreProvider
+     * or the indexingDestination is NULL
+     */
+    public IndexerImpl(EntityDataIterable dataIterable, 
+                       EntityScoreProvider scoreProvider, 
+                       ScoreNormaliser normaliser,
+                       IndexingDestination indexingDestination, 
+                       EntityProcessor entityProcessor){
+        this(normaliser,indexingDestination,entityProcessor);
+        //deactivate entityMode interfaces
+        this.entityIterator = null;
+        if(scoreProvider == null){
+            throw new IllegalArgumentException("The EntityScoreProvider MUST NOT be NULL!");
+        }
+        //fail fast: a NULL iterable would be registered as IndexingComponent
+        //and cause a NullPointerException when the components are initialised
+        if(dataIterable == null){
+            throw new IllegalArgumentException("The EntityDataIterable MUST NOT be NULL!");
+        }
+        this.scoreProvider = scoreProvider;
+        this.dataIterable = dataIterable;
+        //add the parsed indexingSources to the list
+        this.indexingComponents.add(scoreProvider);
+        this.indexingComponents.add(dataIterable);
+    }
+    
+    /**
+     * Shared constructor used by both public constructors. Initialises the
+     * destination, the processor, the chunk size, the list of indexing
+     * components and the set of listeners.
+     * @param normaliser the score normaliser (stored as parsed)
+     * @param indexingDestination the destination. MUST NOT be NULL
+     * @param entityProcessor the processor or NULL to use an {@link EmptyProcessor}
+     * @throws IllegalArgumentException if the indexingDestination is NULL
+     */
+    protected IndexerImpl(ScoreNormaliser normaliser,
+                          IndexingDestination indexingDestination, 
+                          EntityProcessor entityProcessor){
+        if(indexingDestination == null){
+            throw new IllegalArgumentException("The IndexingDestination MUST NOT be NULL!");
+        }
+        this.indexingDestination = indexingDestination;
+        if(entityProcessor == null){
+            this.entityProcessor = new EmptyProcessor();
+        } else {
+            this.entityProcessor = entityProcessor;
+        }
+        setChunkSize(DEFAULT_CHUNK_SIZE); //init the chunk size and the cache
+        this.scoreNormaliser = normaliser;
+        indexingComponents = new ArrayList<IndexingComponent>();
+        indexingComponents.add(indexingDestination);
+        //register the field (never NULL) and not the parameter, because the
+        //parameter is NULL in case the EmptyProcessor fallback is used
+        indexingComponents.add(this.entityProcessor);
+        listeners = new HashSet<IndexingListener>();
+    }
+    /**
+     * Registers the parsed listener.
+     * @param listener the listener to add; NULL is ignored
+     * @return <code>true</code> if the listener was added to the set of
+     * listeners; <code>false</code> if it was NULL or already registered
+     */
+    public boolean addIndexListener(IndexingListener listener){
+        if(listener == null){
+            return false; //nothing to register
+        }
+        synchronized (listeners) {
+            return listeners.add(listener);
+        }
+    }
+    /**
+     * Unregisters the parsed listener.
+     * @param listener the listener to remove; NULL is ignored
+     * @return <code>true</code> if the listener was removed from the set of
+     * listeners; <code>false</code> if it was NULL or not registered
+     */
+    public boolean removeIndexListener(IndexingListener listener){
+        if(listener == null){
+            return false; //nothing to unregister
+        }
+        synchronized (listeners) {
+            return listeners.remove(listener);
+        }
+    }
+    /**
+     * Sends an {@link IndexingEvent} with this indexer as source to all
+     * registered listeners. In case the state is {@link State#FINISHED} at
+     * the time a listener is notified, that listener also receives the
+     * indexingCompleted callback.
+     */
+    protected void fireStateChanged(){
+        IndexingEvent event = new IndexingEvent(this);
+        Collection<IndexingListener> copy;
+        //create the snapshot while holding the lock, so that the set is never
+        //read (not even for its size) while an other thread modifies it
+        synchronized (listeners) {
+            copy = new ArrayList<IndexingListener>(listeners);
+        }
+        //notify outside of the synchronized block
+        for(IndexingListener listener : copy){
+            listener.stateChanged(event);
+            //if the state is finished also send the completed event
+            if(getState() == State.FINISHED){
+                listener.indexingCompleted(event);
+            }
+        }
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#setChunkSize(int)
+     */
+    public void setChunkSize(int chunkSize) throws IllegalStateException {
+        if(getState().ordinal() >= State.INDEXING.ordinal()){
+            throw new IllegalStateException("Setting the chunkSize is only allowed before starting the indexing process!");
+        }
+        if(chunkSize <= 0){
+            chunkSize = DEFAULT_CHUNK_SIZE;
+        }
+        this.chunkSize = chunkSize;
+    }
+    /**
+     * Getter for the chunk size.
+     * @return the currently configured chunk size
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getChunkSize()
+     */
+    public int getChunkSize() {
+        return this.chunkSize;
+    }
+    /* (non-Javadoc)
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getYard()
+     */
+    public Yard getYard() {
+        return indexingDestination.getYard();
+    }
+    /**
+     * Initialises all {@link IndexingComponent}s that need initialisation.
+     * Each of them is initialised by an {@link IndexingSourceInitialiser}
+     * running in its own daemon thread; this method blocks until all of them
+     * have reported completion.<p>
+     * Calls are ignored if the state is not {@link State#UNINITIALISED}.
+     * If the calling thread is interrupted while waiting, the method keeps
+     * waiting and restores the interrupt status before it returns.
+     */
+    @Override
+    public void initialiseIndexingSources() {
+        synchronized (stateSync) { //ensure that two threads do not start the
+            //initialisation at the same time ...
+            if(getState() != State.UNINITIALISED){
+                return;
+            }
+            setState(State.INITIALISING);
+            log.info("Initialisation started ...");
+        }
+        //add all IndexingSources that need to be initialised to a set
+        //NOTE: all accesses to this set are synchronised on the set itself
+        final Collection<IndexingComponent> toInitialise = new HashSet<IndexingComponent>();
+        //we need a simple listener that removes the IndexingSources from the
+        //above set
+        final IndexingSourceInitialiserListener listener = new IndexingSourceInitialiserListener() {
+            @Override
+            public void indexingSourceInitialised(IndexingSourceEventObject eventObject) {
+                //remove the IndexingSource from the toInitialise set
+                synchronized (toInitialise) {
+                    toInitialise.remove(eventObject.getIndexingSource());
+                    if(toInitialise.isEmpty()){ //if no more left to initialise
+                        //notify others about it
+                        toInitialise.notifyAll();
+                    }
+                }
+                //finally remove this listener
+                eventObject.getSource().removeIndexingSourceInitialiserListener(this);
+            }
+        };
+        //now create the IndexingSourceInitialiser that initialise the
+        //Indexing Sources in their own Thread
+        for(IndexingComponent source : indexingComponents){
+            if(source.needsInitialisation()){ //if it need to be initialised
+                //already started initialisers may remove entries concurrently
+                synchronized (toInitialise) {
+                    toInitialise.add(source); // add it to the set
+                }
+                //create an initialiser
+                IndexingSourceInitialiser initialiser = new IndexingSourceInitialiser(source);
+                //add the listener
+                initialiser.addIndexingSourceInitialiserListener(listener);
+                //create and init the Thread
+                Thread thread = new Thread(initialiser);
+                thread.setDaemon(true);
+                thread.start();
+            } //else no initialisation is needed
+        }
+        //now wait until all IndexingSources are initialised!
+        //The condition is checked while holding the monitor. Otherwise the
+        //notifyAll() of the last initialiser could be sent between the check
+        //and the call to wait() and this thread would wait forever.
+        boolean interrupted = false;
+        synchronized (toInitialise) {
+            while(!toInitialise.isEmpty()){
+                try {
+                    toInitialise.wait();
+                } catch (InterruptedException e) {
+                    //keep waiting, but remember the interruption
+                    interrupted = true;
+                }
+            }
+        }
+        if(interrupted){ //restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+        }
+        log.info("Initialisation completed");
+        setState(State.INITIALISED);
+    }
+    /**
+     * Executes the whole indexing process by calling
+     * {@link #initialiseIndexingSources()}, {@link #indexAllEntities()} and
+     * {@link #finaliseIndexingTarget()} in that order.
+     * @throws IllegalStateException if the state before or after the
+     * initialisation is not one of UNINITIALISED, INITIALISED, INDEXED or
+     * FINISHED
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#index()
+     */
+    public void index(){
+        Set<State> supportedStates = EnumSet.of(
+            State.UNINITIALISED,State.INITIALISED,State.INDEXED,State.FINISHED);
+        //this is only used to inform about wrong usage. It does not prevent
+        //that index is called twice by different threads. This check is done
+        //within the initialise, index and finalise methods!
+        checkSupportedState(getState(), supportedStates);
+        initialiseIndexingSources();
+        //if now the state is an unsupported one it indicates that
+        //initialiseIndexingSources() was called by an other thread before this one!
+        checkSupportedState(getState(), supportedStates);
+        log.info("Start Indexing");
+        indexAllEntities();
+        log.info("Indexing completed ...");
+        log.info("start finalisation....");
+        finaliseIndexingTarget();
+        log.info("Indexing finished!");
+    }
+    /**
+     * Throws an {@link IllegalStateException} naming the current and the
+     * supported states if the parsed state is not a supported one.
+     * @param state the state to check
+     * @param supportedStates the states that are accepted
+     */
+    private static void checkSupportedState(State state, Set<State> supportedStates){
+        if(!supportedStates.contains(state)){
+            //both arguments need a placeholder, otherwise the supported
+            //states are silently dropped from the message
+            throw new IllegalStateException(String.format(
+                "Calling this Method is not supported while in State %s! Supported States are %s",
+                state,supportedStates));
+        }
+    }
+    /**
+     * Finalises the {@link IndexingDestination}. Calls are ignored if the
+     * state is already later than {@link State#INDEXED}.
+     * @throws IllegalStateException if the state is earlier than
+     * {@link State#INDEXED}
+     */
+    @Override
+    public void finaliseIndexingTarget() {
+        synchronized (stateSync) { //ensure that two threads do not start the
+            //finalisation at the same time ...
+            State state = getState();
+            if(state.ordinal() < State.INDEXED.ordinal()){
+                throw new IllegalStateException("The Indexer MUST BE already "+State.INDEXED+" when calling this Method!");
+            }
+            if(state != State.INDEXED){ //if state > INDEXED
+                return; // ignore this call
+            }
+            setState(State.FINALISING);
+            log.info("Finalisation started ...");
+        }
+        indexingDestination.finalise();
+        setState(State.FINISHED);
+    }
+    /**
+     * Indexes all entities. This creates the queues and the
+     * {@link IndexingDaemon}s for reading, processing and persisting the
+     * entities as well as for logging finished entities and errors. Every
+     * daemon is started in its own daemon thread and this method blocks until
+     * all of them have finished.<p>
+     * Calls are ignored if the state is already later than
+     * {@link State#INITIALISED}. If the calling thread is interrupted while
+     * waiting, the method keeps waiting and restores the interrupt status
+     * before it returns.
+     * @throws IllegalStateException if the state is earlier than
+     * {@link State#INITIALISED}
+     */
+    @Override
+    public void indexAllEntities() {
+        synchronized (stateSync) { //ensure that two threads do not start the
+            //indexing at the same time ...
+            State state = getState();
+            if(state.ordinal() < State.INITIALISED.ordinal()){
+                throw new IllegalStateException("The Indexer MUST BE already "+State.INITIALISED+" when calling this Method!");
+            }
+            if(state != State.INITIALISED){ //if state > INITIALISED
+                return; // ignore this call
+            }
+            setState(State.INDEXING);
+            log.info("Indexing started ...");
+        }
+        //init the queues
+        int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
+        indexedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        processedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        finishedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+        errorEntityQueue = new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+        
+        //Set holding all active IndexingDaemons
+        final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons = 
+            new TreeSet<IndexingDaemon<?,?>>();
+        //create the IndexingDaemos
+        //TODO: Here we would need to create multiple instances in case
+        //      one would e.g. like to use several threads for processing entities
+        //(1) the daemon reading from the IndexingSources
+        String entitySourceReaderName = "Indexing: Entity Source Reader Deamon";
+        if(entityIterator != null){
+            activeIndexingDeamons.add(
+                new EntityIdBasedIndexingDaemon(
+                    entitySourceReaderName,
+                    indexedEntityQueue, errorEntityQueue, 
+                    entityIterator, 
+                    dataProvider, 
+                    scoreNormaliser,
+                    indexAllEntitiesState));
+        } else {
+            activeIndexingDeamons.add(
+                new EntityDataBasedIndexingDaemon(
+                    entitySourceReaderName,
+                    indexedEntityQueue, errorEntityQueue, 
+                    dataIterable, 
+                    scoreProvider, 
+                    scoreNormaliser,
+                    indexAllEntitiesState));
+        }
+        //(2) The daemon for processing the entities
+        activeIndexingDeamons.add(
+            new EntityProcessorRunnable(
+                "Indexing: Entity Processor Deamon",
+                indexedEntityQueue, //it consumes indexed Entities
+                processedEntityQueue,  //it produces processed Entities
+                errorEntityQueue,
+                entityProcessor, 
+                Collections.singleton(SCORE_FIELD)));
+        //(3) The daemon for persisting the entities
+        activeIndexingDeamons.add(
+            new EntityPersisterRunnable(
+                "Indexing: Entity Perstisting Deamon",
+                processedEntityQueue, //it consumes processed Entities
+                finishedEntityQueue, //it produces finished Entities
+                errorEntityQueue,
+                chunkSize, indexingDestination.getYard()));
+        //(4) The daemon for logging finished entities
+        activeIndexingDeamons.add(
+            new FinishedEntityDaemon(
+                finishedEntityQueue, -1, log));
+        //(5) The daemon for logging errors
+        activeIndexingDeamons.add(
+            new EntityErrorLoggerDaemon(
+            errorEntityQueue, log));
+        //We need an listener for the IndexingDaemons we are about to start!
+        final IndexingDaemonListener listener = new IndexingDaemonListener() {
+            @Override
+            public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
+                //looks like one has finished
+                IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource();
+                //handle the finished indexing daemon
+                handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon);
+                //finally remove the listener
+                indexingDaemon.removeIndexingDaemonListener(this);
+
+            }
+        };
+        //now start the IndexingDaemons in their own Threads
+        //(iterate over a copy, because already started daemons may remove
+        // themselves from the active set while the others are started)
+        Set<IndexingDaemon<?,?>> deamonCopy = 
+            new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
+        for(IndexingDaemon<?,?> deamon : deamonCopy){
+            deamon.addIndexingDaemonListener(listener); //add the listener
+            Thread thread = new Thread(deamon);// create the thread
+            thread.setDaemon(true); //ensure that the JVM can terminate
+            thread.setName(deamon.getName()); // set the name of the thread
+            thread.start(); //start the Thread
+        }
+        //now we need to wait until all Threads have finished ...
+        //The condition is checked while holding the monitor. Otherwise the
+        //notifyAll() of the last daemon could be sent between the check and
+        //the call to wait() and this thread would wait forever.
+        boolean interrupted = false;
+        synchronized (activeIndexingDeamons) {
+            while(!activeIndexingDeamons.isEmpty()){
+                try {
+                    activeIndexingDeamons.wait();
+                } catch (InterruptedException e) {
+                    //keep waiting, but remember the interruption
+                    interrupted = true;
+                }
+            }
+        }
+        if(interrupted){ //restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+        }
+        //set the new state to INDEXED
+        setState(State.INDEXED);
+    }
+    /**
+     * Handles the necessary actions if an {@link IndexingDaemon} used for the
+     * work done within {@link #indexAllEntities()} completes its work (meaning
+     * it has executed all entities).<p>
+     * The parsed SortedSet is created within  {@link #indexAllEntities()} and 
+     * contains all {@link IndexingDaemon}s that have not yet finished. It is 
+     * the responsibility of this method to remove finished 
+     * {@link IndexingDaemon}s from this set.<p>
+     * In addition this Method is responsible to {@link BlockingQueue#put(Object)}
+     * the {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} to the
+     * consuming queue of {@link IndexingDaemon}s as soon as all 
+     * {@link IndexingDaemon}s of the previous indexing steps have finished.
+     * This is checked comparing the {@link IndexingDaemon#getSequence()} number
+     * of the first entry within the sorted activeIndexingDeamons set.
+     * If the sequence number of the first element has increased after the
+     * finished {@link IndexingDaemon} was removed this Method puts the
+     * {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} item to the
+     * consuming queue of the new first entry of activeIndexingDeamons. 
+     * @param activeIndexingDeamons The SortedSet containing all 
+     * {@link IndexingDaemon}s that are still active.
+     * @param indexingDaemon the {@link IndexingDaemon} that completed its work.
+     */
+    private void handleFinishedIndexingDaemon(final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons,
+                                              IndexingDaemon<?,?> indexingDaemon) {
+        log.info("{} completed (sequence={}) ... ",
+            indexingDaemon.getName(), indexingDaemon.getSequence());
+        //the daemon that needs to receive the end-of-queue item (if any)
+        IndexingDaemon<?,?> sendEndofQueue = null;
+        synchronized (activeIndexingDeamons) {
+            if(log.isDebugEnabled()){
+                //log with the level that is checked by the guard
+                log.debug(" Active Indexing Deamons:");
+                for(IndexingDaemon<?,?> active : activeIndexingDeamons){
+                    log.debug(" > {} {}",active.getSequence(),active.getName());
+                }
+            }
+            //get the SequenceNumber of the first Element
+            if(!activeIndexingDeamons.isEmpty()){
+                Integer sequenceNumber = activeIndexingDeamons.first().getSequence();
+                log.info(" > current sequence : {}",sequenceNumber);
+                //try to remove it from the activeDeamons list
+                activeIndexingDeamons.remove(indexingDaemon);
+                if(activeIndexingDeamons.isEmpty()){ //if no active is left
+                    log.debug("  - indexingDeamons list now emoty ... notifyAll to indicate indexing has completed!");
+                    activeIndexingDeamons.notifyAll(); //notify all others!
+                } else { //check new SequenceNumber
+                    IndexingDaemon<?,?> first = activeIndexingDeamons.first();
+                    if(sequenceNumber.compareTo(first.getSequence()) < 0){
+                        log.info(" > new sequence: {}",first.getSequence());
+                        //sequence number increased -> 
+                        // ... all Daemons for the step have completed
+                        // ... send EndOfQueue
+                        // ... but outside of the synchronized block!
+                        sendEndofQueue = first;
+                    }
+                }
+            } //already empty ... nothing todo
+        }
+        if(sendEndofQueue != null){ //send endOfQueue
+            //to the consuming Queue of this one
+            try {
+                //ignore the Type safety because the item is of
+                //INDEXING_COMPLETED_QUEUE_ITEM is anyway null
+                log.info("Send end-of-queue to Deamons with Sequence "+sendEndofQueue.getSequence());
+                sendEndofQueue.getConsumeQueue().put(INDEXING_COMPLETED_QUEUE_ITEM);
+            } catch (InterruptedException e) {
+                log.error("Interupped while sending EnodOfQueue Item to consuming queue of "+sendEndofQueue.getName(),e);
+                //do not hide the interruption from the code running this thread
+                Thread.currentThread().interrupt();
+            }
+        }
+    }        
+    /**
+     * Setter for the indexAllEntities state. The value is parsed to the
+     * daemon reading the entities when {@link #indexAllEntities()} is called.
+     * @param indexAllEntitiesState the new state
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#setIndexAllEntitiesState(boolean)
+     */
+    public void setIndexAllEntitiesState(boolean indexAllEntitiesState) {
+        final boolean newState = indexAllEntitiesState;
+        this.indexAllEntitiesState = newState;
+    }
+    /**
+     * Getter for the indexAllEntities state.
+     * @return the current value of the indexAllEntities state
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#isIndexAllEntitiesState()
+     */
+    public boolean isIndexAllEntitiesState() {
+        return this.indexAllEntitiesState;
+    }
+    /**
+     * Setter for the state. Listeners are only notified if the parsed state
+     * differs from the current one.<p>
+     * Implementation Note: The field is updated while holding the sync object
+     * for the state; the event is fired after that lock is released.
+     * @param state the state to set
+     */
+    private void setState(State state) {
+        synchronized (stateSync) {
+            if(this.state == state){
+                return; //no change -> no event
+            }
+            this.state = state;
+        }
+        //do not fire events within synchronized blocks ...
+        fireStateChanged();
+    }
+    /**
+     * Getter for the current state.<p>
+     * Implementation Note: The state is written while holding the sync object
+     * for the state. Reading it with the same lock ensures that changes made
+     * by an other thread are visible to the caller.
+     * @return the current state
+     * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getState()
+     */
+    public State getState() {
+        synchronized (stateSync) {
+            return state;
+        }
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,226 @@
+/**
+ * 
+ */
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.ERROR_TIME;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
+
+import java.util.Collections;
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class IndexingDaemon<CI,PI> implements Comparable<IndexingDaemon<?,?>>,Runnable {
+    protected final Logger log;
+    private boolean queueFinished = false;
+    private final BlockingQueue<QueueItem<CI>> consume;
+    private final BlockingQueue<QueueItem<PI>> produce;
+    private final BlockingQueue<QueueItem<IndexingError>> error;
+    private boolean finisehd = false;
+    private final Set<IndexingDaemonListener> listeners;
+    /**
+     * Typically used to set the name of the {@link Thread} running this Runnable
+     */
+    private final String name;
+    /**
+     * Used for {@link #compareTo(IndexingDaemon)}
+     */
+    private final Integer sequence;
+    /**
+     * Initialises the daemon with its name, sequence number and queues.
+     * @param name the name; if NULL or empty the simple name of the class
+     * followed by " Deamon" is used
+     * @param sequence the sequence number; if NULL zero is used
+     * @param consume the queue this daemon consumes items from
+     * @param produce the queue this daemon adds produced items to
+     * @param error the queue this daemon adds errors to
+     */
+    protected IndexingDaemon(String name,
+                             Integer sequence,
+                             BlockingQueue<QueueItem<CI>> consume,
+                             BlockingQueue<QueueItem<PI>> produce,
+                             BlockingQueue<QueueItem<IndexingError>> error){
+        boolean noName = name == null || name.isEmpty();
+        this.name = noName ? getClass().getSimpleName()+" Deamon" : name;
+        this.sequence = sequence == null ? Integer.valueOf(0) : sequence;
+        this.consume = consume;
+        this.produce = produce;
+        this.error = error;
+        //get the logger for the actual implementation
+        this.log = LoggerFactory.getLogger(getClass());
+        Set<IndexingDaemonListener> listenerSet = new HashSet<IndexingDaemonListener>();
+        this.listeners = Collections.synchronizedSet(listenerSet);
+    }
+    protected final void sendError(String entityId, String message, Exception ex){
+        if(entityId == null){
+            return;
+        } else {
+            putError(new QueueItem<IndexingError>(
+                    new IndexingError(entityId, message, ex)));
+        }
+    }
+    protected final void sendError(String entityId,QueueItem<?> item, String message, Exception ex){
+        if(entityId == null){
+            return;
+        }
+        putError((new QueueItem<IndexingError>(
+                new IndexingError(entityId, message, ex)
+                ,item)));
+    }
+    /**
+     * Sets the {@link IndexerConstants#ERROR_TIME} property of the parsed
+     * item to the current time and puts it to the error queue. If no error
+     * queue is present a warning is logged and the item is dropped.
+     * @param errorItem the error item to add to the error queue
+     */
+    private void putError(QueueItem<IndexingError> errorItem){
+        if(error == null){
+            log.warn("Unable to process Error because Error Queue is NULL!");
+            return; //without a queue there is nothing more we can do
+        }
+        Long errorTime = Long.valueOf(System.currentTimeMillis());
+        errorItem.setProperty(ERROR_TIME, errorTime);
+        try {
+            error.put(errorItem);
+        } catch (InterruptedException e) {
+            log.error("Interupped while sending an Error for Entity "+errorItem.getItem().getEntity());
+        }
+
+    }
+    /**
+     * Puts the parsed item to the produce queue. NULL items are ignored. If
+     * no produce queue is present a warning is logged and the item is dropped.
+     * @param item the item to produce
+     */
+    protected final void produce(QueueItem<PI> item){
+        if(produce == null){
+            log.warn("Unable to produce Items because produce queue is NULL!");
+            return; //without a queue there is nothing more we can do
+        }
+        if(item != null){
+            try {
+                produce.put(item);
+            } catch (InterruptedException e) {
+                log.error("Interupped while producing item "+item.getItem(), e);
+            }
+        }
+    }
+    /**
+     * Takes the next item from the consume queue. This call blocks until an
+     * item is available. If the
+     * {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} is taken, the
+     * queue is marked as finished and the item is put back to the queue.
+     * @return the consumed item or <code>null</code> if the queue is finished,
+     * no consume queue is present or the thread was interrupted while waiting
+     */
+    protected final QueueItem<CI> consume(){
+        if(queueFinished){
+            return null;
+        }
+        if(consume == null){
+            log.warn("Unable to consume items because consume queue is NULl!");
+            //there will never be an item to consume: mark the queue as
+            //finished so that callers checking isQueueFinished() can stop
+            queueFinished = true;
+            return null;
+        }
+        try {
+            QueueItem<CI> consumed = consume.take();
+            if(consumed == INDEXING_COMPLETED_QUEUE_ITEM){
+                queueFinished = true;
+                consume.put(consumed); //put it back to the list
+                return null;
+            } else {
+                return consumed;
+            }
+        } catch (InterruptedException e) {
+            log.error("Interupped while consuming -> return null");
+            return null;
+        }
+    }
+    /**
+     * Getter for the finished state of the consume queue.
+     * @return the queueFinished
+     */
+    protected final boolean isQueueFinished() {
+        return this.queueFinished;
+    }
+    /**
+     * Method has to be called by the subclasses to signal that this Runnable
+     * has finished. It sets {@link #finished()} to <code>true</code> and
+     * notifies the registered listeners afterwards.
+     */
+    protected final void setFinished(){
+        finisehd = true;
+        //tell the listeners that this one has finished!
+        fireIndexingDaemonEvent();
+    }
+    /**
+     * Getter for the finished state of this daemon.
+     * @return <code>true</code> after {@link #setFinished()} was called
+     */
+    public final boolean finished(){
+        return this.finisehd;
+    }
+    /**
+     * Registers the parsed listener.
+     * @param listener the listener to add. <code>null</code> is ignored
+     * @return <code>true</code> if the listener was added; <code>false</code>
+     * if <code>null</code> was parsed or the listener was already registered
+     */
+    public final boolean addIndexingDaemonListener(IndexingDaemonListener listener){
+        if(listener == null){
+            return false;
+        }
+        return listeners.add(listener);
+    }
+    /**
+     * Removes the parsed listener.
+     * @param listener the listener to remove. <code>null</code> is ignored
+     * @return <code>true</code> if the listener was removed; <code>false</code>
+     * if <code>null</code> was parsed or the listener was not registered
+     */
+    public boolean removeIndexingDaemonListener(IndexingDaemonListener listener){
+        if(listener == null){
+            return false;
+        }
+        return listeners.remove(listener);
+    }
+    /**
+     * Calls {@link IndexingDaemonListener#indexingDaemonFinished(IndexingDaemonEventObject)}
+     * on all listeners. The listeners are copied while holding the lock on
+     * the listener set and are notified outside of the synchronized block.
+     */
+    public void fireIndexingDaemonEvent(){
+        final IndexingDaemonEventObject event = new IndexingDaemonEventObject(this);
+        final Set<IndexingDaemonListener> snapshot;
+        synchronized (listeners) {
+            snapshot = new HashSet<IndexingDaemonListener>(listeners);
+        }
+        for(IndexingDaemonListener registered : snapshot){
+            registered.indexingDaemonFinished(event);
+        }
+    }
+    /**
+     * Listener that is notified when an {@link IndexingDaemon} has finished.
+     * Currently this is only used to notify listeners that the daemon has
+     * processed all entities.<p>
+     * TODO: It would be preferable to use generics here, but so far no way was
+     * found to declare them so that one can still register a listener of type
+     * <code>IndexingDaemonListener&lt;? super CI,? super PI&gt;</code> with
+     * the {@link IndexingDaemon#addIndexingDaemonListener(IndexingDaemonListener)}
+     * and {@link IndexingDaemon#removeIndexingDaemonListener(IndexingDaemonListener)}
+     * methods.
+     * @author Rupert Westenthaler
+     *
+     */
+    public static interface IndexingDaemonListener {
+        /**
+         * Called by {@link IndexingDaemon#fireIndexingDaemonEvent()}.
+         * @param indexingDaemonEventObject the event; its source is the
+         * daemon that fired it
+         */
+        void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject);
+    }
+
+    /**
+     * Event parsed to {@link IndexingDaemonListener}s. The source of the
+     * event is the {@link IndexingDaemon} parsed to the constructor.
+     */
+    public static class IndexingDaemonEventObject extends EventObject {
+        private static final long serialVersionUID = -1L;
+        /**
+         * Creates the event for the parsed daemon
+         * @param indexingDaemon the daemon used as source of the event
+         */
+        public IndexingDaemonEventObject(IndexingDaemon<?,?> indexingDaemon){
+            super(indexingDaemon);
+        }
+        /**
+         * @return the source of this event casted to {@link IndexingDaemon}
+         */
+        @Override
+        public IndexingDaemon<?,?> getSource() {
+            final Object daemon = super.getSource();
+            return (IndexingDaemon<?,?>)daemon;
+        }
+    }
+    /**
+     * @return the queue this daemon consumes items from (may be
+     * <code>null</code>, see the check in {@link #consume()})
+     */
+    protected final BlockingQueue<QueueItem<CI>> getConsumeQueue() {
+        return this.consume;
+    }
+    /**
+     * @return the queue this daemon adds produced items to (may be
+     * <code>null</code>, see the check in {@link #produce(QueueItem)})
+     */
+    protected final BlockingQueue<QueueItem<PI>> getProduceQueue() {
+        return this.produce;
+    }
+
+    /**
+     * @return the name of this daemon
+     */
+    public String getName() {
+        return name;
+    }
+    /**
+     * Getter for the sequence (the order) of this daemon. This is the value
+     * {@link #compareTo(IndexingDaemon)} sorts by first. Guaranteed to be
+     * NOT NULL.
+     * @return the sequence
+     */
+    public final Integer getSequence() {
+        return this.sequence;
+    }
+    /**
+     * Compares daemons by their sequence. The ordering of daemons with the
+     * same sequence is of no importance, but because this class is used
+     * together with SortedSets <code>0</code> is only returned if the two
+     * daemons are equal.
+     * @param o the daemon to compare with
+     * @return a negative value, zero or a positive value as defined by
+     * {@link Comparable#compareTo(Object)}
+     */
+    @Override
+    public int compareTo(IndexingDaemon<?,?> o) {
+        int compare = sequence.compareTo(o.sequence);
+        if(compare != 0){
+            return compare;
+        }
+        int thisHash = hashCode();
+        int otherHash = o.hashCode();
+        if(thisHash != otherHash){
+            //do NOT subtract the hash codes: the difference may overflow
+            //and would than have the wrong sign
+            return thisHash < otherHash ? -1 : 1;
+        }
+        if(equals(o)){
+            return 0;
+        }
+        //same hash code but not equals: use the identity hash code so that
+        //a.compareTo(b) and b.compareTo(a) have opposite signs
+        int thisIdentity = System.identityHashCode(this);
+        int otherIdentity = System.identityHashCode(o);
+        if(thisIdentity != otherIdentity){
+            return thisIdentity < otherIdentity ? -1 : 1;
+        }
+        //extremely unlikely: no stable criterion left to order the two
+        return -1;
+    }
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,33 @@
+/**
+ * 
+ */
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+/**
+ * Immutable holder for an error that occurred while indexing an entity. It
+ * stores the id of the entity, a message and the exception.
+ */
+public class IndexingError {
+    private final String entityId;
+    private final String message;
+    private final Exception exception;
+    /**
+     * Creates an indexing error. The parsed values are stored as is.
+     * @param id the id of the entity
+     * @param msg the message describing the error
+     * @param ex the exception
+     */
+    public IndexingError(String id,String msg,Exception ex){
+        this.exception = ex;
+        this.message = msg;
+        this.entityId = id;
+    }
+    /**
+     * @return the exception parsed to the constructor
+     */
+    public Exception getException() {
+        return this.exception;
+    }
+    /**
+     * @return the message parsed to the constructor
+     */
+    public String getMessage() {
+        return this.message;
+    }
+    /**
+     * @return the id of the entity parsed to the constructor
+     */
+    public String getEntity() {
+        return this.entityId;
+    }
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,81 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.Collections;
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+
+/**
+ * {@link Runnable} that initialises an {@link IndexingComponent} and notifies
+ * all registered {@link IndexingSourceInitialiserListener}s as soon as
+ * {@link IndexingComponent#initialise()} has returned.
+ * @author Rupert Westenthaler
+ *
+ */
+public class IndexingSourceInitialiser implements Runnable {
+    private final IndexingComponent source;
+    /**
+     * volatile because the value is written by the thread executing
+     * {@link #run()} and may be read by other threads via {@link #finished()}
+     */
+    private volatile boolean finished = false;
+    private final Set<IndexingSourceInitialiserListener> listeners;
+    /**
+     * Creates an initialiser for the parsed component
+     * @param source the component to initialise when {@link #run()} is called
+     */
+    public IndexingSourceInitialiser(IndexingComponent source){
+       this.source = source; 
+       this.listeners = Collections.synchronizedSet(
+           new HashSet<IndexingSourceInitialiserListener>());
+    }
+    /**
+     * Initialises the component, marks this initialiser as finished and
+     * notifies the listeners (in that order).
+     */
+    @Override
+    public final void run() {
+        source.initialise();
+        finished = true;
+        fireInitialisationCompletedEvent();
+    }
+    /**
+     * @return <code>true</code> after the initialisation of the component
+     * has completed
+     */
+    public boolean finished(){
+        return finished;
+    }
+    /**
+     * Registers the parsed listener
+     * @param listener the listener. <code>null</code> is ignored
+     * @return <code>true</code> if the listener was added
+     */
+    public boolean addIndexingSourceInitialiserListener(IndexingSourceInitialiserListener listener){
+        if(listener != null){
+            return listeners.add(listener);
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Removes the parsed listener
+     * @param listener the listener. <code>null</code> is ignored
+     * @return <code>true</code> if the listener was removed
+     */
+    public boolean removeIndexingSourceInitialiserListener(IndexingSourceInitialiserListener listener){
+        if(listener != null){
+            return listeners.remove(listener);
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Notifies all listeners. The listeners are copied while holding the
+     * lock on the listener set and are called outside of the synchronized
+     * block.
+     */
+    protected void fireInitialisationCompletedEvent(){
+        Set<IndexingSourceInitialiserListener> copy;
+        synchronized (listeners) {
+            copy = new HashSet<IndexingSourceInitialiserListener>(listeners);
+        }
+        IndexingSourceEventObject eventObject = new IndexingSourceEventObject(this,this.source);
+        for(IndexingSourceInitialiserListener listener : copy){
+            listener.indexingSourceInitialised(eventObject);
+        }
+    }
+    
+    /**
+     * Listener notified as soon as the initialisation has completed.
+     */
+    public static interface IndexingSourceInitialiserListener {
+        void indexingSourceInitialised(IndexingSourceEventObject eventObject);
+    }
+    /**
+     * Event parsed to the listeners. The source of the event is the
+     * {@link IndexingSourceInitialiser}; the initialised component is
+     * available via {@link #getIndexingSource()}.
+     */
+    public static class IndexingSourceEventObject extends EventObject{
+        private static final long serialVersionUID = -1L;
+        private final IndexingComponent indexingSource;
+        protected IndexingSourceEventObject(IndexingSourceInitialiser initialiser,IndexingComponent source){
+            //the initialiser (and not the component) needs to be the event
+            //source, because getSource() casts to IndexingSourceInitialiser
+            super(initialiser);
+            indexingSource = source;
+        }
+        /**
+         * @return the initialiser that fired this event
+         */
+        @Override
+        public IndexingSourceInitialiser getSource() {
+            return (IndexingSourceInitialiser) super.getSource();
+        }
+        /**
+         * @return the indexingSource
+         */
+        public IndexingComponent getIndexingSource() {
+            return indexingSource;
+        }
+    }
+
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,87 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Item internally used for Queues. It holds an item and can store
+ * additional properties. The Item can be only set while constructing an
+ * instance. The properties can be changed at any time.<p>
+ * Notes:<ul>
+ * <li> {@link #hashCode()} uses the hashCode of the item and 
+ * {@link #equals(Object)} also checks if the item is equal to the other
+ * item. The properties are not used!
+ * <li>This Class is not synchronised.
+ * </ul>
+ * @author Rupert Westenthaler
+ *
+ * @param <T>
+ */
+public class QueueItem<T> {
+    private Map<String,Object> properties;
+    private final T item;
+    /**
+     * Creates a new QueueItem
+     * @param item the payload
+     */
+    public QueueItem(T item){
+        this.item = item;
+    }
+    /**
+     * Creates a QueueItem and copies the properties of an other one.
+     * NOTE that components with an reference to the other QueueItem will be
+     * able to change the properties of the new one. Use <br>
+     * <pre><code>
+     *   QueueItem&lt;String&gt; item = new QueueItem&lt;String&gt;("demo");
+     *   for(String key : other.getProperties()){
+     *       item.setProperties(key,other.getProperty(key));
+     *   }
+     * </code></pre><br>
+     * if you need to ensure that the properties within the QueueItem are not
+     * shared with others.
+     * @param item the payload
+     * @param properties the properties
+     */
+    public QueueItem(T item,QueueItem<?> other){
+        this(item);
+        if(other != null){
+            this.properties = other.properties;
+        }
+    }
+    public T getItem(){
+        return item;
+    }
+    public void setProperty(String key,Object value){
+        if(properties == null){
+            properties = new HashMap<String,Object>();
+        }
+        properties.put(key, value);
+    }
+    public Object getProperty(String key){
+        return properties != null ?
+            properties.get(key): null;
+    }
+    public Object removeProperty(String key){
+        return properties != null ?
+            properties.remove(key): null;
+    }
+    public Set<String> properties(){
+        if(properties != null){
+            return Collections.unmodifiableSet(properties.keySet());
+        } else {
+            return Collections.emptySet();
+        }
+    }
+    @Override
+    public int hashCode() {
+        return item.hashCode();
+    }
+    @Override
+    public boolean equals(Object other) {
+        return (other instanceof QueueItem<?>) &&
+                item.equals(((QueueItem<?>)other).item);
+    }
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,38 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * Default implementation that returns parsed scores unchanged (or the result
+ * of the chained {@link ScoreNormaliser} if one is configured). Intended to
+ * be used in cases where parsing <code>null</code> as {@link ScoreNormaliser}
+ * is not supported for some reason.
+ * @author Rupert Westenthaler
+ */
+public class DefaultNormaliser implements ScoreNormaliser{
+
+    /**
+     * The chained normaliser or <code>null</code> if none is configured
+     */
+    private ScoreNormaliser normaliser;
+
+    /**
+     * @param score the score
+     * @return the parsed score if no chained normaliser is present. 
+     * Otherwise the result of the chained normaliser
+     */
+    @Override
+    public Float normalise(Float score) {
+        return normaliser == null ? score : normaliser.normalise(score);
+    }
+
+    /**
+     * Reads the chained normaliser from the parsed configuration. A missing
+     * value does not change the current state.
+     * @param config the configuration
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object chained = config.get(CHAINED_SCORE_NORMALISER);
+        if(chained == null){
+            return; //keep the current one
+        }
+        this.normaliser = (ScoreNormaliser) chained;
+    }
+
+    @Override
+    public ScoreNormaliser getChained() {
+        return this.normaliser;
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,92 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * This normaliser will return -1 for scores lower than the configured minimum
+ * score. Because Entities with score &lt;0 are typically not indexed
+ * this can be used to filter Entities based on their score.<p>
+ * This normaliser also supports forwarding the score to an other {@link ScoreNormaliser}.
+ * In that case the filtering is calculated based on the results of the chained 
+ * normaliser. To apply the minimum score on the original scores one should 
+ * not configure a chained {@link ScoreNormaliser}.
+ * @author Rupert Westenthaler
+ *
+ */
+public class MinScoreNormalizer implements ScoreNormaliser {
+
+    public static final String KEY_INCLUSIVE = "inclusive";
+    public static final String KEY_MIN_SCORE = "min-score";
+    
+    private Float minScore;
+    private ScoreNormaliser normaliser;
+    private boolean inclusive;
+    /**
+     * Creates an instance with the minimum score <code>0</code> (inclusive)
+     * and no chained normaliser. Use {@link #setConfiguration(Map)} to
+     * change those defaults.
+     */
+    public MinScoreNormalizer(){
+        this(0,true,null);
+    }
+    /**
+     * Constructs a normaliser that returns -1 for scores lower (if inclusive is 
+     * <code>false</code> lower equals) to the minimum required score. In case
+     * an other normaliser is parsed than scores parsed to {@link #normalise(Float)}
+     * are first processed by this normaliser
+     * @param minimumRequiredScore the minimum required score. MUST BE &gt;= 0
+     * @param inclusive if scores equals to the required minimum are accepted
+     * @param normaliser the normaliser used to process parsed scores or
+     * <code>null</code> to use none.
+     * @throws IllegalArgumentException if the parsed minimum score is &lt; 0
+     */
+    public MinScoreNormalizer(float minimumRequiredScore, boolean inclusive,ScoreNormaliser normaliser) {
+        if(minimumRequiredScore < 0){
+            throw new IllegalArgumentException("The parsed minimum required score MUST BE >= 0");
+        }
+        this.inclusive = inclusive;
+        this.minScore = minimumRequiredScore;
+        this.normaliser = normaliser;
+    }
+    
+    /**
+     * Reads {@link #KEY_INCLUSIVE}, {@link #KEY_MIN_SCORE} and
+     * {@link ScoreNormaliser#CHAINED_SCORE_NORMALISER} from the parsed
+     * configuration. Missing values do not change the current state.
+     * @param config the configuration
+     * @throws IllegalArgumentException if the configured minimum score is 
+     * &lt; 0 (the same rule as enforced by the constructors)
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object value = config.get(KEY_INCLUSIVE);
+        if(value != null){
+            inclusive = Boolean.parseBoolean(value.toString());
+        } //else keep the current value
+        value = config.get(KEY_MIN_SCORE);
+        if(value != null){
+            //validate before assigning so that an invalid configuration
+            //does not change the state of this instance
+            Float configured = Float.valueOf(value.toString());
+            if(configured.floatValue() < 0){
+                throw new IllegalArgumentException("The parsed minScore value '"+value+"' MUST BE >= 0");
+            }
+            minScore = configured;
+        } //else keep the current value
+        value = config.get(CHAINED_SCORE_NORMALISER);
+        if(value != null){
+            normaliser = (ScoreNormaliser)value;
+        }
+    }
+    /**
+     * Constructs an normaliser that returns -1 for all scores lower than the
+     * minimum required score 
+     * @param minimumRequiredScore the minimum required score. MUST BE &gt;= 0
+     */
+    public MinScoreNormalizer(float minimumRequiredScore){
+        this(minimumRequiredScore,true, null);
+    }
+    
+    /**
+     * @param score the score
+     * @return <code>null</code> and values &lt; 0 (after processing by the
+     * chained normaliser) are returned unchanged. Otherwise 
+     * {@link ScoreNormaliser#MINUS_ONE} if the score does not fulfil the
+     * minimum score or the score if it does.
+     */
+    @Override
+    public Float normalise(Float score) {
+        if(normaliser != null){
+            score = normaliser.normalise(score);
+        }
+        if(score == null || score.compareTo(ZERO) < 0){
+            return score;
+        }
+        int compare = score.compareTo(minScore);
+        return (inclusive && compare < 0) || //score == minScore is OK
+                (!inclusive && compare <= 0)? //score == minScore is not OK
+                        MINUS_ONE:score; 
+    }
+    @Override
+    public ScoreNormaliser getChained() {
+        return normaliser;
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,37 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * Uses {@link Math#log1p(double)} to normalise parsed scores.
+ * @author Rupert Westenthaler
+ */
+public class NaturalLogNormaliser implements ScoreNormaliser {
+
+    private ScoreNormaliser normaliser;
+    @Override
+    public Float normalise(Float score) {
+        if(normaliser != null){
+            score = normaliser.normalise(score);
+        }
+        if(score == null || score.compareTo(ZERO) < 0){
+            return score;
+        } else {
+            return Float.valueOf((float)Math.log1p(score.doubleValue()));
+        }
+    }
+
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object value = config.get(CHAINED_SCORE_NORMALISER);
+        if(value != null){
+            this.normaliser = (ScoreNormaliser) value;
+        }
+    }
+    @Override
+    public ScoreNormaliser getChained() {
+        return normaliser;
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,115 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Normalises scores to the range [0..max]. It supports forwarding the parsed
+ * scores to an other {@link ScoreNormaliser}.
+ * @author Rupert Westenthaler
+ *
+ */
+public class RangeNormaliser implements ScoreNormaliser {
+
+    public static final String KEY_UPPER_BOUND = "upper-bound";
+    public static final String KEY_MAX_EXPECTED_SCORE = "max-expected-score";
+    
+    private static final Logger log = LoggerFactory.getLogger(RangeNormaliser.class);
+
+    private double upperBound;
+    private ScoreNormaliser normaliser;
+    /**
+     * The score that is mapped to the upper bound. Values &lt;= 0 indicate
+     * that it is not yet initialised.
+     */
+    private double maxScore = -1;
+    /**
+     * Normalises values parsed to {@link #normalise(Float)} to [0..1] assuming
+     * that the first call to {@link #normalise(Float)} will parse the highest
+     * value.
+     */
+    public RangeNormaliser(){
+        this(null,null,null);
+    }
+    /**
+     * Uses the parsed {@link ScoreNormaliser} and further normalises results to
+     * [0..1] assuming that the first call to {@link #normalise(Float)} will 
+     * parse the highest value. 
+     * @param normaliser the normaliser used to normalise parsed scores before
+     * putting them to the range [0..1]
+     */
+    public RangeNormaliser(ScoreNormaliser normaliser){
+        this(normaliser,null,null);
+    }
+    /**
+     * Constructs an RangeNormalizer that forwards to the parsed normaliser but
+     * keeps results within the range [0..{upperBound}] based on the provided
+     * {maxScore} expected to be parsed to {@link #normalise(Float)};
+     * @param normaliser The normaliser called to process scores parsed to 
+     * {@link #normalise(Float)}. If <code>null</code> than parsed scores are
+     * only normalised to the range [0..{upperBound}]
+     * @param upperBound the upper bound for the range. If <code>null</code> is
+     * parsed the range [0..1] will be used.
+     * @param maxExpectedScore the maximum expected score. If <code>null</code>
+     * than the first score parsed to {@link #normalise(Float)} will be used 
+     * instead. This feature is useful if entities are already sorted by 
+     * their score.
+     * @throws IllegalArgumentException if the parsed upperBound or the
+     * parsed maxExpectedScore is &lt;= 0
+     */
+    public RangeNormaliser(ScoreNormaliser normaliser,Float upperBound,Float maxExpectedScore) {
+        if(normaliser == null){
+            this.normaliser = new DefaultNormaliser();
+        } else {
+            this.normaliser = normaliser;
+        }
+        if(upperBound == null){
+            this.upperBound = 1;
+        } else if(upperBound > 0){
+            this.upperBound = upperBound;
+        } else {
+            throw new IllegalArgumentException("The parsed upperBound MUST NOT be <= 0. Parse NULL (to use the default) or values > 0!");
+        }
+        if(maxExpectedScore != null && maxExpectedScore > 0){
+            normalise(maxExpectedScore); //call normalise for initialisation of maxScore
+        } else if(maxExpectedScore != null){
+            throw new IllegalArgumentException("The parsed maxExpectedScore MUST NOT be <= 0. Parse NULL (to use the first value parsed to normalise(..)) or values > 0!");
+        } //else maxExpectedScore == null -> will use the first call to init maxExpectedScore!
+    }  
+    
+    /**
+     * @param parsed the score
+     * @return <code>null</code> and values &lt; 0 (after processing by the
+     * chained normaliser) are returned unchanged. Otherwise the score scaled
+     * by <code>upperBound/maxScore</code>, where maxScore is the first
+     * score &gt; 0 seen by this method.
+     */
+    @Override
+    public Float normalise(Float parsed) {
+        parsed = normaliser.normalise(parsed);
+        if(parsed == null || parsed.compareTo(ZERO) < 0){
+            return parsed;
+        }
+        double score = parsed.doubleValue();
+        if(maxScore <= 0){ //not yet initialised
+            if(score <= 0){
+                //a zero score MUST NOT be used as maxScore, because this
+                //would cause a division by zero for this and all further
+                //scores. Zero is zero in any range.
+                return ZERO;
+            }
+            maxScore = score; //set based on the first call
+        } else if(score > maxScore){
+            //print a warning if the first call does not parse the higest score
+            log.warn("Found higer Score than of the first parsed value. This will cause all scores to exeed the range [0..1]");
+        }
+        return Float.valueOf((float)(upperBound*(score/maxScore)));
+    }
+    /**
+     * Reads {@link ScoreNormaliser#CHAINED_SCORE_NORMALISER}, 
+     * {@link #KEY_UPPER_BOUND} and {@link #KEY_MAX_EXPECTED_SCORE} from the
+     * parsed configuration. Missing values do not change the current state.
+     * Values &lt;= 0 for the max expected score are ignored.
+     * @param config the configuration
+     * @throws IllegalArgumentException if the configured upper bound is
+     * &lt;= 0
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object value = config.get(CHAINED_SCORE_NORMALISER);
+        if(value != null){
+            normaliser = (ScoreNormaliser)value;
+        }
+        value = config.get(KEY_UPPER_BOUND);
+        if(value != null) {
+            //validate before assigning so that an invalid configuration
+            //does not change the state of this instance
+            double configured = Double.parseDouble(value.toString());
+            if(configured <= 0){
+                throw new IllegalArgumentException("The upper bound '"+configured+"' MUST BE > zero!");
+            }
+            upperBound = configured;
+        } //else keep the current value
+        value = config.get(KEY_MAX_EXPECTED_SCORE);
+        if(value != null){
+            Float maxExpected = Float.valueOf(value.toString());
+            if(maxExpected.floatValue() > 0){
+                //reset so that the configured value is used even if a
+                //max score was already set
+                maxScore = -1;
+                normalise(maxExpected);
+            } //else ignore values that can not be used as max score
+        } // else none
+    }
+    @Override
+    public ScoreNormaliser getChained() {
+        return normaliser;
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,47 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+/**
+ * This Interface provides the possibility to process score values provided for
+ * Entities (e.g. to calculate the pageRank based on the number of incoming 
+ * links).
+ * @author Rupert Westenthaler
+ *
+ */
+public interface ScoreNormaliser {
+    /**
+     * Key used to configure an other {@link ScoreNormaliser} instance that
+     * should be called before this instance processes the score. Values
+     * MUST BE of type {@link ScoreNormaliser}.
+     */
+    String CHAINED_SCORE_NORMALISER = "chained";
+    
+    /**
+     * Constant for the score <code>-1</code>, as this value is used a lot by 
+     * implementations of this interface
+     */
+    Float MINUS_ONE = Float.valueOf(-1f);
+    /**
+     * Constant for the score <code>0</code>, as this value is used a lot by 
+     * implementations of this interface
+     */
+    Float ZERO = Float.valueOf(0f);
+
+    /**
+     * Setter for the configuration of this normaliser
+     * @param config the configuration. See {@link #CHAINED_SCORE_NORMALISER}
+     * for the key used to configure a chained instance.
+     */
+    void setConfiguration(Map<String,Object> config);
+    /**
+     * Normalises the parsed score value based on some algorithm.
+     * @param score The score to normalise. <code>null</code> and values &lt; 0
+     * MUST be ignored and returned as parsed.
+     * @return <code>null</code> if no score can be calculated based on the
+     * parsed value (especially if <code>null</code> is parsed as score).
+     * Otherwise the normalised score where values &lt;0 indicate that the
+     * entity should not be indexed.
+     */
+    Float normalise(Float score);
+
+    /**
+     * Returns the {@link ScoreNormaliser} instance that is chained to this one.
+     * Scores are first parsed to the chained instance before they are
+     * processed by the current one.
+     * @return the chained instance or <code>null</code> if none
+     */
+    ScoreNormaliser getChained();
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,41 @@
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * {@link EntityProcessor} that returns the parsed Representation as is.
+ * Intended to be used in cases where a <code>null</code> value is not allowed
+ * for the {@link EntityProcessor}.
+ * @author Rupert Westenthaler
+ *
+ */
+public class EmptyProcessor implements EntityProcessor{
+
+    /**
+     * This implementation does not support any configuration
+     * @param config ignored
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        //no configuration supported
+    }
+
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+
+    @Override
+    public void initialise() {
+        //nothing to do
+    }
+
+    /**
+     * @param source the representation to process
+     * @return the parsed instance
+     */
+    @Override
+    public Representation process(Representation source) {
+        return source;
+    }
+
+    @Override
+    public void close() {
+        //nothing to do
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,153 @@
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.stanbol.entityhub.core.mapping.DefaultFieldMapperImpl;
+import org.apache.stanbol.entityhub.core.mapping.FieldMappingUtils;
+import org.apache.stanbol.entityhub.core.mapping.ValueConverterFactory;
+import org.apache.stanbol.entityhub.core.model.InMemoryValueFactory;
+import org.apache.stanbol.entityhub.core.site.CacheUtils;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapper;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapping;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.ValueFactory;
+
+/**
+ * {@link EntityProcessor} that applies the mappings of a {@link FieldMapper}
+ * to every processed {@link Representation}. The mapper is either passed to
+ * one of the constructors or created by {@link #setConfiguration(Map)}.
+ */
+public class FiledMapperProcessor implements EntityProcessor{
+    /**
+     * Configuration key for the name of a mappings file within the config
+     * folder. If missing or empty the field configuration of the index is used.
+     */
+    public static final String PARAM_MAPPINGS = "mappings";
+    public static final String PARAM_VALUE_FACTORY = "valueFactory";
+    public static final String DEFAULT_MAPPINGS_FILE_NAME = "fieldMappings.txt";
+    private FieldMapper mapper;
+    private ValueFactory vf;
+    /**
+     * This constructor relies on {@link #setConfiguration(Map)} being called
+     * afterwards!
+     */
+    public FiledMapperProcessor(){
+        this(null);
+    }
+    /**
+     * Internally used to initialise the {@link ValueFactory}
+     * @param vf the value factory or <code>null</code> to use the {@link InMemoryValueFactory}.
+     */
+    private FiledMapperProcessor(ValueFactory vf){
+        setValueFactory(vf);
+    }
+    /**
+     * Creates a processor that applies the passed mapper.
+     * @param mapper the mapper used by {@link #process(Representation)}
+     * @param vf the value factory or <code>null</code> to use the default
+     * @throws IllegalArgumentException if the mapper is <code>null</code>
+     */
+    public FiledMapperProcessor(FieldMapper mapper, ValueFactory vf){
+        this(vf);
+        if(mapper == null){
+            throw new IllegalArgumentException("The parsed FieldMapper MUST NOT be NULL!");
+        }
+        //the mapper was validated but never stored before, so process(..)
+        //always failed with an IllegalStateException
+        this.mapper = mapper;
+    }
+    /**
+     * Creates a processor for field mappings provided as strings.
+     * @param mappings the field mappings
+     * @param vf the value factory or <code>null</code> to use the default
+     * @throws IllegalArgumentException if the mappings are <code>null</code>
+     * @throws IllegalStateException if the mappings do not contain a single
+     * valid mapping
+     */
+    public FiledMapperProcessor(Iterator<String> mappings,ValueFactory vf){
+        this(vf);
+        if(mappings == null){
+            throw new IllegalArgumentException("The parsed field mappings MUST NOT be NULL!");
+        }
+        mapper = FieldMappingUtils.createDefaultFieldMapper(mappings);
+        if(mapper.getMappings().isEmpty()){
+            throw new IllegalStateException("The parsed field mappings MUST contain at least a single valid mapping!");
+        }
+    }
+    /**
+     * Creates a processor for field mappings read line by line (UTF-8) from
+     * the passed stream. The stream is not closed by this constructor.
+     * @param mappings the stream to read the field mappings from
+     * @param vf the value factory or <code>null</code> to use the default
+     * @throws IOException on any error while reading from the stream
+     * @throws IllegalArgumentException if the stream is <code>null</code>
+     */
+    public FiledMapperProcessor(InputStream mappings, ValueFactory vf) throws IOException{
+        this(vf);
+        if(mappings == null){
+            throw new IllegalArgumentException("The parsed field mappings MUST NOT be NULL!");
+        }
+        this.mapper = createMapperFormStream(mappings);
+    }
+    /**
+     * Applies the configured mappings to the source, using a new
+     * {@link Representation} with the id of the source as target.
+     * @param source the representation to map
+     * @return the result of applying the mappings or <code>null</code> if
+     * the source is <code>null</code>
+     * @throws IllegalStateException if no mapper is configured
+     */
+    @Override
+    public Representation process(Representation source) {
+        if(mapper == null){
+            throw new IllegalStateException("The mapper is not initialised. One must call setConfiguration to configure the FieldMapper!");
+        }
+        if(source == null){
+            return null;
+        }
+        return mapper.applyMappings(source,
+            vf.createRepresentation(source.getId()),
+            vf);
+    }
+    /**
+     * used by the different constructors to init the {@link ValueFactory}
+     * @param vf the value factory or <code>null</code> to use the default
+     */
+    private void setValueFactory(ValueFactory vf) {
+        this.vf = vf == null ? InMemoryValueFactory.getInstance() : vf;
+    }
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void close() {
+    }
+    /**
+     * Nothing to initialise.
+     */
+    @Override
+    public void initialise() {
+    }
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+    /**
+     * Creates the {@link FieldMapper} based on the passed configuration. If
+     * {@link #PARAM_MAPPINGS} is missing or empty, the field configuration of
+     * the index is used. Otherwise the mappings are loaded from the file with
+     * the configured name within the config folder.
+     * @param config the configuration. The {@link IndexingConfig} is expected
+     * under the key {@link IndexingConfig#KEY_INDEXING_CONFIG}
+     * @throws IllegalArgumentException if the configured mappings file does
+     * not exist or can not be read
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        Object value = config.get(PARAM_MAPPINGS);
+        if(value == null || value.toString().isEmpty()){
+            //use the mappings configured for the Index
+            this.mapper = FieldMappingUtils.createDefaultFieldMapper(
+                indexingConfig.getIndexFieldConfiguration());
+        } else {
+            //load (other) mappings based on the provided mappings parameter
+            final File file = new File(indexingConfig.getConfigFolder(),value.toString());
+            InputStream in = null;
+            try {
+                in = new FileInputStream(file);
+                this.mapper = createMapperFormStream(in);
+            } catch (FileNotFoundException e) {
+                throw new IllegalArgumentException("FieldMapping file "+value+" not found in configuration directory "+indexingConfig.getConfigFolder(),e);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Unable to access FieldMapping file "+value+" in configuration directory "+indexingConfig.getConfigFolder(),e);
+            } finally {
+                //the stream was opened by this method, so it is closed here
+                //(null safe; nothing to do if opening the file failed)
+                IOUtils.closeQuietly(in);
+            }
+        }
+        //TODO: get the valueFactory from the config (currently an InMemory one
+        //is created by the default constructor!)
+    }
+    /**
+     * Utility that allows to create a FieldMapper from an InputStream.
+     * It uses {@link IOUtils#lineIterator(InputStream, String)} (with UTF-8)
+     * and passes the lines to
+     * {@link FieldMappingUtils#createDefaultFieldMapper(Iterator)}.
+     * The stream is not closed; this is left to the caller.
+     * @param in the stream to read the mappings from
+     * @return the mapper created for the lines of the stream
+     * @throws IOException on any error while reading the data from the stream
+     */
+    private static FieldMapper createMapperFormStream(final InputStream in) throws IOException {
+        final LineIterator lines = IOUtils.lineIterator(in, "UTF-8");
+        //adapts the LineIterator to the Iterator<String> expected by the utility
+        return FieldMappingUtils.createDefaultFieldMapper(new Iterator<String>() {
+            @Override
+            public boolean hasNext() {
+                return lines.hasNext();
+            }
+            @Override
+            public String next() {
+                return lines.nextLine();
+            }
+            @Override
+            public void remove() {
+                lines.remove();
+            }
+        });
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,90 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.rdf.RdfResourceEnum;
+
+/**
+ * {@link EntityScoreProvider} that reads the score of an entity from a field
+ * of its {@link Representation}. Because of that it needs the data of the
+ * entity ({@link #needsData()} returns <code>true</code>) and does not
+ * support {@link #process(String)}.
+ */
+public class EntityFieldScoreProvider implements EntityScoreProvider {
+    
+    /**
+     * Configuration key for the name of the field holding the score
+     */
+    public static final String PARAM_FIELD_NAME = "field";
+    /**
+     * Misspelled name of {@link #PARAM_FIELD_NAME}; kept so that existing
+     * users of this constant continue to compile.
+     */
+    public static final String PRAM_FIELD_NAME = PARAM_FIELD_NAME;
+    
+    /**
+     * The field used if no other one is configured
+     * ({@link RdfResourceEnum#signRank})
+     */
+    public static final String DEFAULT_FIELD_NAME = RdfResourceEnum.signRank.getUri();
+    
+    private String fieldName;
+    
+    /**
+     * Creates an instance that uses the field as specified by
+     * {@link RdfResourceEnum#signRank} to retrieve the score for an entity
+     */
+    public EntityFieldScoreProvider(){
+        this(null);
+    }
+    /**
+     * Creates an instance that uses the passed field to retrieve the score for 
+     * an entity or {@link RdfResourceEnum#signRank} in case <code>null</code>
+     * is passed.
+     * @param fieldName the field used to retrieve the score from passed
+     * {@link Representation}s or <code>null</code> to use the
+     * {@link RdfResourceEnum#signRank} field.
+     */
+    public EntityFieldScoreProvider(String fieldName){
+        //a non-null field name was silently dropped before, which left the
+        //field name null
+        this.fieldName = fieldName == null ? DEFAULT_FIELD_NAME : fieldName;
+    }
+    
+    /**
+     * @return always <code>true</code>, the score is read from the entity data
+     */
+    @Override
+    public boolean needsData() {
+        return true;
+    }
+
+    /**
+     * Not supported, because the score can not be determined from the id.
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Float process(String id) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads the first value of the configured field and converts it to a
+     * {@link Float}.
+     * @param entity the entity to get the score for
+     * @return the score or <code>null</code> if the field has no value or
+     * the value can not be converted to a float
+     */
+    @Override
+    public Float process(Representation entity) throws UnsupportedOperationException {
+        Object value = entity.getFirst(fieldName);
+        if(value == null){
+            return null;
+        }
+        if(value instanceof Float){
+            return (Float)value;
+        }
+        if(value instanceof Number) {
+            return Float.valueOf(((Number)value).floatValue());
+        }
+        try {
+            //last resort: try to parse the string representation
+            return Float.valueOf(value.toString());
+        } catch(NumberFormatException e){
+            return null;
+        }
+    }
+
+    /**
+     * Nothing to release.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * No initialisation needed.
+     */
+    @Override
+    public void initialise() {
+    }
+
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+
+    /**
+     * Sets the field used to read the score to the value of
+     * {@link #PARAM_FIELD_NAME}. A missing or empty value keeps the current
+     * field.
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object value = config.get(PARAM_FIELD_NAME);
+        if(value != null && !value.toString().isEmpty()){
+            fieldName = value.toString();
+        }
+    }
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,88 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator.EntityScore;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Simple Adapter between an {@link EntityIterator} and the {@link EntityScoreProvider}
+ * interface. During {@link #initialise()} it iterates over all entities
+ * provided by the {@link EntityIterator}, collects their scores in a map and
+ * uses this map to create the {@link EntityScoreProvider} all other calls are
+ * delegated to.
+ *
+ * @author Rupert Westenthaler
+ *
+ */
+public class EntityIneratorToScoreProviderAdapter implements EntityScoreProvider {
+
+    private EntityIterator entityIterator;
+    private EntityScoreProvider provider;
+    /**
+     * Default constructor. The {@link EntityIterator} to adapt is taken from
+     * the {@link IndexingConfig} when {@link #setConfiguration(Map)} is called.
+     */
+    public EntityIneratorToScoreProviderAdapter(){
+        //intentionally empty: this used to call this(null), which always
+        //failed with an IllegalArgumentException
+    }
+    /**
+     * Creates an adapter for the passed iterator.
+     * @param entityIterator the iterator providing the entity scores
+     * @throws IllegalArgumentException if the iterator is <code>null</code>
+     */
+    public EntityIneratorToScoreProviderAdapter(EntityIterator entityIterator){
+        if(entityIterator == null){
+            throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
+        }
+        this.entityIterator = entityIterator;
+    }
+    /**
+     * Getter for the wrapped provider that fails with a descriptive message
+     * (instead of a NullPointerException) if it is not yet created.
+     * @return the provider created by {@link #initialise()}
+     * @throws IllegalStateException if {@link #initialise()} was not yet called
+     */
+    private EntityScoreProvider getProvider() {
+        if(provider == null){
+            throw new IllegalStateException("The score provider is not available. One must call initialise() first!");
+        }
+        return provider;
+    }
+    
+    @Override
+    public boolean needsData() {
+        return getProvider().needsData();
+    }
+
+    @Override
+    public Float process(String id) throws UnsupportedOperationException {
+        return getProvider().process(id);
+    }
+
+    @Override
+    public Float process(Representation entity) throws UnsupportedOperationException {
+        return getProvider().process(entity);
+    }
+
+    /**
+     * @return always <code>true</code>, the scores need to be read from the
+     * {@link EntityIterator} before they can be provided
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return true;
+    }
+    /**
+     * Reads all entity scores from the {@link EntityIterator}, closes it and
+     * creates the wrapped {@link EntityScoreProvider} for the collected scores.
+     * @throws IllegalStateException if no {@link EntityIterator} is set
+     */
+    @Override
+    public void initialise() {
+        if(entityIterator == null){
+            throw new IllegalStateException("No EntityIterator available. One must call setConfiguration first!");
+        }
+        Map<String,Float> entityScoreMap = new HashMap<String,Float>();
+        try {
+            //initialise the source entity iterator
+            if(entityIterator.needsInitialisation()){
+                entityIterator.initialise();
+            }
+            while(entityIterator.hasNext()){
+                EntityScore entityScore = entityIterator.next();
+                entityScoreMap.put(entityScore.id, entityScore.score);
+            }
+        } finally {
+            //close the source because it is no longer needed (also in case
+            //reading the scores failed)
+            entityIterator.close();
+        }
+        provider = new MapEntityScoreProvider(entityScoreMap);
+        //initialise the wrapped score provider
+        if(provider.needsInitialisation()){
+            provider.initialise();
+        }
+    }
+    /**
+     * Closes the wrapped provider. Does nothing if {@link #initialise()} was
+     * never called.
+     */
+    @Override
+    public void close() {
+        if(provider != null){
+            provider.close();
+        }
+    }
+    /**
+     * Uses the {@link EntityIterator} of the {@link IndexingConfig} as source
+     * for the entity scores.
+     * @throws IllegalArgumentException if the indexing configuration does not
+     * provide an {@link EntityIterator}
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        //the IndexingConfig is available via the IndexingConfig.KEY_INDEXING_CONFIG key!
+        IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        //configure first the EntityIterator to adapt
+        entityIterator = indexingConfig.getEntityIdIterator();
+        if(entityIterator == null){
+            throw new IllegalArgumentException("No EntityIterator available via the indexing configuration "+indexingConfig.getName());
+        }
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,225 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Implementation of the {@link EntityIterator} based on reading data line wise
+ * from an {@link InputStream}. Each line holds the id of an entity and
+ * (optionally) its score, separated by the configured separator.
+ * @author Rupert Westenthaler
+ *
+ */
+public class LineBasedEntityIterator implements EntityIterator {
+
+    private static final Logger log = LoggerFactory.getLogger(LineBasedEntityIterator.class);
+    /**
+     * The default separator to split the entity id with the score "\t" (tab)
+     */
+    public static final String DEFAULT_SEPARATOR = "\t";
+    /**
+     * The default encoding used to read the data from the passed {@link InputStream}
+     * (UTF-8)
+     */
+    public static final String DEFAULT_ENCODING = "UTF-8";
+    /**
+     * The default if Entity ids should be {@link URLEncoder URLEncoded} (false)
+     */
+    public static final boolean DEFAULT_ENCODE_ENTITY_IDS = false;
+    /**
+     * Parameter used to configure the name of the source file within the
+     * {@link IndexingConfig#getSourceFolder()}
+     */
+    public static final String PARAM_ENTITY_SCORE_FILE = "source";
+    /**
+     * The default name used for the {@link #PARAM_ENTITY_SCORE_FILE}
+     */
+    public static final String DEFAULT_ENTITY_SCORE_FILE = "entityScores.tsv";
+    /**
+     * Parameter used to configure if the Entity IDs should be {@link URLEncoder
+     * URL encoded} (the default is {@value #DEFAULT_ENCODE_ENTITY_IDS})
+     */
+    public static final String PARAM_URL_ENCODE_ENTITY_IDS = "encodeIds";
+    /**
+     * Parameter used to configure the text encoding used by the source file
+     * (the default is {@value #DEFAULT_ENCODING})
+     */
+    public static final String PARAM_CHARSET = "charset";
+    
+    private BufferedReader reader;
+    private String separator;
+    private String charset;
+    private boolean encodeEntityIds;
+    /** Number of lines read so far (used for log messages only) */
+    private long lineCounter = 0;
+    /** The line read ahead by {@link #hasNext()}; <code>null</code> if consumed */
+    private String nextLine;
+    
+    /**
+     * Default constructor relying on {@link #setConfiguration(Map)} being used
+     * to provide the configuration
+     */
+    public LineBasedEntityIterator(){
+        this(null);
+    }
+    /**
+     * Constructs an EntityScoreIterator that reads {@link EntityScore}s based 
+     * on lines provided by the passed InputStream. <p> Separator, Charset and
+     * encoding of Entity ids are initialised based on the default values.
+     * @param is the InputStream to read the data from or <code>null</code>
+     * if the source is provided later by {@link #setConfiguration(Map)}
+     */
+    public LineBasedEntityIterator(InputStream is) {
+        this(is,null,null,null);
+    }
+    /**
+     * Constructs an EntityScoreIterator based on the passed parameters. The
+     * default values are used if <code>null</code> is passed for any parameter
+     * other than the InputStream.
+     * @param is the InputStream to read the data from or <code>null</code>
+     * if the source is provided later by {@link #setConfiguration(Map)}
+     * @param charset the name of the charset used to read the stream
+     * @param separator the separator between entity id and score. NOTE that
+     * it is passed to {@link String#split(String)} and therefore interpreted
+     * as regular expression
+     * @param encodeIds if the entity ids should be {@link URLEncoder URL encoded}
+     * @throws IllegalArgumentException if the charset is not supported
+     */
+    public LineBasedEntityIterator(InputStream is,String charset,String separator,Boolean encodeIds) throws IllegalArgumentException {
+        this.charset = charset == null ? DEFAULT_ENCODING : charset;
+        this.separator = separator == null ? DEFAULT_SEPARATOR : separator;
+        this.encodeEntityIds = encodeIds == null ? DEFAULT_ENCODE_ENTITY_IDS : encodeIds.booleanValue();
+        if(is != null){
+            initReader(is);
+        }
+    }
+    /**
+     * Configures charset, encoding of entity ids and the source file based on
+     * {@link #PARAM_CHARSET}, {@link #PARAM_URL_ENCODE_ENTITY_IDS} and
+     * {@link #PARAM_ENTITY_SCORE_FILE} and opens the source file. The file
+     * name (or {@link #DEFAULT_ENTITY_SCORE_FILE} if none is configured) is
+     * resolved relative to the {@link IndexingConfig#getSourceFolder()}.
+     * @throws IllegalArgumentException if the source file does not exist or
+     * the configured charset is not supported
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        Object value = config.get(PARAM_CHARSET);
+        //ignore empty values (the previous check for toString() != null was
+        //always true)
+        if(value != null && !value.toString().isEmpty()){
+            this.charset = value.toString();
+        }
+        value = config.get(PARAM_URL_ENCODE_ENTITY_IDS);
+        if(value != null){
+            this.encodeEntityIds = Boolean.parseBoolean(value.toString());
+        }
+        value = config.get(PARAM_ENTITY_SCORE_FILE);
+        String fileName;
+        if(value == null || value.toString().isEmpty()){
+            fileName = DEFAULT_ENTITY_SCORE_FILE;
+        } else {
+            fileName = value.toString();
+        }
+        //also the default file is located within the source folder (it was
+        //resolved relative to the working directory before)
+        File score = new File(indexingConfig.getSourceFolder(),fileName);
+        InputStream in;
+        try {
+            in = new FileInputStream(score);
+        } catch (FileNotFoundException e) {
+            throw new IllegalArgumentException("The File with the entity scores "+score.getAbsolutePath()+" does not exist",e);
+        }
+        try {
+            initReader(in);
+        } catch (IllegalArgumentException e) {
+            //unsupported charset: do not leak the stream opened above
+            try {
+                in.close();
+            } catch (IOException ignored) {
+                //the IllegalArgumentException is the error to report
+            }
+            throw e;
+        }
+    }
+    /**
+     * used by the constructors and {@link #setConfiguration(Map)} to initialise
+     * the reader based on the provided InputStream and the current charset.
+     * @param is the input stream
+     * @throws IllegalArgumentException if the charset is not supported
+     */
+    private void initReader(InputStream is) {
+        try {
+            reader = new BufferedReader(new InputStreamReader(is, this.charset));
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalArgumentException("The parsed encoding "+charset+" is not supported",e);
+        }
+    }
+    /**
+     * Reads the next line if the current one is already consumed.
+     * @return <code>true</code> if there is an other line available
+     * @throws IllegalStateException if no source is configured or reading
+     * from the source fails
+     */
+    @Override
+    public boolean hasNext() {
+        if(nextLine == null){ //consumed
+            getNext();
+        }
+        return nextLine != null;
+    }
+
+    /**
+     * Creates the {@link EntityScore} for the next line. The first part of
+     * the line is used as id and the second part (if present) as score. If
+     * the score is missing or can not be parsed <code>null</code> is used.
+     * @return the entity score for the next line
+     * @throws java.util.NoSuchElementException if there are no more lines
+     */
+    @Override
+    public EntityScore next() {
+        //read ahead if needed, so that next() also works without a previous
+        //call to hasNext() and fails as defined by Iterator at the end
+        if(!hasNext()){
+            throw new java.util.NoSuchElementException("No more lines available to read EntityScores from");
+        }
+        String line = nextLine;
+        nextLine = null; //consume
+        String[] parts = line.split(separator);
+        if(parts.length > 2){
+            log.warn("Line {} does have more than 2 parts {}",
+                lineCounter,Arrays.toString(parts));
+        }
+        Float score;
+        if(parts.length >=2){
+            try {
+                score = Float.parseFloat(parts[1]);
+            } catch (NumberFormatException e) {
+                log.warn(String.format("Unable to parse the score for " +
+                        "Entity %s from value %s in line %s! Use NULL as score",
+                        parts[0],parts[1],lineCounter));
+                score = null;
+            }
+        } else {
+            log.debug("No score for Entity {} in line {}! Use NULL as score",parts[0],lineCounter);
+            score = null;
+        }
+        try {
+            return new EntityScore(
+                encodeEntityIds ? URLEncoder.encode(parts[0], charset) : parts[0],
+                score);
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalStateException("Unable to URLEncode EntityId",e);
+        }
+    }
+    /**
+     * Reads the next line from the reader into {@link #nextLine}
+     * (<code>null</code> at the end of the data).
+     */
+    private void getNext(){
+        if(reader == null){
+            throw new IllegalStateException("No source available. One must call setConfiguration to configure the file to read!");
+        }
+        try {
+            nextLine = reader.readLine();
+        } catch (IOException e) {
+           throw new IllegalStateException("Unable to read next EntityScore",e);
+        }
+        if(nextLine != null){ //do not count the end of the data as line
+            lineCounter++;
+        }
+    }
+
+    /**
+     * Not supported.
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException("Removal form the EnityScore list is not supported");
+    }
+    /**
+     * @return always <code>false</code>
+     */
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+    /**
+     * Nothing to initialise.
+     */
+    @Override
+    public void initialise() {
+    }
+        
+    /**
+     * Closes the reader (if any). Errors while closing are logged but not
+     * reported to the caller.
+     */
+    @Override
+    public void close(){
+        if(reader != null){
+            try {
+                reader.close();
+            }catch (IOException e) {
+                //closing is best effort; nothing the caller could do about it
+                log.debug("Unable to close the reader of the entity scores",e);
+            }
+        }
+    }
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain