You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2011/04/29 11:20:38 UTC
svn commit: r1097740 [3/10] - in /incubator/stanbol/trunk: entityhub/
entityhub/generic/core/src/main/java/org/apache/stanbol/entityhub/core/mapping/
entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/mapping/
entityhu...
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,530 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.Indexer;
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.indexing.core.event.IndexingEvent;
+import org.apache.stanbol.entityhub.indexing.core.event.IndexingListener;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingDaemon.IndexingDaemonEventObject;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingDaemon.IndexingDaemonListener;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceEventObject;
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexingSourceInitialiser.IndexingSourceInitialiserListener;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.indexing.core.processor.EmptyProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Abstract Implementation of the Indexer. <p>
+ * Principally there are two ways how to index entities <ol>
+ * <li> Iterate of the the entityIds/scores by using an {@link EntityIterator}
+ * and lookup the data by using an {@link EntityDataProvider}.
+ * <li> Iterate over the data by using an {@link EntityDataIterator} (provided
+ * by an {@link EntityDataIterable}) and lookup/calculate the scores by using an
+ * {@link EntityScoreProvider}.
+ * </ol>
+ * This Implementation provides a static createInstance(..) method for each of
+ * the two variants.<p>
+ * After the Entities are loaded from the source they are processed by using the
+ * configured {@link EntityProcessor}. Finally the processed entities are
+ * persisted in a {@link Yard}.
+ * @author Rupert Westenthaler
+ *
+ */
+public class IndexerImpl implements Indexer {
+ //protected to allow internal classes direct access!
+ protected static final Logger log = LoggerFactory.getLogger(IndexerImpl.class);
+
+ /**
+ * Holds the indexing listener
+ */
+ private final Set<IndexingListener> listeners;
+
+ public static final int DEFAULT_CHUNK_SIZE = 10;
+ private int chunkSize;
+ public static final int MIN_QUEUE_SIZE = 500;
+ /**
+ * Queue used to add Entities read from the IndexingSource(s). This queue
+ * is consumed by the {@link EntityProcessorRunnable}.
+ */
+ private BlockingQueue<QueueItem<Representation>> indexedEntityQueue;
+ /**
+ * Queue used to add processed Entities. This queue is consumed by the
+ * {@link EntityPersisterRunnable}.
+ */
+ private BlockingQueue<QueueItem<Representation>> processedEntityQueue;
+ /**
+ * Queue used to add finished Entities. Mainly used for counting and
+ * logging
+ */
+ private BlockingQueue<QueueItem<Representation>> finishedEntityQueue;
+
+ private BlockingQueue<QueueItem<IndexingError>> errorEntityQueue;
+
+ private boolean indexAllEntitiesState = false;
+
+ //entityMode
+ private EntityIterator entityIterator;
+ private EntityDataProvider dataProvider;
+ //dataMode
+ private EntityDataIterable dataIterable;
+ private EntityScoreProvider scoreProvider;
+ private final Collection<IndexingComponent> indexingComponents;
+
+ private final IndexingDestination indexingDestination;
+ private final EntityProcessor entityProcessor;
+ private final ScoreNormaliser scoreNormaliser;
+
+ private State state = State.UNINITIALISED;
+ private final Object stateSync = new Object();
+
+ public IndexerImpl(EntityIterator entityIterator,
+ EntityDataProvider dataProvider,
+ ScoreNormaliser normaliser,
+ IndexingDestination indexingDestination,
+ EntityProcessor entityProcessor){
+ this(normaliser,indexingDestination,entityProcessor);
+ //set entityMode interfaces
+ if(entityIterator == null){
+ throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
+ }
+ this.entityIterator = entityIterator;
+ this.dataProvider = dataProvider;
+ //add the parsed indexingSources to the list
+ this.indexingComponents.add(entityIterator);
+ this.indexingComponents.add(dataProvider);
+ }
+ public IndexerImpl(EntityDataIterable dataIterable,
+ EntityScoreProvider scoreProvider,
+ ScoreNormaliser normaliser,
+ IndexingDestination indexingDestination,
+ EntityProcessor entityProcessor){
+ this(normaliser,indexingDestination,entityProcessor);
+ //deactivate entityMode interfaces
+ this.entityIterator = null;
+ if(scoreProvider == null){
+ throw new IllegalArgumentException("The EntityScoreProvider MUST NOT be NULL!");
+ }
+ this.scoreProvider = scoreProvider;
+ this.dataIterable = dataIterable;
+ //add the parsed indexingSources to the list
+ this.indexingComponents.add(scoreProvider);
+ this.indexingComponents.add(dataIterable);
+ }
+
+ protected IndexerImpl(ScoreNormaliser normaliser,
+ IndexingDestination indexingDestination,
+ EntityProcessor entityProcessor){
+ if(indexingDestination == null){
+ throw new IllegalArgumentException("The Yard MUST NOT be NULL!");
+ }
+ this.indexingDestination = indexingDestination;
+ if(entityProcessor == null){
+ this.entityProcessor = new EmptyProcessor();
+ } else {
+ this.entityProcessor = entityProcessor;
+ }
+ setChunkSize(DEFAULT_CHUNK_SIZE); //init the chunk size and the cache
+ this.scoreNormaliser = normaliser;
+ indexingComponents = new ArrayList<IndexingComponent>();
+ indexingComponents.add(indexingDestination);
+ indexingComponents.add(entityProcessor);
+ listeners = new HashSet<IndexingListener>();
+ }
+ public boolean addIndexListener(IndexingListener listener){
+ if(listener != null){
+ synchronized (listeners) {
+ return listeners.add(listener);
+ }
+ } else {
+ return false;
+ }
+ }
+ public boolean removeIndexListener(IndexingListener listener){
+ if(listener != null){
+ synchronized (listeners) {
+ return listeners.remove(listener);
+ }
+ } else {
+ return false;
+ }
+ }
+ protected void fireStateChanged(){
+ IndexingEvent event = new IndexingEvent(this);
+ Collection<IndexingListener> copy = new ArrayList<IndexingListener>(listeners.size());
+ synchronized (listeners) {
+ copy.addAll(listeners);
+ }
+ for(IndexingListener listener : copy){
+ listener.stateChanged(event);
+ //if the state is finished also send the completed event
+ if(getState() == State.FINISHED){
+ listener.indexingCompleted(event);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#setChunkSize(int)
+ */
+ public void setChunkSize(int chunkSize) throws IllegalStateException {
+ if(getState().ordinal() >= State.INDEXING.ordinal()){
+ throw new IllegalStateException("Setting the chunkSize is only allowed before starting the indexing process!");
+ }
+ if(chunkSize <= 0){
+ chunkSize = DEFAULT_CHUNK_SIZE;
+ }
+ this.chunkSize = chunkSize;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getChunkSize()
+ */
+ public int getChunkSize() {
+ return chunkSize;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getYard()
+ */
+ public Yard getYard() {
+ return indexingDestination.getYard();
+ }
+ @Override
+ public void initialiseIndexingSources() {
+ synchronized (stateSync) { //ensure that two threads do not start the
+ //initialisation at the same time ...
+ if(getState() != State.UNINITIALISED){
+ return;
+ }
+ setState(State.INITIALISING);
+ log.info("Initialisation started ...");
+ }
+ //add all IndexingSources that need to be initialised to a set
+ final Collection<IndexingComponent> toInitialise = new HashSet<IndexingComponent>();
+ //we need an simple listener that removes the IndexingSerouces from the
+ //above list
+ final IndexingSourceInitialiserListener listener = new IndexingSourceInitialiserListener() {
+ @Override
+ public void indexingSourceInitialised(IndexingSourceEventObject eventObject) {
+ //remove the IndexingSource from the toInitialise set
+ synchronized (toInitialise) {
+ toInitialise.remove(eventObject.getIndexingSource());
+ if(toInitialise.isEmpty()){ //if no more left to initialise
+ //notify others about it
+ toInitialise.notifyAll();
+ }
+ }
+ //finally remove this listener
+ eventObject.getSource().removeIndexingSourceInitialiserListener(this);
+ }
+ };
+ //now create the IndexingSourceInitialiser that initialise the
+ //Indexing Sources in their own Thread
+ for(IndexingComponent source : indexingComponents){
+ if(source.needsInitialisation()){ //if it need to be initialised
+ toInitialise.add(source); // add it to the list
+ //create an initialiser
+ IndexingSourceInitialiser initialiser = new IndexingSourceInitialiser(source);
+ //add the listener
+ initialiser.addIndexingSourceInitialiserListener(listener);
+ //create and init the Thread
+ Thread thread = new Thread(initialiser);
+ thread.setDaemon(true);
+ thread.start();
+ } //else no initialisation is needed
+ }
+ //now wait until all IndexingSources are initialised!
+ while(!toInitialise.isEmpty()){
+ synchronized (toInitialise) {
+ try {
+ toInitialise.wait();
+ } catch (InterruptedException e) {
+ //year looks like all IndexingSources are initialised!
+ }
+ }
+ }
+ log.info("Initialisation completed");
+ setState(State.INITIALISED);
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#index()
+ */
+ public void index(){
+ Set<State> supportedStates = EnumSet.of(
+ State.UNINITIALISED,State.INITIALISED,State.INDEXED,State.FINISHED);
+ //this is only used to inform about wrong usage. It does not ensure
+ //that index is called twice by different threads. This check is done
+ //within the initialise, index and finalise methods!
+ State state = getState();
+ if(!supportedStates.contains(state)){
+ throw new IllegalStateException(String.format(
+ "Calling this Method is not supported while in State %s! Supported States are ",
+ state,supportedStates));
+ }
+ initialiseIndexingSources();
+ //if now the state is an unsupported one it indicates that
+ //initialiseIndexingSources() was called by an other thread before this one!
+ state = getState();
+ if(!supportedStates.contains(state)){
+ throw new IllegalStateException(String.format(
+ "Calling this Method is not supported while in State %s! Supported States are ",
+ state,supportedStates));
+ }
+ log.info("Start Indexing");
+ indexAllEntities();
+ log.info("Indexing completed ...");
+ log.info("start finalisation....");
+ finaliseIndexingTarget();
+ log.info("Indexing finished!");
+ }
+ @Override
+ public void finaliseIndexingTarget() {
+ synchronized (stateSync) { //ensure that two threads do not start the
+ //initialisation at the same time ...
+ State state = getState();
+ if(state.ordinal() < State.INDEXED.ordinal()){
+ throw new IllegalStateException("The Indexer MUST BE already "+State.INDEXED+" when calling this Method!");
+ }
+ if(state != State.INDEXED){ //if state > INITIALISED
+ return; // ignore this call
+ }
+ setState(State.FINALISING);
+ log.info("Indexing started ...");
+ }
+ indexingDestination.finalise();
+ setState(State.FINISHED);
+ }
+ @Override
+ public void indexAllEntities() {
+ synchronized (stateSync) { //ensure that two threads do not start the
+ //initialisation at the same time ...
+ State state = getState();
+ if(state.ordinal() < State.INITIALISED.ordinal()){
+ throw new IllegalStateException("The Indexer MUST BE already "+State.INITIALISED+" when calling this Method!");
+ }
+ if(state != State.INITIALISED){ //if state > INITIALISED
+ return; // ignore this call
+ }
+ setState(State.INDEXING);
+ log.info("Indexing started ...");
+ }
+ //init the queues
+ int queueSize = Math.max(MIN_QUEUE_SIZE, chunkSize*2);
+ indexedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ processedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ finishedEntityQueue = new ArrayBlockingQueue<QueueItem<Representation>>(queueSize);
+ errorEntityQueue = new ArrayBlockingQueue<QueueItem<IndexingError>>(queueSize);
+
+ //Set holding all active IndexingDaemons
+ final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons =
+ new TreeSet<IndexingDaemon<?,?>>();
+ //create the IndexingDaemos
+ //TODO: Here we would need to create multiple instances in case
+ // one would e.g. like to use several threads for processing entities
+ //(1) the daemon reading from the IndexingSources
+ String entitySourceReaderName = "Indexing: Entity Source Reader Deamon";
+ if(entityIterator != null){
+ activeIndexingDeamons.add(
+ new EntityIdBasedIndexingDaemon(
+ entitySourceReaderName,
+ indexedEntityQueue, errorEntityQueue,
+ entityIterator,
+ dataProvider,
+ scoreNormaliser,
+ indexAllEntitiesState));
+ } else {
+ activeIndexingDeamons.add(
+ new EntityDataBasedIndexingDaemon(
+ entitySourceReaderName,
+ indexedEntityQueue, errorEntityQueue,
+ dataIterable,
+ scoreProvider,
+ scoreNormaliser,
+ indexAllEntitiesState));
+ }
+ //(2) The daemon for processing the entities
+ activeIndexingDeamons.add(
+ new EntityProcessorRunnable(
+ "Indexing: Entity Processor Deamon",
+ indexedEntityQueue, //it consumes indexed Entities
+ processedEntityQueue, //it produces processed Entities
+ errorEntityQueue,
+ entityProcessor,
+ Collections.singleton(SCORE_FIELD)));
+ //(3) The daemon for persisting the entities
+ activeIndexingDeamons.add(
+ new EntityPersisterRunnable(
+ "Indexing: Entity Perstisting Deamon",
+ processedEntityQueue, //it consumes processed Entities
+ finishedEntityQueue, //it produces finished Entities
+ errorEntityQueue,
+ chunkSize, indexingDestination.getYard()));
+ //(4) The daemon for logging finished entities
+ activeIndexingDeamons.add(
+ new FinishedEntityDaemon(
+ finishedEntityQueue, -1, log));
+ //(5) The daemon for logging errors
+ activeIndexingDeamons.add(
+ new EntityErrorLoggerDaemon(
+ errorEntityQueue, log));
+ //We need an listener for the IndexingDaemons we are about to start!
+ final IndexingDaemonListener listener = new IndexingDaemonListener() {
+ @Override
+ public void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject) {
+ //looks like one has finished
+ IndexingDaemon<?,?> indexingDaemon = indexingDaemonEventObject.getSource();
+ //handle the finished indexing daemon
+ handleFinishedIndexingDaemon(activeIndexingDeamons, indexingDaemon);
+ //finally remove the listener
+ indexingDaemon.removeIndexingDaemonListener(this);
+
+ }
+ };
+ //now start the IndexingDaemons in their own Threads
+ Set<IndexingDaemon<?,?>> deamonCopy =
+ new HashSet<IndexingDaemon<?,?>>(activeIndexingDeamons);
+ for(IndexingDaemon<?,?> deamon : deamonCopy){
+ deamon.addIndexingDaemonListener(listener); //add the listener
+ Thread thread = new Thread(deamon);// create the thread
+ thread.setDaemon(true); //ensure that the JVM can terminate
+ thread.setName(deamon.getName()); // set the name of the thread
+ thread.start(); //start the Thread
+ }
+ //now we need to wait until all Threads have finished ...
+ while(!activeIndexingDeamons.isEmpty()){
+ synchronized (activeIndexingDeamons) {
+ try {
+ activeIndexingDeamons.wait();
+ } catch (InterruptedException e) {
+ //year ... looks like we are done
+ }
+ }
+ }
+ //set the new state to INDEXED
+ setState(State.INDEXED);
+ }
+ /**
+ * Handles the necessary actions if an {@link IndexingDaemon} used for the
+ * work done within {@link #indexAllEntities()} completes its work (meaning
+ * it has executed all entities).<p>
+ * The parsed SortedSet is created within {@link #indexAllEntities()} and
+ * contains all {@link IndexingDaemon}s that have not yet finished. It is
+ * the responsibility of this method to remove finished
+ * {@link IndexingDaemon}s from this set.<p>
+ * In addition this Method is responsible to {@link BlockingQueue#put(Object)}
+ * the {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} to the
+ * consuming queue of {@link IndexingDaemon}s as soon as all
+ * {@link IndexingDaemon}s of the previous indexing steps have finished.
+ * This is checked comparing the {@link IndexingDaemon#getSequence()} number
+ * of the first entry within the sorted activeIndexingDeamons set.
+ * If the sequence number of the first element has increased after the
+ * finished {@link IndexingDaemon} was removed this Method puts the
+ * {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} item to the
+ * consuming queue of the new first entry of activeIndexingDeamons.
+ * @param activeIndexingDeamons The SortedSet containing all
+ * {@link IndexingDaemon}s that are still active.
+ * @param indexingDaemon the {@link IndexingDaemon} that completed its work.
+ */
+ private void handleFinishedIndexingDaemon(final SortedSet<IndexingDaemon<?,?>> activeIndexingDeamons,
+ IndexingDaemon<?,?> indexingDaemon) {
+ log.info("{} completed (sequence={}) ... ",
+ indexingDaemon.getName(), indexingDaemon.getSequence());
+ IndexingDaemon<?,?> sendEndofQueue = null;
+ synchronized (activeIndexingDeamons) {
+ if(log.isDebugEnabled()){
+ log.info(" Active Indexing Deamons:");
+ for(IndexingDaemon<?,?> active : activeIndexingDeamons){
+ log.info(" > {} {}",active.getSequence(),active.getName());
+ }
+ }
+ //get the SequenceNumber of the first Element
+ if(!activeIndexingDeamons.isEmpty()){
+ Integer sequenceNumber = activeIndexingDeamons.first().getSequence();
+ log.info(" > current sequence : {}",sequenceNumber);
+ //try to remove it from the activeDeamons list
+ activeIndexingDeamons.remove(indexingDaemon);
+ if(activeIndexingDeamons.isEmpty()){ //if no active is left
+ log.debug(" - indexingDeamons list now emoty ... notifyAll to indicate indexing has completed!");
+ activeIndexingDeamons.notifyAll(); //notify all others!
+ } else { //check new SequenceNumber
+ IndexingDaemon<?,?> first = activeIndexingDeamons.first();
+ if(sequenceNumber.compareTo(first.getSequence()) < 0){
+ log.info(" > new sequence: {}",first.getSequence());
+ //sequence number increased ->
+ // ... all Daemons for the step have completed
+ // ... send EndOfQueue
+ // ... but outside of the synchronized block!
+ sendEndofQueue = first;
+ }
+ }
+ } //already empty ... nothing todo
+ }
+ if(sendEndofQueue != null){ //send endOfQueue
+ //to the consuming Queue of this one
+ try {
+ //ignore the Type safety because the item is of
+ //INDEXING_COMPLETED_QUEUE_ITEM is anyway null
+ log.info("Send end-of-queue to Deamons with Sequence "+sendEndofQueue.getSequence());
+ sendEndofQueue.getConsumeQueue().put(INDEXING_COMPLETED_QUEUE_ITEM);
+ } catch (InterruptedException e) {
+ log.error("Interupped while sending EnodOfQueue Item to consuming queue of "+sendEndofQueue.getName(),e);
+ }
+ }
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#setIndexAllEntitiesState(boolean)
+ */
+ public void setIndexAllEntitiesState(boolean indexAllEntitiesState) {
+ this.indexAllEntitiesState = indexAllEntitiesState;
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#isIndexAllEntitiesState()
+ */
+ public boolean isIndexAllEntitiesState() {
+ return indexAllEntitiesState;
+ }
+ /**
+ * Setter for the state. <p>
+ * Implementation Note: This setter is synchronised to the sync object for
+ * the state
+ * @param state the state to set
+ */
+ private void setState(State state) {
+ boolean changed;
+ synchronized (stateSync) {
+ changed = state != this.state;
+ if(changed){
+ this.state = state;
+ }
+ }
+ if(changed){ //do not fire events within synchronized blocks ...
+ fireStateChanged();
+ }
+ }
+ /* (non-Javadoc)
+ * @see org.apache.stanbol.entityhub.indexing.core.IndexerInterface#getState()
+ */
+ public State getState() {
+ return state;
+ }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,226 @@
+/**
+ *
+ */
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.ERROR_TIME;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.INDEXING_COMPLETED_QUEUE_ITEM;
+
+import java.util.Collections;
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the {@link Runnable}s forming the indexing pipeline. A daemon
+ * consumes items from its consume queue, produces items to its produce queue
+ * and reports {@link IndexingError}s to the error queue.<p>
+ * The consume queue is finished as soon as the
+ * {@link IndexerConstants#INDEXING_COMPLETED_QUEUE_ITEM} is taken. That item
+ * is put back into the queue so that other consumers of the same queue also
+ * see it.<p>
+ * Daemons are ordered by their {@link #getSequence() sequence}. Distinct
+ * daemons with the same sequence are ordered by creation, so that
+ * {@link #compareTo(IndexingDaemon)} only returns 0 for equal instances.
+ * @param <CI> the type of the consumed items
+ * @param <PI> the type of the produced items
+ */
+public abstract class IndexingDaemon<CI,PI> implements Comparable<IndexingDaemon<?,?>>,Runnable {
+    /**
+     * Used to assign a unique, creation ordered id to every instance
+     */
+    private static final java.util.concurrent.atomic.AtomicLong INSTANCE_COUNTER =
+        new java.util.concurrent.atomic.AtomicLong();
+    protected final Logger log;
+    private boolean queueFinished = false;
+    private final BlockingQueue<QueueItem<CI>> consume;
+    private final BlockingQueue<QueueItem<PI>> produce;
+    private final BlockingQueue<QueueItem<IndexingError>> error;
+    /**
+     * volatile because {@link #finished()} may be called by other threads
+     */
+    private volatile boolean finished = false;
+    private final Set<IndexingDaemonListener> listeners;
+    /**
+     * Typically used to set the name of the {@link Thread} running this Runnable
+     */
+    private final String name;
+    /**
+     * Used for {@link #compareTo(IndexingDaemon)}
+     */
+    private final Integer sequence;
+    /**
+     * Unique id used by {@link #compareTo(IndexingDaemon)} to order distinct
+     * daemons with the same sequence
+     */
+    private final long instanceId = INSTANCE_COUNTER.getAndIncrement();
+    /**
+     * @param name the name. If NULL or empty the simple class name followed
+     * by " Deamon" is used
+     * @param sequence the position within the indexing pipeline. If NULL
+     * <code>0</code> is used
+     * @param consume the queue to consume items from
+     * @param produce the queue to produce items to
+     * @param error the queue to report errors to
+     */
+    protected IndexingDaemon(String name,
+                             Integer sequence,
+                             BlockingQueue<QueueItem<CI>> consume,
+                             BlockingQueue<QueueItem<PI>> produce,
+                             BlockingQueue<QueueItem<IndexingError>> error){
+        if(name == null || name.isEmpty()){
+            this.name = getClass().getSimpleName()+" Deamon";
+        } else {
+            this.name = name;
+        }
+        if(sequence == null){
+            this.sequence = Integer.valueOf(0);
+        } else {
+            this.sequence = sequence;
+        }
+        this.consume = consume;
+        this.produce = produce;
+        this.error = error;
+        //get the logger for the actual implementation
+        this.log = LoggerFactory.getLogger(getClass());
+        this.listeners = Collections.synchronizedSet(
+            new HashSet<IndexingDaemonListener>());
+    }
+    /**
+     * Sends an {@link IndexingError} to the error queue. Errors without an
+     * entity ID are ignored.
+     * @param entityId the ID of the entity
+     * @param message the error message
+     * @param ex the cause (not checked for NULL)
+     */
+    protected final void sendError(String entityId, String message, Exception ex){
+        if(entityId != null){
+            putError(new QueueItem<IndexingError>(
+                new IndexingError(entityId, message, ex)));
+        }
+    }
+    /**
+     * Sends an {@link IndexingError} for the parsed item to the error queue.
+     * Errors without an entity ID are ignored.
+     * @param entityId the ID of the entity
+     * @param item the item that caused the error
+     * @param message the error message
+     * @param ex the cause (not checked for NULL)
+     */
+    protected final void sendError(String entityId,QueueItem<?> item, String message, Exception ex){
+        if(entityId != null){
+            putError(new QueueItem<IndexingError>(
+                new IndexingError(entityId, message, ex), item));
+        }
+    }
+    /**
+     * Adds the error time and puts the item to the error queue.
+     */
+    private void putError(QueueItem<IndexingError> errorItem){
+        if(error == null){
+            log.warn("Unable to process Error because Error Queue is NULL!");
+            return; //avoid a NullPointerException
+        }
+        Long errorTime = Long.valueOf(System.currentTimeMillis());
+        errorItem.setProperty(ERROR_TIME, errorTime);
+        try {
+            error.put(errorItem);
+        } catch (InterruptedException e) {
+            //NOTE(review): interrupt flag intentionally not restored (as in
+            //consume()/produce()) - confirm before changing
+            log.error("Interupped while sending an Error for Entity "+errorItem.getItem().getEntity(), e);
+        }
+    }
+    /**
+     * Puts the parsed item to the produce queue. NULL items are ignored.
+     * @param item the item to produce
+     */
+    protected final void produce(QueueItem<PI> item){
+        if(produce == null){
+            log.warn("Unable to produce Items because produce queue is NULL!");
+            return; //avoid a NullPointerException
+        }
+        if(item != null){
+            try {
+                produce.put(item);
+            } catch (InterruptedException e) {
+                //NOTE(review): interrupt flag intentionally not restored.
+                //Restoring it would make every following put(..) fail
+                //immediately - confirm how subclasses handle interrupts.
+                log.error("Interupped while producing item "+item.getItem(), e);
+            }
+        }
+    }
+    /**
+     * Takes the next item from the consume queue.
+     * @return the next item or <code>null</code> if the queue is finished,
+     * the consume queue is NULL or the thread was interrupted while waiting
+     */
+    protected final QueueItem<CI> consume(){
+        if(queueFinished){
+            return null;
+        }
+        if(consume == null){
+            log.warn("Unable to consume items because consume queue is NULl!");
+            //nothing can ever be consumed: mark as finished (avoids a
+            //NullPointerException and endless polling)
+            queueFinished = true;
+            return null;
+        }
+        try {
+            QueueItem<CI> consumed = consume.take();
+            if(consumed == INDEXING_COMPLETED_QUEUE_ITEM){
+                queueFinished = true;
+                consume.put(consumed); //put it back for other consumers
+                return null;
+            } else {
+                return consumed;
+            }
+        } catch (InterruptedException e) {
+            //NOTE(review): interrupt flag intentionally not restored. Callers
+            //may call consume() again, and a set flag would make take() fail
+            //immediately (busy loop) - confirm before changing.
+            log.error("Interupped while consuming -> return null", e);
+            return null;
+        }
+    }
+    /**
+     * @return <code>true</code> if the end of the consume queue was reached
+     */
+    protected final boolean isQueueFinished() {
+        return queueFinished;
+    }
+    /**
+     * Method has to be called by the subclasses to signal that this Runnable
+     * has finished. It sets {@link #finished()} to <code>true</code> and
+     * notifies all registered {@link IndexingDaemonListener}s.
+     */
+    protected final void setFinished(){
+        this.finished = true;
+        //tell listener that this one has finished!
+        fireIndexingDaemonEvent();
+    }
+    /**
+     * @return <code>true</code> if {@link #setFinished()} was called
+     */
+    public final boolean finished(){
+        return finished;
+    }
+    public final boolean addIndexingDaemonListener(IndexingDaemonListener listener){
+        if(listener != null){
+            return listeners.add(listener);
+        } else {
+            return false;
+        }
+    }
+    public boolean removeIndexingDaemonListener(IndexingDaemonListener listener){
+        if(listener != null){
+            return listeners.remove(listener);
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Notifies all listeners that this daemon has finished. Listeners are
+     * called on a copy, so they may remove themselves while being notified.
+     */
+    public void fireIndexingDaemonEvent(){
+        Set<IndexingDaemonListener> copy;
+        synchronized (listeners) {
+            copy = new HashSet<IndexingDaemonListener>(listeners);
+        }
+        IndexingDaemonEventObject eventObject = new IndexingDaemonEventObject(this);
+        for(IndexingDaemonListener listener : copy){
+            listener.indexingDaemonFinished(eventObject);
+        }
+    }
+    /**
+     * Currently only used to notify listener that this Daemon has processed
+     * all entities.<p>
+     * TODO: I would like to use generics here, but I was not able to figure out
+     * how to use them in a way that one can still register a Listener that
+     * uses <code>IndexingDaemonListener&lt;? super CI,? super PI&gt;</code> with
+     * the {@link IndexingDaemon#addIndexingDaemonListener(IndexingDaemonListener)}
+     * and {@link IndexingDaemon#removeIndexingDaemonListener(IndexingDaemonListener)}
+     * methods.
+     * @author Rupert Westenthaler
+     *
+     */
+    public static interface IndexingDaemonListener {
+        void indexingDaemonFinished(IndexingDaemonEventObject indexingDaemonEventObject);
+    }
+
+    /**
+     * Event whose source is the {@link IndexingDaemon} that has finished.
+     */
+    public static class IndexingDaemonEventObject extends EventObject {
+        private static final long serialVersionUID = -1L;
+        public IndexingDaemonEventObject(IndexingDaemon<?,?> indexingDaemon){
+            super(indexingDaemon);
+        }
+        @Override
+        public IndexingDaemon<?,?> getSource() {
+            return (IndexingDaemon<?,?>)super.getSource();
+        }
+    }
+    protected final BlockingQueue<QueueItem<CI>> getConsumeQueue() {
+        return consume;
+    }
+    protected final BlockingQueue<QueueItem<PI>> getProduceQueue() {
+        return produce;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+    /**
+     * The order of this Daemon. Guaranteed to be NOT NULL
+     * @return the order
+     */
+    public final Integer getSequence() {
+        return sequence;
+    }
+    /**
+     * Orders by {@link #getSequence()}. Distinct daemons with the same
+     * sequence are ordered by their creation. 0 is only returned for equal
+     * objects, as required for the use within SortedSets.
+     */
+    @Override
+    public int compareTo(IndexingDaemon<?,?> o) {
+        int compare = sequence.compareTo(o.sequence);
+        if(compare != 0){
+            return compare;
+        }
+        if(this == o || equals(o)){
+            return 0;
+        }
+        //no subtraction (may overflow); ids are unique per instance, so this
+        //is antisymmetric and consistent with identity equals
+        return instanceId < o.instanceId ? -1 : (instanceId == o.instanceId ? 0 : 1);
+    }
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,33 @@
+/**
+ *
+ */
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+public class IndexingError {
+ private final Exception ex;
+ private final String msg;
+ private final String entityId;
+ public IndexingError(String id,String msg,Exception ex){
+ this.entityId = id;
+ this.msg = msg;
+ this.ex = ex;
+ }
+ /**
+ * @return the ex
+ */
+ public Exception getException() {
+ return ex;
+ }
+ /**
+ * @return the msg
+ */
+ public String getMessage() {
+ return msg;
+ }
+ /**
+ * @return the entityId
+ */
+ public String getEntity() {
+ return entityId;
+ }
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingError.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,81 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.Collections;
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+
+/**
+ * {@link Runnable} that initialises an {@link IndexingComponent} (by calling
+ * {@link IndexingComponent#initialise()}) and notifies all registered
+ * {@link IndexingSourceInitialiserListener}s when the initialisation has
+ * completed.<p>
+ * If {@link IndexingComponent#initialise()} throws an exception, this
+ * initialiser is not marked as {@link #finished()} and no event is fired.
+ * @author Rupert Westenthaler
+ *
+ */
+public class IndexingSourceInitialiser implements Runnable {
+    private final IndexingComponent source;
+    /**
+     * Set by the thread executing {@link #run()} and possibly read by other
+     * threads via {@link #finished()}. Volatile so that the update is visible
+     * to those threads.
+     */
+    private volatile boolean finished = false;
+    /** the registered listeners (a synchronized set) */
+    private final Set<IndexingSourceInitialiserListener> listeners;
+
+    /**
+     * @param source the component to initialise
+     */
+    public IndexingSourceInitialiser(IndexingComponent source){
+        this.source = source;
+        this.listeners = Collections.synchronizedSet(
+            new HashSet<IndexingSourceInitialiserListener>());
+    }
+    /**
+     * Initialises the {@link IndexingComponent}, marks this initialiser as
+     * finished and notifies all registered listeners.
+     */
+    @Override
+    public final void run() {
+        source.initialise();
+        finished = true;
+        fireInitialisationCompletedEvent();
+    }
+    /**
+     * @return <code>true</code> if {@link #run()} has completed the
+     * initialisation of the {@link IndexingComponent}
+     */
+    public boolean finished(){
+        return finished;
+    }
+    /**
+     * Registers a listener that is notified when the initialisation completes.
+     * @param listener the listener (<code>null</code> is ignored)
+     * @return <code>true</code> if the listener was added
+     */
+    public boolean addIndexingSourceInitialiserListener(IndexingSourceInitialiserListener listener){
+        if(listener != null){
+            return listeners.add(listener);
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Unregisters a listener.
+     * @param listener the listener (<code>null</code> is ignored)
+     * @return <code>true</code> if the listener was removed
+     */
+    public boolean removeIndexingSourceInitialiserListener(IndexingSourceInitialiserListener listener){
+        if(listener != null){
+            return listeners.remove(listener);
+        } else {
+            return false;
+        }
+    }
+    /**
+     * Notifies all registered listeners that the initialisation has completed.
+     */
+    protected void fireInitialisationCompletedEvent(){
+        // iterating a synchronizedSet requires manual synchronization; a copy
+        // is notified so that listeners are not called while holding the lock
+        Set<IndexingSourceInitialiserListener> copy;
+        synchronized (listeners) {
+            copy = new HashSet<IndexingSourceInitialiserListener>(listeners);
+        }
+        IndexingSourceEventObject eventObject = new IndexingSourceEventObject(this,this.source);
+        for(IndexingSourceInitialiserListener listener : copy){
+            listener.indexingSourceInitialised(eventObject);
+        }
+    }
+
+    /**
+     * Listener notified when an {@link IndexingSourceInitialiser} completes the
+     * initialisation of its {@link IndexingComponent}.
+     */
+    public static interface IndexingSourceInitialiserListener {
+        void indexingSourceInitialised(IndexingSourceEventObject eventObject);
+    }
+    /**
+     * Event fired when the initialisation has completed. The event source is
+     * the {@link IndexingSourceInitialiser}; the initialised component is
+     * available via {@link #getIndexingSource()}.
+     */
+    public static class IndexingSourceEventObject extends EventObject{
+        private static final long serialVersionUID = -1L;
+        private final IndexingComponent indexingSource;
+        protected IndexingSourceEventObject(IndexingSourceInitialiser initialiser,IndexingComponent source){
+            // the initialiser MUST be the event source because getSource()
+            // casts it to IndexingSourceInitialiser
+            super(initialiser);
+            indexingSource = source;
+        }
+        @Override
+        public IndexingSourceInitialiser getSource() {
+            return (IndexingSourceInitialiser) super.getSource();
+        }
+        /**
+         * @return the initialised indexing component
+         */
+        public IndexingComponent getIndexingSource() {
+            return indexingSource;
+        }
+    }
+
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexingSourceInitialiser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,87 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Item internally used for Queues. It holds an item and can store
+ * additional properties. The Item can be only set while constructing an
+ * instance. The properties can be changed at any time.<p>
+ * Notes:<ul>
+ * <li> {@link #hashCode()} uses the hashCode of the item and
+ * {@link #equals(Object)} also checks if the item is equal to the other
+ * item. The properties are not used!
+ * <li>This Class is not synchronised.
+ * </ul>
+ * @author Rupert Westenthaler
+ *
+ * @param <T>
+ */
+public class QueueItem<T> {
+ private Map<String,Object> properties;
+ private final T item;
+ /**
+ * Creates a new QueueItem
+ * @param item the payload
+ */
+ public QueueItem(T item){
+ this.item = item;
+ }
+ /**
+ * Creates a QueueItem and copies the properties of an other one.
+ * NOTE that components with an reference to the other QueueItem will be
+ * able to change the properties of the new one. Use <br>
+ * <pre><code>
+ * QueueItem<String> item = new QueueItem<String>("demo");
+ * for(String key : other.getProperties()){
+ * item.setProperties(key,other.getProperty(key));
+ * }
+ * </code></pre><br>
+ * if you need to ensure that the properties within the QueueItem are not
+ * shared with others.
+ * @param item the payload
+ * @param properties the properties
+ */
+ public QueueItem(T item,QueueItem<?> other){
+ this(item);
+ if(other != null){
+ this.properties = other.properties;
+ }
+ }
+ public T getItem(){
+ return item;
+ }
+ public void setProperty(String key,Object value){
+ if(properties == null){
+ properties = new HashMap<String,Object>();
+ }
+ properties.put(key, value);
+ }
+ public Object getProperty(String key){
+ return properties != null ?
+ properties.get(key): null;
+ }
+ public Object removeProperty(String key){
+ return properties != null ?
+ properties.remove(key): null;
+ }
+ public Set<String> properties(){
+ if(properties != null){
+ return Collections.unmodifiableSet(properties.keySet());
+ } else {
+ return Collections.emptySet();
+ }
+ }
+ @Override
+ public int hashCode() {
+ return item.hashCode();
+ }
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof QueueItem<?>) &&
+ item.equals(((QueueItem<?>)other).item);
+ }
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/QueueItem.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,38 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * Pass-through {@link ScoreNormaliser}. Parsed scores are returned unchanged
+ * unless a chained normaliser was configured (via
+ * {@link #CHAINED_SCORE_NORMALISER}); in that case the result of the chained
+ * normaliser is returned. Intended for cases where passing <code>null</code>
+ * as {@link ScoreNormaliser} is not supported.
+ * @author Rupert Westenthaler
+ */
+public class DefaultNormaliser implements ScoreNormaliser{
+
+    /** the optional chained normaliser (<code>null</code> if none) */
+    private ScoreNormaliser chained;
+
+    @Override
+    public Float normalise(Float score) {
+        return chained == null ? score : chained.normalise(score);
+    }
+
+    /**
+     * Reads the chained normaliser from {@link #CHAINED_SCORE_NORMALISER}.
+     * If no value is present, the current chained normaliser is kept.
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object configured = config.get(CHAINED_SCORE_NORMALISER);
+        if(configured == null){
+            return;
+        }
+        chained = (ScoreNormaliser) configured;
+    }
+
+    @Override
+    public ScoreNormaliser getChained() {
+        return chained;
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/DefaultNormaliser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,92 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * This normaliser returns -1 for scores lower than the configured minimum
+ * (or equal to it if the minimum is exclusive). Because entities with a
+ * score &lt; 0 are typically not indexed, this can be used to filter entities
+ * based on their score.<p>
+ * This normaliser also supports forwarding the score to an other
+ * {@link ScoreNormaliser}. The filtering is calculated based on the results
+ * of that normaliser. To apply the minimum score to the original scores one
+ * should not parse a chained {@link ScoreNormaliser}.
+ * @author Rupert Westenthaler
+ *
+ */
+public class MinScoreNormalizer implements ScoreNormaliser {
+
+    /** Configuration key for the inclusive flag (parsed by {@link Boolean#parseBoolean(String)}) */
+    public static final String KEY_INCLUSIVE = "inclusive";
+    /** Configuration key for the minimum required score. Values MUST BE &gt;= 0 */
+    public static final String KEY_MIN_SCORE = "min-score";
+
+    private Float minScore;
+    private ScoreNormaliser normaliser;
+    private boolean inclusive;
+    /**
+     * Creates a normaliser with an inclusive minimum score of 0 and no
+     * chained normaliser. Use {@link #setConfiguration(Map)} to configure
+     * other values.
+     */
+    public MinScoreNormalizer(){
+        this(0,true,null);
+    }
+    /**
+     * Constructs a normaliser that returns -1 for all scores lower than the
+     * minimum required score
+     * @param minimumRequiredScore the minimum required score. MUST BE &gt;= 0
+     * @throws IllegalArgumentException if the minimum is &lt; 0 or NaN
+     */
+    public MinScoreNormalizer(float minimumRequiredScore){
+        this(minimumRequiredScore,true, null);
+    }
+    /**
+     * Constructs a normaliser that returns -1 for scores lower (if inclusive is
+     * <code>false</code> lower or equal) than the minimum required score. In
+     * case an other normaliser is parsed, scores parsed to
+     * {@link #normalise(Float)} are first processed by that normaliser.
+     * @param minimumRequiredScore the minimum required score. MUST BE &gt;= 0
+     * @param inclusive if scores equal to the required minimum are accepted
+     * @param normaliser the normaliser used to process parsed scores or
+     * <code>null</code> to use none.
+     * @throws IllegalArgumentException if the minimum is &lt; 0 or NaN
+     */
+    public MinScoreNormalizer(float minimumRequiredScore, boolean inclusive,ScoreNormaliser normaliser) {
+        if(!(minimumRequiredScore >= 0)){ //also rejects NaN
+            throw new IllegalArgumentException("The parsed minimum required score MUST BE >= 0");
+        }
+        this.inclusive = inclusive;
+        this.minScore = minimumRequiredScore;
+        this.normaliser = normaliser;
+    }
+
+    /**
+     * Configures this normaliser. Keys that are not present keep their
+     * current values.
+     * @param config supports {@link #KEY_INCLUSIVE}, {@link #KEY_MIN_SCORE}
+     * and {@link #CHAINED_SCORE_NORMALISER}
+     * @throws IllegalArgumentException if the min-score is not a number,
+     * &lt; 0 or NaN. In that case this instance is not changed.
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        // parse and validate all values before changing any state, so that an
+        // invalid configuration leaves this instance unchanged
+        Object value = config.get(KEY_INCLUSIVE);
+        boolean parsedInclusive = value != null ?
+            Boolean.parseBoolean(value.toString()) : inclusive;
+        value = config.get(KEY_MIN_SCORE);
+        Float parsedMinScore = minScore;
+        if(value != null){
+            parsedMinScore = Float.valueOf(value.toString());
+            // 0 is accepted to be consistent with the constructors (the
+            // default constructor uses a minimum of 0); NaN is rejected
+            if(!(parsedMinScore.floatValue() >= 0)){
+                throw new IllegalArgumentException("The parsed minScore value '"+value+"' MUST BE >= 0");
+            }
+        }
+        value = config.get(CHAINED_SCORE_NORMALISER);
+        ScoreNormaliser parsedNormaliser = value != null ?
+            (ScoreNormaliser)value : normaliser;
+        inclusive = parsedInclusive;
+        minScore = parsedMinScore;
+        normaliser = parsedNormaliser;
+    }
+
+    @Override
+    public Float normalise(Float score) {
+        if(normaliser != null){
+            score = normaliser.normalise(score);
+        }
+        if(score == null || score.compareTo(ZERO) < 0){
+            return score; //null and negative scores are returned as parsed
+        }
+        int compare = score.compareTo(minScore);
+        boolean belowMinimum = inclusive ?
+            compare < 0 : //score == minScore is OK
+            compare <= 0; //score == minScore is not OK
+        return belowMinimum ? MINUS_ONE : score;
+    }
+    @Override
+    public ScoreNormaliser getChained() {
+        return normaliser;
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/MinScoreNormalizer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,37 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+
+/**
+ * Normalises scores by applying the natural logarithm of (1 + score) as
+ * computed by {@link Math#log1p(double)}. If a chained normaliser is
+ * configured, scores are first processed by it. <code>null</code> and
+ * negative values are returned unchanged.
+ * @author Rupert Westenthaler
+ */
+public class NaturalLogNormaliser implements ScoreNormaliser {
+
+    /** the optional chained normaliser (<code>null</code> if none) */
+    private ScoreNormaliser chained;
+
+    @Override
+    public Float normalise(Float score) {
+        Float value = chained != null ? chained.normalise(score) : score;
+        if(value != null && value.compareTo(ZERO) >= 0){
+            return Float.valueOf((float)Math.log1p(value.doubleValue()));
+        }
+        return value;
+    }
+
+    /**
+     * Reads the chained normaliser from {@link #CHAINED_SCORE_NORMALISER}.
+     * If no value is present, the current chained normaliser is kept.
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object configured = config.get(CHAINED_SCORE_NORMALISER);
+        if(configured == null){
+            return;
+        }
+        chained = (ScoreNormaliser) configured;
+    }
+
+    @Override
+    public ScoreNormaliser getChained() {
+        return chained;
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/NaturalLogNormaliser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,115 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Normalises scores to the range [0..upperBound] (by default [0..1]). Scores
+ * are first forwarded to a chained {@link ScoreNormaliser} and the result is
+ * then scaled by <code>upperBound / maxScore</code>.<p>
+ * <code>maxScore</code> is either the maximum expected score (constructor
+ * parameter or {@link #KEY_MAX_EXPECTED_SCORE}) or, if none is provided, the
+ * first positive score parsed to {@link #normalise(Float)}. The latter is
+ * useful if entities are processed sorted by their score.
+ * @author Rupert Westenthaler
+ *
+ */
+public class RangeNormaliser implements ScoreNormaliser {
+
+    /** Configuration key for the upper bound of the range. Values MUST BE &gt; 0 */
+    public static final String KEY_UPPER_BOUND = "upper-bound";
+    /** Configuration key for the maximum expected score. Values MUST BE &gt; 0 */
+    public static final String KEY_MAX_EXPECTED_SCORE = "max-expected-score";
+
+    private static final Logger log = LoggerFactory.getLogger(RangeNormaliser.class);
+
+    private double upperBound;
+    private ScoreNormaliser normaliser;
+    /**
+     * The (normalised) score mapped to the upper bound. Only positive values
+     * are ever assigned; values &lt;= 0 mean "not yet initialised".
+     */
+    private double maxScore = -1;
+    /**
+     * Normalises values parsed to {@link #normalise(Float)} to [0..1] assuming
+     * that the first call to {@link #normalise(Float)} will parse the highest
+     * value.
+     */
+    public RangeNormaliser(){
+        this(null,null,null);
+    }
+    /**
+     * Uses the parsed {@link ScoreNormaliser} and further normalises results to
+     * [0..1] assuming that the first call to {@link #normalise(Float)} will
+     * parse the highest value.
+     * @param normaliser the normaliser used to normalise parsed scores before
+     * putting them to the range [0..1]
+     */
+    public RangeNormaliser(ScoreNormaliser normaliser){
+        this(normaliser,null,null);
+    }
+    /**
+     * Constructs a RangeNormaliser that forwards to the parsed normaliser but
+     * keeps results within the range [0..{upperBound}] based on the provided
+     * {maxExpectedScore}.
+     * @param normaliser The normaliser called to process scores parsed to
+     * {@link #normalise(Float)}. If <code>null</code> than parsed scores are
+     * only normalised to the range [0..{upperBound}]
+     * @param upperBound the upper bound for the range. If <code>null</code> is
+     * parsed the range [0..1] will be used.
+     * @param maxExpectedScore the maximum expected score. If <code>null</code>
+     * the first positive score parsed to {@link #normalise(Float)} will be
+     * used instead. This feature is useful if entities are already sorted by
+     * their score.
+     * @throws IllegalArgumentException if upperBound or maxExpectedScore are
+     * not <code>null</code> and &lt;= 0
+     */
+    public RangeNormaliser(ScoreNormaliser normaliser,Float upperBound,Float maxExpectedScore) {
+        if(normaliser == null){
+            this.normaliser = new DefaultNormaliser();
+        } else {
+            this.normaliser = normaliser;
+        }
+        if(upperBound == null){
+            this.upperBound = 1;
+        } else if(upperBound > 0){
+            this.upperBound = upperBound;
+        } else {
+            throw new IllegalArgumentException("The parsed upperBound MUST NOT be <= 0. Parse NULL (to use the default) or values > 0!");
+        }
+        if(maxExpectedScore != null && maxExpectedScore > 0){
+            // initialises maxScore. Uses the private implementation because
+            // calling the overridable normalise(..) from a constructor is unsafe
+            applyNormalisation(maxExpectedScore);
+        } else if(maxExpectedScore != null){
+            throw new IllegalArgumentException("The parsed maxExpectedScore MUST NOT be <= 0. Parse NULL (to use the first value parsed to normalise(..)) or values > 0!");
+        } //else maxExpectedScore == null -> will use the first call to init maxExpectedScore!
+    }
+
+    @Override
+    public Float normalise(Float parsed) {
+        return applyNormalisation(parsed);
+    }
+    /**
+     * Implementation of {@link #normalise(Float)}; also used to initialise
+     * {@link #maxScore} from the maximum expected score.
+     */
+    private Float applyNormalisation(Float parsed) {
+        Float normalised = normaliser.normalise(parsed);
+        if(normalised == null || normalised.isNaN() || normalised.compareTo(ZERO) < 0){
+            // null, NaN and negative scores are returned as parsed. NaN must
+            // not be used to initialise maxScore
+            return normalised;
+        }
+        double score = normalised.doubleValue();
+        if(maxScore <= 0){ //not yet initialised
+            if(score == 0){
+                // 0 can not be used as maximum (division by zero); it is
+                // mapped to the lower bound of the range
+                return ZERO;
+            }
+            maxScore = score; //initialise with the first positive score
+        } else if(score > maxScore){
+            log.warn("Found score {} higher than the maximum score {}. The normalised score will exceed the range [0..{}]",
+                new Object[]{score, maxScore, upperBound});
+        }
+        return Float.valueOf((float)(upperBound*(score/maxScore)));
+    }
+    /**
+     * Configures this normaliser. Keys that are not present keep their
+     * current values.
+     * @param config supports {@link #CHAINED_SCORE_NORMALISER},
+     * {@link #KEY_UPPER_BOUND} and {@link #KEY_MAX_EXPECTED_SCORE}
+     * @throws IllegalArgumentException if the upper bound or the maximum
+     * expected score are not numbers &gt; 0
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        Object value = config.get(CHAINED_SCORE_NORMALISER);
+        if(value != null){
+            normaliser = (ScoreNormaliser)value;
+        }
+        value = config.get(KEY_UPPER_BOUND);
+        if(value != null) {
+            double parsedUpperBound = Double.parseDouble(value.toString());
+            if(!(parsedUpperBound > 0)){ //also rejects NaN
+                throw new IllegalArgumentException("The upper bound '"+parsedUpperBound+"' MUST BE > zero!");
+            }
+            upperBound = parsedUpperBound;
+        } //else keep the current upper bound ([0..1] by default)
+        value = config.get(KEY_MAX_EXPECTED_SCORE);
+        if(value != null){
+            Float maxExpected = Float.valueOf(value.toString());
+            if(!(maxExpected.floatValue() > 0)){ //same check as in the constructor
+                throw new IllegalArgumentException("The parsed maxExpectedScore '"+value+"' MUST BE > 0!");
+            }
+            applyNormalisation(maxExpected); //initialises maxScore (if not yet done)
+        } // else the first positive score parsed to normalise(..) is used
+    }
+    @Override
+    public ScoreNormaliser getChained() {
+        return normaliser;
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/RangeNormaliser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,47 @@
+package org.apache.stanbol.entityhub.indexing.core.normaliser;
+
+import java.util.Map;
+
+/**
+ * Processes score values provided for entities (e.g. to calculate a page rank
+ * based on the number of incoming links). Instances can be chained: scores
+ * are first processed by the instance returned by {@link #getChained()} and
+ * only then by the current one.
+ * @author Rupert Westenthaler
+ *
+ */
+public interface ScoreNormaliser {
+
+    /**
+     * Configuration key for an other {@link ScoreNormaliser} instance that is
+     * called before this instance processes the score. Values MUST BE of type
+     * {@link ScoreNormaliser}.
+     */
+    String CHAINED_SCORE_NORMALISER = "chained";
+
+    /** Shared constant for 0, frequently used by implementations */
+    Float ZERO = Float.valueOf(0f);
+
+    /** Shared constant for -1, frequently used by implementations */
+    Float MINUS_ONE = Float.valueOf(-1f);
+
+    /**
+     * Configures this instance.
+     * @param config the configuration. Supported keys are implementation
+     * specific (see also {@link #CHAINED_SCORE_NORMALISER}).
+     */
+    void setConfiguration(Map<String,Object> config);
+
+    /**
+     * Normalises the parsed score based on some algorithm.
+     * @param score The score to normalise. <code>null</code> and values &lt; 0
+     * MUST be ignored and returned as parsed.
+     * @return <code>null</code> if no score can be calculated for the parsed
+     * value (especially if <code>null</code> is parsed). Otherwise the
+     * normalised score where values &lt; 0 indicate that the entity should not
+     * be indexed.
+     */
+    Float normalise(Float score);
+
+    /**
+     * Getter for the chained {@link ScoreNormaliser}. Scores are processed by
+     * the chained instance before they are processed by this one.
+     * @return the chained instance or <code>null</code> if none
+     */
+    ScoreNormaliser getChained();
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/normaliser/ScoreNormaliser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,41 @@
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * {@link EntityProcessor} that changes nothing: the parsed
+ * {@link Representation} is returned as is. Intended for cases where a
+ * <code>null</code> {@link EntityProcessor} is not allowed.
+ * @author Rupert Westenthaler
+ *
+ */
+public class EmptyProcessor implements EntityProcessor{
+
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        // this processor has no configuration; the parsed config is ignored
+    }
+
+    @Override
+    public boolean needsInitialisation() {
+        return false; // initialise() is a no-op
+    }
+
+    @Override
+    public void initialise() {
+        // nothing to initialise
+    }
+
+    @Override
+    public Representation process(Representation source) {
+        return source; // identity
+    }
+
+    @Override
+    public void close() {
+        // no resources to release
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/EmptyProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,153 @@
+package org.apache.stanbol.entityhub.indexing.core.processor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.stanbol.entityhub.core.mapping.DefaultFieldMapperImpl;
+import org.apache.stanbol.entityhub.core.mapping.FieldMappingUtils;
+import org.apache.stanbol.entityhub.core.mapping.ValueConverterFactory;
+import org.apache.stanbol.entityhub.core.model.InMemoryValueFactory;
+import org.apache.stanbol.entityhub.core.site.CacheUtils;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapper;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapping;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.ValueFactory;
+
+/**
+ * {@link EntityProcessor} that uses a {@link FieldMapper} to map the fields of
+ * parsed {@link Representation}s to a new Representation created by a
+ * {@link ValueFactory} (the {@link InMemoryValueFactory} if none is parsed).
+ * The mappings are provided to a constructor (as FieldMapper or as mapping
+ * lines) or loaded by {@link #setConfiguration(Map)}.
+ */
+public class FiledMapperProcessor implements EntityProcessor{
+    /**
+     * Configuration parameter for the name of a field mapping file relative to
+     * the configuration folder. If missing or empty, the field configuration
+     * of the index is used.
+     */
+    public static final String PARAM_MAPPINGS = "mappings";
+    /** Configuration parameter for the value factory (not yet read by {@link #setConfiguration(Map)}) */
+    public static final String PARAM_VALUE_FACTORY = "valueFactory";
+    public static final String DEFAULT_MAPPINGS_FILE_NAME = "fieldMappings.txt";
+    private FieldMapper mapper;
+    private ValueFactory vf;
+    /**
+     * This Constructor relies on {@link #setConfiguration(Map)} being called
+     * afterwards!
+     */
+    public FiledMapperProcessor(){
+        this(null);
+    }
+    /**
+     * Internally used to initialise the {@link ValueFactory}
+     * @param vf the value factory or <code>null</code> to use the {@link InMemoryValueFactory}.
+     */
+    private FiledMapperProcessor(ValueFactory vf){
+        setValueFactory(vf);
+    }
+    /**
+     * Creates a processor that uses the parsed {@link FieldMapper}.
+     * @param mapper the field mapper (MUST NOT be <code>null</code>)
+     * @param vf the value factory or <code>null</code> to use the {@link InMemoryValueFactory}
+     * @throws IllegalArgumentException if the parsed mapper is <code>null</code>
+     */
+    public FiledMapperProcessor(FieldMapper mapper, ValueFactory vf){
+        this(vf);
+        if(mapper == null){
+            throw new IllegalArgumentException("The parsed FieldMapper MUST NOT be NULL!");
+        }
+        this.mapper = mapper;
+    }
+    /**
+     * Creates a processor based on the parsed mapping lines.
+     * @param mappings the field mappings (one mapping per String)
+     * @param vf the value factory or <code>null</code> to use the {@link InMemoryValueFactory}
+     * @throws IllegalArgumentException if the parsed mappings are <code>null</code>
+     * @throws IllegalStateException if the parsed mappings do not contain a
+     * single valid mapping
+     */
+    public FiledMapperProcessor(Iterator<String> mappings,ValueFactory vf){
+        this(vf);
+        if(mappings == null){
+            throw new IllegalArgumentException("The parsed field mappings MUST NOT be NULL!");
+        }
+        mapper = FieldMappingUtils.createDefaultFieldMapper(mappings);
+        if(mapper.getMappings().isEmpty()){
+            throw new IllegalStateException("The parsed field mappings MUST contain at least a single valid mapping!");
+        }
+    }
+    /**
+     * Creates a processor based on the mapping lines read (as UTF-8) from the
+     * parsed stream. The stream is not closed by this constructor.
+     * @param mappings the stream to read the field mappings from
+     * @param vf the value factory or <code>null</code> to use the {@link InMemoryValueFactory}
+     * @throws IOException on any error while reading from the stream
+     * @throws IllegalArgumentException if the parsed stream is <code>null</code>
+     */
+    public FiledMapperProcessor(InputStream mappings, ValueFactory vf) throws IOException{
+        this(vf);
+        if(mappings == null){
+            throw new IllegalArgumentException("The parsed field mappings MUST NOT be NULL!");
+        }
+        this.mapper = createMapperFormStream(mappings);
+    }
+    /**
+     * Applies the field mappings to the parsed Representation.
+     * @param source the Representation to process
+     * @return a new Representation (with the ID of the source) holding the
+     * mapped values or <code>null</code> if <code>null</code> is parsed
+     * @throws IllegalStateException if no {@link FieldMapper} is configured
+     */
+    @Override
+    public Representation process(Representation source) {
+        if(mapper == null){
+            throw new IllegalStateException("The mapper is not initialised. One must call setConfiguration to configure the FieldMapper!");
+        }
+        if(source == null){
+            return null;
+        } else {
+            return mapper.applyMappings(source,
+                vf.createRepresentation(source.getId()),
+                vf);
+        }
+    }
+    /**
+     * used by the different constructors to init the {@link ValueFactory}
+     * @param vf the value factory or <code>null</code> to use the default
+     */
+    private void setValueFactory(ValueFactory vf) {
+        if(vf == null){
+            this.vf = InMemoryValueFactory.getInstance();
+        } else {
+            this.vf = vf;
+        }
+    }
+    @Override
+    public void close() {
+        //nothing todo
+    }
+    @Override
+    public void initialise() {
+        //nothing todo
+    }
+    @Override
+    public boolean needsInitialisation() {
+        return false;
+    }
+    /**
+     * Configures the {@link FieldMapper}: if {@link #PARAM_MAPPINGS} names a
+     * file, the mappings are loaded from that file (relative to the
+     * configuration folder); otherwise the field configuration of the index
+     * is used.
+     * @param config the configuration; MUST contain the {@link IndexingConfig}
+     * @throws IllegalArgumentException if the IndexingConfig is missing or the
+     * mapping file can not be read
+     */
+    @Override
+    public void setConfiguration(Map<String,Object> config) {
+        IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+        if(indexingConfig == null){
+            throw new IllegalArgumentException("The parsed configuration MUST contain the IndexingConfig (key: "+IndexingConfig.KEY_INDEXING_CONFIG+")!");
+        }
+        Object value = config.get(PARAM_MAPPINGS);
+        if(value == null || value.toString().isEmpty()){
+            //use the mappings configured for the Index
+            this.mapper = FieldMappingUtils.createDefaultFieldMapper(
+                indexingConfig.getIndexFieldConfiguration());
+        } else {
+            //load (other) mappings based on the provided mappings parameter
+            final File file = new File(indexingConfig.getConfigFolder(),value.toString());
+            InputStream in = null;
+            try {
+                in = new FileInputStream(file);
+                // NOTE(review): closing the stream afterwards assumes that
+                // createDefaultFieldMapper(Iterator) reads all lines before
+                // returning (the Iterator constructor relies on this as well)
+                this.mapper = createMapperFormStream(in);
+            } catch (FileNotFoundException e) {
+                throw new IllegalArgumentException("FieldMapping file "+value+" not found in configuration directory "+indexingConfig.getConfigFolder(), e);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Unable to read FieldMapping file "+value+" in configuration directory "+indexingConfig.getConfigFolder(), e);
+            } finally {
+                IOUtils.closeQuietly(in); //null safe
+            }
+        }
+        //TODO: get the valueFactory form the config (currently an InMemory is
+        //create by the default constructor!
+    }
+    /**
+     * Utility that allows to create a FieldMapper form an inputStream.
+     * It uses {@link IOUtils#lineIterator(InputStream, String)} (UTF-8) and
+     * parses it to {@link FieldMappingUtils#createDefaultFieldMapper(Iterator)}.
+     * The stream is not closed by this method.
+     * @param in the stream to read the mappings from
+     * @throws IOException on any error while reading the data from the stream
+     */
+    private static FieldMapper createMapperFormStream(final InputStream in) throws IOException {
+        return FieldMappingUtils.createDefaultFieldMapper(new Iterator<String>() {
+            LineIterator it = IOUtils.lineIterator(in, "UTF-8");
+            @Override
+            public boolean hasNext() {
+                return it.hasNext();
+            }
+            @Override
+            public String next() {
+                return it.nextLine();
+            }
+            @Override
+            public void remove() {
+                it.remove();
+            }
+        });
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/processor/FiledMapperProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,90 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.model.rdf.RdfResourceEnum;
+
+public class EntityFieldScoreProvider implements EntityScoreProvider {
+
+ public static final String PRAM_FIELD_NAME = "field";
+
+ public static final String DEFAULT_FIELD_NAME = RdfResourceEnum.signRank.getUri();
+
+ private String fieldName;
+
+ /**
+ * Creates an instance that uses the field as specified by
+ * {@link RdfResourceEnum#signRank} to retrieve the score for an entity
+ */
+ public EntityFieldScoreProvider(){
+ this(null);
+ }
+ /**
+ * Creates an instance that uses the parsed field to retrieve the score for
+ * an entity or {@link RdfResourceEnum#signRank} in case <code>null</code>
+ * is parsed.
+ * @param fieldName the field used to retrieve the score from parsed
+ * {@link Representation}s or <code>null</code> to use the
+ * {@link RdfResourceEnum#signRank} field.
+ */
+ public EntityFieldScoreProvider(String fieldName){
+ if(fieldName == null){
+ this.fieldName = DEFAULT_FIELD_NAME;
+ }
+ }
+
+ @Override
+ public boolean needsData() {
+ return true;
+ }
+
+ @Override
+ public Float process(String id) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Float process(Representation entity) throws UnsupportedOperationException {
+ Object value = entity.getFirst(fieldName);
+ if(value instanceof Float){
+ return (Float)value;
+ } else if(value instanceof Number) {
+ return Float.valueOf(((Number)value).floatValue());
+ } else {
+ if(value != null){
+ try {
+ return Float.valueOf(value.toString());
+ } catch(NumberFormatException e){
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ //nothing to do
+ }
+
+ @Override
+ public void initialise() {
+ //no initialisation needed
+ }
+
+ @Override
+ public boolean needsInitialisation() {
+ return false;
+ }
+
+ @Override
+ public void setConfiguration(Map<String,Object> config) {
+ Object value = config.get(PRAM_FIELD_NAME);
+ if(value != null && !value.toString().isEmpty()){
+ fieldName = value.toString();
+ }
+ }
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityFieldScoreProvider.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,88 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator.EntityScore;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Simple Adapter between an {@link EntityIterator} and the {@link EntityScoreProvider}
+ * interface that iterates over all entities provided by the {@link EntityIterator}
+ * and uses this information to initialise an {@link EntityScoreProvider}.<p>
+ *
+ * @author Rupert Westenthaler
+ *
+ */
+public class EntityIneratorToScoreProviderAdapter implements EntityScoreProvider {
+
+ private EntityIterator entityIterator;
+ private EntityScoreProvider provider;
+ public EntityIneratorToScoreProviderAdapter(){
+ this(null);
+ }
+ public EntityIneratorToScoreProviderAdapter(EntityIterator entityIterator){
+ if(entityIterator == null){
+ throw new IllegalArgumentException("The EntityIterator MUST NOT be NULL!");
+ }
+ this.entityIterator = entityIterator;
+ }
+
+ @Override
+ public boolean needsData() {
+ return provider.needsData();
+ }
+
+ @Override
+ public Float process(String id) throws UnsupportedOperationException {
+ return provider.process(id);
+ }
+
+ @Override
+ public Float process(Representation entity) throws UnsupportedOperationException {
+ return provider.process(entity);
+ }
+
+ @Override
+ public boolean needsInitialisation() {
+ return true;
+ }
+ @Override
+ public void initialise() {
+ //initialise the source entity iterator
+ if(entityIterator.needsInitialisation()){
+ entityIterator.initialise();
+ }
+ //initialise this instace
+ Map<String,Float> entityScoreMap = new HashMap<String,Float>();
+ while(entityIterator.hasNext()){
+ EntityScore entityScore = entityIterator.next();
+ entityScoreMap.put(entityScore.id, entityScore.score);
+ }
+ //close the source because it is no longer needed!
+ entityIterator.close();
+ provider = new MapEntityScoreProvider(entityScoreMap);
+ //initialise the wrapped score provider
+ if(provider.needsInitialisation()){
+ provider.initialise();
+ }
+ }
+ @Override
+ public void close() {
+ provider.close();
+ }
+ @Override
+ public void setConfiguration(Map<String,Object> config) {
+ //the IndexingConfig is available via the IndexingConfig.KEY_INDEXING_CONFIG key!
+ IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+ //configure first the EntityIterator to adapt
+ entityIterator = indexingConfig.getEntityIdIterator();
+ if(entityIterator == null){
+ throw new IllegalArgumentException("No EntityIterator available via the indexing configuration "+indexingConfig.getName());
+ }
+ }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/EntityIneratorToScoreProviderAdapter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,225 @@
+package org.apache.stanbol.entityhub.indexing.core.source;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.config.IndexingConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Implementation of the {@link EntityIterator} based on reading data line wise
+ * from an {@link InputStream}
+ * @author Rupert Westenthaler
+ *
+ */
+public class LineBasedEntityIterator implements EntityIterator {
+
+ private static final Logger log = LoggerFactory.getLogger(LineBasedEntityIterator.class);
+ /**
+ * The default separator to split the entity id with the score "\t" (tab)
+ */
+ public static final String DEFAULT_SEPARATOR = "\t";
+ /**
+ * The default encoding used to read the data from the parsed {@link InputStream}
+ * (UTF-8)
+ */
+ public static final String DEFAULT_ENCODING = "UTF-8";
+ /**
+ * The default if Entity ids should be {@link URLEncoder URLEncoded} (false)
+ */
+ public static final boolean DEFAULT_ENCODE_ENTITY_IDS = false;
+ /**
+ * Parameter used to configure the name of the source file within the
+ * {@link IndexingConfig#getSourceFolder()}
+ */
+ public static final String PARAM_ENTITY_SCORE_FILE = "source";
+ /**
+ * The default name used for the {@link #PARAM_ENTITY_SCORE_FILE}
+ */
+ public static final String DEFAULT_ENTITY_SCORE_FILE = "entityScores.tsv";
+ /**
+ * Parameter used to configure if the Entity IDs should be {@link URLEncoder
+ * URL encoded} (the default is {@value #DEFAULT_ENCODE_ENTITY_IDS})
+ */
+ public static final String PARAM_URL_ENCODE_ENTITY_IDS = "encodeIds";
+ /**
+ * Parameter used to configure the text encoding used by the source file
+ * (the default is {@value #DEFAULT_ENCODING})
+ */
+ public static final String PARAM_CHARSET = "charset";
+
+ private BufferedReader reader;
+ private String separator;
+ private String charset;
+ private boolean encodeEntityIds;
+ private long lineCounter = 0;
+ private String nextLine;
+
+ /**
+ * Default constructor relaying on {@link #setConfiguration(Map)} is used
+ * to provide the configuration
+ */
+ public LineBasedEntityIterator(){
+ this(null);
+ }
+ /**
+ * Constructs an EntityScoreIterator that reads {@link EntityScore}s based
+ * on lines provided by the parsed InputStream. <p> Separator, Charset and
+ * encoding of Entity ids are initialised based on the default values.
+ * @param is the InputStream to read the data from
+ * @throws IOException On any error while initialising the {@link BufferedReader}
+ * based on the parsed {@link InputStream}
+ */
+ public LineBasedEntityIterator(InputStream is) {
+ this(is,null,null,null);
+ }
+ /**
+ * Constructs an EntityScoreIterator based on the parsed parameters. The
+ * default values are used if <code>null</code> is parsed for any parameter
+ * other than the InputStream.
+ * @param is the InputStream to read the data from
+ * @param charset
+ * @param separator
+ * @param encodeIds
+ * @throws IOException On any error while initialising the {@link BufferedReader}
+ * based on the parsed {@link InputStream}
+ * @throws IllegalArgumentException if <code>null</code> is parsed as InputStream
+ */
+ public LineBasedEntityIterator(InputStream is,String charset,String separator,Boolean encodeIds) throws IllegalArgumentException {
+ if(charset == null){
+ this.charset = DEFAULT_ENCODING;
+ } else {
+ this.charset = charset;
+ }
+ if(separator == null){
+ this.separator = DEFAULT_SEPARATOR;
+ } else {
+ this.separator = separator;
+ }
+ if(encodeIds == null){
+ encodeEntityIds = DEFAULT_ENCODE_ENTITY_IDS;
+ } else {
+ encodeEntityIds = encodeIds;
+ }
+ if(is != null){
+ initReader(is);
+ }
+ }
+ @Override
+ public void setConfiguration(Map<String,Object> config) {
+ IndexingConfig indexingConfig = (IndexingConfig)config.get(IndexingConfig.KEY_INDEXING_CONFIG);
+ File score;
+ Object value = config.get(PARAM_CHARSET);
+ if(value != null && value.toString() != null){
+ this.charset = value.toString();
+ }
+ value = config.get(PARAM_URL_ENCODE_ENTITY_IDS);
+ if(value != null){
+ this.encodeEntityIds = Boolean.parseBoolean(value.toString());
+ }
+ value = config.get(PARAM_ENTITY_SCORE_FILE);
+ if(value == null || value.toString().isEmpty()){
+ score = new File(DEFAULT_ENTITY_SCORE_FILE);
+ } else {
+ score = new File(indexingConfig.getSourceFolder(),value.toString());
+ }
+ try {
+ initReader(new FileInputStream(score));
+ } catch (FileNotFoundException e) {
+ throw new IllegalArgumentException("The File with the entity scores "+score.getAbsolutePath()+" does not exist",e);
+ }
+ }
+ /**
+ * used by the constructors and {@link #setConfiguration(Map)} to initialise
+ * the reader based on the provided File/InputStream.
+ * @param is the input stream
+ * @param charset the charset
+ */
+ private void initReader(InputStream is) {
+ try {
+ reader = new BufferedReader(new InputStreamReader(is, this.charset));
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException("The parsed encoding "+charset+" is not supported",e);
+ }
+ }
+ @Override
+ public boolean hasNext() {
+ if(nextLine == null){ //consumed
+ getNext();
+ }
+ return nextLine != null;
+ }
+
+ @Override
+ public EntityScore next() {
+ String line = nextLine;
+ nextLine = null; //consume
+ String[] parts = line.split(separator);
+ if(parts.length > 2){
+ log.warn("Line {} does have more than 2 parts {}",
+ lineCounter,Arrays.toString(parts));
+ }
+ Float score;
+ if(parts.length >=2){
+ try {
+ score = Float.parseFloat(parts[1]);
+ } catch (NumberFormatException e) {
+ log.warn(String.format("Unable to parse the score for " +
+ "Entity %s from value %s in line %s! Use NULL as score 0",
+ parts[0],parts[1],lineCounter));
+ score = null;
+ }
+ } else {
+ log.debug("No score for Entity {} in line {}! Use NULL as score",parts[0],lineCounter);
+ score = null;
+ }
+ try {
+ return new EntityScore(
+ encodeEntityIds ? URLEncoder.encode(parts[0], charset) : parts[0],
+ score);
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalStateException("Unable to URLEncode EntityId",e);
+ }
+ }
+ private void getNext(){
+ try {
+ nextLine = reader.readLine();
+ lineCounter++;
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to read next EntityScore",e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Removal form the EnityScore list is not supported");
+ }
+ @Override
+ public boolean needsInitialisation() {
+ return false;
+ }
+ @Override
+ public void initialise() {
+ }
+
+ @Override
+ public void close(){
+ if(reader != null){
+ try {
+ reader.close();
+ }catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/source/LineBasedEntityIterator.java
------------------------------------------------------------------------------
svn:mime-type = text/plain