You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2011/04/29 11:20:38 UTC

svn commit: r1097740 [2/10] - in /incubator/stanbol/trunk: entityhub/ entityhub/generic/core/src/main/java/org/apache/stanbol/entityhub/core/mapping/ entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/mapping/ entityhu...

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,601 @@
+package org.apache.stanbol.entityhub.indexing.core.config;
+
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.stanbol.entityhub.core.mapping.FieldMappingUtils;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.DefaultNormaliser;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration of an indexing process. The configuration is read from the
+ * <code>indexing</code> folder (the "root folder") that contains
+ * <ul>
+ * <li> <code>config</code>: the configuration (required). The main
+ *      configuration is read from <code>config/indexing.properties</code>
+ * <li> <code>resources</code>: the sources used for indexing (optional)
+ * <li> <code>destination</code>: files created by the indexing (created if missing)
+ * <li> <code>dist</code>: the distribution (created if missing)
+ * </ul>
+ * Components (iterators, providers, normalisers, destinations ...) are
+ * created lazily on the first call to the according getter.
+ */
+public class IndexingConfig {
+    private static final String DEFAULT_ROOT_PATH = "indexing";
+    private static final String CONFIG_PATH = "config";
+    private static final String SOURCE_PATH = "resources";
+    private static final String DESTINATION_PATH = "destination";
+    private static final String DISTRIBUTION_PATH = "dist";
+    private static final String INDEXING_PROPERTIES = "indexing.properties";
+    /**
+     * Parameter of a component configuration entry that defines the name of
+     * the configuration file (within the config folder) of that component.
+     */
+    private static final String CONFIG_PARAM = "config";
+    /**
+     * Key used to add this {@link IndexingConfig} to the configuration parsed
+     * to components created by this class.
+     */
+    public static final String KEY_INDEXING_CONFIG = "indexingConfig";
+
+    /**
+     * Internally used to explain the syntax in the configuration file to parse parameters
+     */
+    private static final String SYNTAX_ERROR_MESSAGE = "{key}={value1},{param1}:{value1},{param2}:{value2};{value2},{param1}:{value1} ...";
+
+    private static final Logger log = LoggerFactory.getLogger(IndexingConfig.class);
+    private static final String DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME = "indexFieldConfig.txt";
+
+    private final File rootDir;
+    private final File configDir;
+    private final File sourceDir;
+    private final File destinationDir;
+    private final File distributionDir;
+    /**
+     * The key/value pairs of the main indexing configuration.
+     */
+    private final Map<String,Object> configuration;
+
+    private final String name;
+    /**
+     * The configuration of the fields/languages included/excluded in the index.
+     */
+    private final Collection<FieldMapping> fieldMappings;
+
+    //components are lazily created by the according getters
+    private EntityDataIterable entityDataIterable = null;
+    private EntityDataProvider entityDataProvider = null;
+    private EntityIterator entityIdIterator = null;
+    private EntityScoreProvider entityScoreProvider = null;
+    private ScoreNormaliser scoreNormaliser = null;
+    private EntityProcessor entityProcessor = null;
+    private IndexingDestination indexingDestination = null;
+
+    /**
+     * Creates the indexing configuration for the <code>indexing</code> folder
+     * located in the current working directory ("user.dir").
+     * @see #IndexingConfig(String)
+     */
+    public IndexingConfig(){
+        this(null);
+    }
+
+    /**
+     * Creates the indexing configuration for the <code>indexing</code> folder
+     * located within the parsed path.
+     * @param rootPath the directory containing the <code>indexing</code> folder.
+     * Relative paths are resolved against "user.dir". If <code>null</code>
+     * "user.dir" is used.
+     * @throws IllegalArgumentException if the root or the config folder do not
+     * exist or the main configuration (or the index field configuration) is
+     * missing or invalid
+     * @throws IllegalStateException if the destination or distribution folder
+     * can not be created or a configuration file can not be read
+     */
+    public IndexingConfig(String rootPath){
+        //first get the root
+        File root = new File(System.getProperty("user.dir"));
+        if(rootPath != null){
+            File parsed = new File(rootPath);
+            //relative paths are added to "user.dir", absolute ones are used as is
+            root = parsed.isAbsolute() ? parsed : new File(root,rootPath);
+        }
+        //now we need to add the name of the root folder
+        root = new File(root,DEFAULT_ROOT_PATH);
+        if(!root.isDirectory()){
+            throw new IllegalArgumentException(
+                "The root folder for the indexing '"+root.getAbsolutePath()+
+                "' does not exist!");
+        }
+        this.rootDir = root;
+        //check also for the config
+        this.configDir = new File(root,CONFIG_PATH);
+        if(!configDir.isDirectory()){
+            throw new IllegalArgumentException(
+                "The root folder for the indexing configuration '"+
+                configDir.getAbsolutePath()+"' does not exist!");
+        }
+        this.sourceDir = new File(root,SOURCE_PATH);
+        if(!sourceDir.exists()){
+            log.info("The resource folder '{}' (typically containing the sources used for indexing) does not exist",
+                sourceDir.getAbsolutePath());
+            log.info(" - this might be OK if no (local) resources are needed for the indexing");
+        }
+        this.destinationDir = ensureDirectory(new File(root,DESTINATION_PATH),"target");
+        this.distributionDir = ensureDirectory(new File(root,DISTRIBUTION_PATH),"distribution");
+        //check the main configuration
+        File indexingConfigFile = new File(this.configDir,INDEXING_PROPERTIES);
+        this.configuration = loadConfig(indexingConfigFile,true);
+        Object value = configuration.get(KEY_NAME);
+        if(value == null){
+            throw new IllegalArgumentException("Indexing Configuration '"+
+                indexingConfigFile+"' is missing the required key "+KEY_NAME+"!");
+        }
+        String configuredName = value.toString();
+        if(configuredName.isEmpty()){
+            throw new IllegalArgumentException("Invalid Indexing Configuration '"+
+                indexingConfigFile+"': The value for the parameter "+KEY_NAME+" MUST NOT be empty!");
+        }
+        this.name = configuredName;
+        value = configuration.get(KEY_INDEX_FIELD_CONFIG);
+        if(value == null || value.toString().isEmpty()){
+            value = DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME;
+        }
+        File indexFieldConfig = new File(configDir,value.toString());
+        if(!indexFieldConfig.isFile()){
+            throw new IllegalArgumentException("Invalid Indexing Configuration: " +
+                "IndexFieldConfiguration '"+indexFieldConfig+"' not found. " +
+                "Provide the missing file or use the '"+KEY_INDEX_FIELD_CONFIG+
+                "' in the '"+indexingConfigFile+"' to configure a different one!");
+        }
+        this.fieldMappings = readFieldMappings(indexFieldConfig);
+    }
+
+    /**
+     * Returns the parsed directory and creates it (including missing parent
+     * directories) if it does not yet exist.
+     * @param dir the directory
+     * @param description the description of the folder used in error messages
+     * @return the parsed directory
+     * @throws IllegalStateException if the directory can not be created or
+     * the path exists but is not a directory
+     */
+    private static File ensureDirectory(File dir, String description){
+        if(!dir.exists()){
+            if(!dir.mkdirs()){
+                throw new IllegalStateException(
+                    "Unable to create "+description+" folder for indexing '"+
+                    dir.getAbsolutePath()+"'!");
+            }
+        } else if(!dir.isDirectory()){
+            throw new IllegalStateException(
+                "The "+description+" folder for indexing '"+
+                dir.getAbsolutePath()+"' exists but is not a directory!");
+        }
+        return dir;
+    }
+
+    /**
+     * Reads the index field configuration (UTF-8 encoded, one mapping per line)
+     * by using {@link FieldMappingUtils#parseFieldMappings(Iterator)}.
+     * The file is closed before this method returns.
+     * NOTE(review): assumes that parseFieldMappings consumes the parsed
+     * iterator eagerly (it returns a Collection) - confirm.
+     * @param indexFieldConfig the file
+     * @return the parsed field mappings
+     * @throws IllegalStateException if the file can not be read
+     */
+    private static Collection<FieldMapping> readFieldMappings(File indexFieldConfig){
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(indexFieldConfig);
+            final LineIterator lines = IOUtils.lineIterator(in, "UTF-8");
+            //adapts the LineIterator to the Iterator<String> expected by the parser
+            return FieldMappingUtils.parseFieldMappings(new Iterator<String>() {
+                @Override
+                public boolean hasNext() {
+                    return lines.hasNext();
+                }
+                @Override
+                public String next() {
+                    return lines.nextLine();
+                }
+                @Override
+                public void remove() {
+                    lines.remove();
+                }
+            });
+        } catch (IOException e) {
+            throw new IllegalStateException("Unable to read Index Field Configuration from '"
+                +indexFieldConfig+"'!",e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+    }
+
+    /**
+     * Loads a key=value configuration (similar to {@link Properties}) from
+     * the parsed file and returns it as Map.<p>
+     * An own parser is used because keys of java properties do not support
+     * UTF-8, but some configurations might want to use URLs as keys. See
+     * {@link #parseConfigLine(String, Map)} for the supported syntax.
+     * @param configFile the file (UTF-8 encoded)
+     * @param required if <code>true</code> an {@link IllegalArgumentException}
+     * will be thrown if the config was not present otherwise an empty map will
+     * be returned
+     * @return The configuration as Map
+     * @throws IllegalArgumentException if required and the file is missing
+     * @throws IllegalStateException if required and the file can not be read
+     */
+    private static Map<String,Object> loadConfig(File configFile, boolean required) {
+        Map<String,Object> configMap = new HashMap<String,Object>();
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(configFile);
+            LineIterator lines = IOUtils.lineIterator(in, "UTF-8");
+            while(lines.hasNext()){
+                parseConfigLine(lines.nextLine(), configMap);
+            }
+        } catch (FileNotFoundException e) {
+            if(required){
+                throw new IllegalArgumentException(
+                    "Unable to find configuration file '"+
+                    configFile.getAbsolutePath()+"'!",e);
+            }
+        } catch (IOException e) {
+            //if not required the values read so far are returned
+            if(required){
+                throw new IllegalStateException(
+                    "Unable to read configuration file '"+
+                    configFile.getAbsolutePath()+"'!",e);
+            }
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+        return configMap;
+    }
+
+    /**
+     * Parses a single line of a key=value configuration and adds it to the
+     * parsed map. Empty lines and comments (lines starting with '#' or '!')
+     * are ignored. Keys are trimmed. Whitespace directly following the '=' is
+     * removed from values (as done by {@link Properties}). Lines without '='
+     * and lines with an empty value are stored with a <code>null</code> value.
+     * Lines without a key are ignored (with a warning).
+     * @param line the line
+     * @param configMap the map to add the parsed key/value pair
+     */
+    private static void parseConfigLine(String line, Map<String,Object> configMap){
+        String trimmed = line.trim();
+        if(trimmed.isEmpty() || trimmed.charAt(0) == '#' || trimmed.charAt(0) == '!'){
+            return; //ignore empty lines and comments
+        }
+        int indexOfEquals = line.indexOf('=');
+        String key;
+        String value;
+        if(indexOfEquals < 0){
+            key = trimmed;
+            value = null;
+        } else {
+            key = line.substring(0,indexOfEquals).trim();
+            value = stripLeadingWhitespace(line.substring(indexOfEquals+1));
+            if(value.isEmpty()){
+                value = null;
+            }
+        }
+        if(key.isEmpty()){
+            log.warn("Ignore configuration line '{}' because it does not define a key",line);
+            return;
+        }
+        configMap.put(key,value);
+    }
+
+    /**
+     * @param value the value
+     * @return the parsed value without leading whitespace characters
+     */
+    private static String stripLeadingWhitespace(String value){
+        int start = 0;
+        while(start < value.length() && Character.isWhitespace(value.charAt(start))){
+            start++;
+        }
+        return value.substring(start);
+    }
+
+    /**
+     * Getter for the root folder used for the Indexing
+     * @return the root folder (containing the config, resources, destination and dist folders)
+     */
+    public final File getRootFolder() {
+        return rootDir;
+    }
+
+    /**
+     * The root folder for the configuration. Guaranteed to exist.
+     * @return the root folder for the configuration
+     */
+    public final File getConfigFolder() {
+        return configDir;
+    }
+
+    /**
+     * The root folder containing the resources used as input for the
+     * indexing process. Might not exist if no resources are available
+     * @return the root folder for the resources
+     */
+    public final File getSourceFolder() {
+        return sourceDir;
+    }
+
+    /**
+     * The root folder containing the files created by the indexing process.
+     * Guaranteed to exist.
+     * @return the target folder
+     */
+    public final File getDestinationFolder() {
+        return destinationDir;
+    }
+
+    /**
+     * The root folder for the distribution. Guaranteed to exist.
+     * @return the distribution folder
+     */
+    public final File getDistributionFolder() {
+        return distributionDir;
+    }
+
+    /**
+     * Getter for the name as configured by the {@link IndexingConstants#KEY_NAME}
+     * by the main indexing configuration.
+     * @return the name of this data source to index
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Getter for the description as configured by the {@link IndexingConstants#KEY_DESCRIPTION}
+     * by the main indexing configuration.
+     * @return the description of the data source to index or <code>null</code>
+     * if not defined
+     */
+    public String getDescription(){
+        Object value = configuration.get(KEY_DESCRIPTION);
+        return value != null?value.toString():null;
+    }
+
+    /**
+     * The {@link ScoreNormaliser} as configured by the {@link IndexingConstants#KEY_SCORE_NORMALIZER}
+     * by the main indexing configuration.
+     * @return the configured {@link ScoreNormaliser} or a {@link DefaultNormaliser} if
+     * this configuration is missing.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public ScoreNormaliser getNormaliser(){
+        if(scoreNormaliser == null){
+            initNormaliser();
+        }
+        return scoreNormaliser;
+    }
+
+    /**
+     * The {@link EntityDataIterable} as configured by the {@link IndexingConstants#KEY_ENTITY_DATA_ITERABLE}
+     * by the main indexing configuration.
+     * @return the configured {@link EntityDataIterable} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public EntityDataIterable getDataInterable(){
+        if(entityDataIterable == null){
+            ConfigEntry config = getConfigEntry(KEY_ENTITY_DATA_ITERABLE);
+            if(config != null){
+                EntityDataIterable component = newInstance(config, EntityDataIterable.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                entityDataIterable = component;
+            }
+        }
+        return entityDataIterable;
+    }
+
+    /**
+     * The {@link EntityIterator} as configured by the {@link IndexingConstants#KEY_ENTITY_ID_ITERATPR}
+     * by the main indexing configuration.
+     * @return the configured {@link EntityIterator} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public EntityIterator getEntityIdIterator() {
+        if(entityIdIterator == null){
+            ConfigEntry config = getConfigEntry(KEY_ENTITY_ID_ITERATPR);
+            if(config != null){
+                EntityIterator component = newInstance(config, EntityIterator.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                entityIdIterator = component;
+            }
+        }
+        return entityIdIterator;
+    }
+
+    /**
+     * The {@link EntityDataProvider} as configured by the {@link IndexingConstants#KEY_ENTITY_DATA_PROVIDER}
+     * by the main indexing configuration.
+     * @return the configured {@link EntityDataProvider} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public EntityDataProvider getEntityDataProvider() {
+        if(entityDataProvider == null){
+            ConfigEntry config = getConfigEntry(KEY_ENTITY_DATA_PROVIDER);
+            if(config != null){
+                EntityDataProvider component = newInstance(config, EntityDataProvider.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                entityDataProvider = component;
+            }
+        }
+        return entityDataProvider;
+    }
+
+    /**
+     * The {@link EntityScoreProvider} as configured by the {@link IndexingConstants#KEY_ENTITY_SCORE_PROVIDER}
+     * by the main indexing configuration.
+     * @return the configured {@link EntityScoreProvider} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public EntityScoreProvider getEntityScoreProvider() {
+        if(entityScoreProvider == null){
+            ConfigEntry config = getConfigEntry(KEY_ENTITY_SCORE_PROVIDER);
+            if(config != null){
+                EntityScoreProvider component = newInstance(config, EntityScoreProvider.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                entityScoreProvider = component;
+            }
+        }
+        return entityScoreProvider;
+    }
+
+    /**
+     * The fields and languages included/excluded in the created index.<p>
+     * NOTE: Currently this uses the {@link FieldMapping} class that was initially
+     * defined to be used as configuration for the {@code FieldMapper}. In
+     * future this might change to an Interface that is more tailored to
+     * defining the fields and languages included/excluded in the index and does
+     * not allow to define mappings and data type conversions as the current one
+     * @return the field mappings read from the index field configuration file
+     */
+    public Collection<FieldMapping> getIndexFieldConfiguration(){
+        return fieldMappings;
+    }
+
+    /**
+     * The {@link EntityProcessor} as configured by the {@link IndexingConstants#KEY_ENTITY_PROCESSOR}
+     * by the main indexing configuration.
+     * @return the configured {@link EntityProcessor} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public EntityProcessor getEntityProcessor() {
+        if(entityProcessor == null){
+            ConfigEntry config = getConfigEntry(KEY_ENTITY_PROCESSOR);
+            if(config != null){
+                EntityProcessor component = newInstance(config, EntityProcessor.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                entityProcessor = component;
+            }
+        }
+        return entityProcessor;
+    }
+
+    /**
+     * The {@link IndexingDestination} as configured by the {@link IndexingConstants#KEY_INDEXING_DESTINATION}
+     * by the main indexing configuration.
+     * @return the configured {@link IndexingDestination} or <code>null</code> if
+     * this configuration is not present.
+     * @throws IllegalArgumentException if the configuration is invalid
+     */
+    public IndexingDestination getIndexingDestination() {
+        if(indexingDestination == null){
+            ConfigEntry config = getConfigEntry(KEY_INDEXING_DESTINATION);
+            if(config != null){
+                IndexingDestination component = newInstance(config, IndexingDestination.class);
+                component.setConfiguration(getComponentConfig(config, component.getClass().getSimpleName()));
+                indexingDestination = component;
+            }
+        }
+        return indexingDestination;
+    }
+
+    /**
+     * Creates the {@link ScoreNormaliser} chain. The configured normalisers
+     * are chained: the first one gets the second one as
+     * {@link ScoreNormaliser#CHAINED_SCORE_NORMALISER} and so on. Therefore
+     * they are created in reverse order. If no normaliser is configured a
+     * {@link DefaultNormaliser} is used.
+     */
+    private void initNormaliser() {
+        Object value = configuration.get(KEY_SCORE_NORMALIZER);
+        List<ConfigEntry> configs = value == null ?
+                new ArrayList<ConfigEntry>() : parseConfigEntries(value.toString());
+        if(configs.isEmpty()){
+            this.scoreNormaliser = new DefaultNormaliser();
+            return;
+        }
+        ScoreNormaliser chained = null;
+        for(int i=configs.size()-1;i>=0;i--){
+            ConfigEntry config = configs.get(i);
+            ScoreNormaliser normaliser = newInstance(config, ScoreNormaliser.class, "Normaliser");
+            Map<String,Object> normaliserConfig = getComponentConfig(config,normaliser.getClass().getSimpleName());
+            if(chained != null){
+                normaliserConfig.put(ScoreNormaliser.CHAINED_SCORE_NORMALISER, chained);
+            }
+            normaliser.setConfiguration(normaliserConfig);
+            chained = normaliser;
+        }
+        //set the normaliser (the head of the chain)!
+        this.scoreNormaliser = chained;
+    }
+
+    /**
+     * Getter for the component configuration entry stored under the parsed
+     * key of the main configuration.
+     * @param key the key
+     * @return the parsed entry or <code>null</code> if the key is not present
+     * or has no (or an empty) value
+     */
+    private ConfigEntry getConfigEntry(String key){
+        Object value = configuration.get(key);
+        if(value == null){
+            return null;
+        }
+        String config = value.toString().trim();
+        return config.isEmpty() ? null : new ConfigEntry(config);
+    }
+
+    /**
+     * Instantiates the class referenced by the parsed entry by using its
+     * default constructor.
+     * @param config the configuration entry
+     * @param type the expected type
+     * @return the created instance
+     * @throws IllegalArgumentException if the class can not be loaded,
+     * instantiated or is not of the expected type
+     */
+    private static <T> T newInstance(ConfigEntry config, Class<T> type){
+        return newInstance(config, type, type.getSimpleName());
+    }
+
+    /**
+     * Same as {@link #newInstance(ConfigEntry, Class)} but with a custom
+     * label for the error message.
+     * @param config the configuration entry
+     * @param type the expected type
+     * @param label the label of the component type used in error messages
+     * @return the created instance
+     * @throws IllegalArgumentException if the class can not be loaded,
+     * instantiated or is not of the expected type
+     */
+    private static <T> T newInstance(ConfigEntry config, Class<T> type, String label){
+        try {
+            return type.cast(Class.forName(config.getClassName()).newInstance());
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid "+label+" configuration '"+
+                config.getConfigString()+"'!",e);
+        }
+    }
+
+    /**
+     * Creates the configuration for the component defined by the parsed
+     * {@link ConfigEntry}. It contains (later entries override earlier ones)
+     * <ol>
+     * <li> the key/value pairs of the component configuration file. If the
+     *      {@link #CONFIG_PARAM} parameter is present its value is used as
+     *      name of this file and the file is required to exist. Otherwise
+     *      the optional file named after the parsed default name is used.
+     * <li> this {@link IndexingConfig} stored under {@link #KEY_INDEXING_CONFIG}
+     * <li> the parameters directly provided by the {@link ConfigEntry}
+     * </ol>
+     * @param configEntry the configuration entry of the component
+     * @param defaultName the name of the configuration file used if no
+     * {@link #CONFIG_PARAM} is present (typically the simple class name)
+     * @return the configuration for the component
+     * @throws IllegalArgumentException if an explicitly configured file is missing
+     */
+    private Map<String,Object> getComponentConfig(ConfigEntry configEntry,String defaultName) {
+        String configName = configEntry.getParams().get(CONFIG_PARAM);
+        Map<String,Object> config = configName == null ?
+                loadConfig(defaultName, configDir, false) :
+                loadConfig(configName, configDir, true);
+        //we need to also add the key used to get (this) indexing config
+        config.put(KEY_INDEXING_CONFIG, this);
+        //add also the directly provided parameters
+        config.putAll(configEntry.getParams());
+        return config;
+    }
+
+    /**
+     * Loads the config with the given name from the parsed directory and throwing
+     * an {@link IllegalArgumentException} if the configuration is required but
+     * not found
+     * @param name the name (".properties" is appended if missing)
+     * @param configDir the directory to look for the config
+     * @param required if this config is required or optional
+     * @return the key value mappings as map (empty if an optional config is
+     * missing or <code>name</code> is <code>null</code> or empty and not required)
+     * @throws IllegalArgumentException if required and the name is missing or
+     * the file does not exist
+     */
+    private static Map<String,Object> loadConfig(String name, File configDir, boolean required) {
+        if(name == null || name.isEmpty()){
+            if(required){
+                throw new IllegalArgumentException("Missing required parameter '"+
+                    CONFIG_PARAM+"' Syntax: '"+SYNTAX_ERROR_MESSAGE +"'!");
+            }
+            return new HashMap<String,Object>();
+        }
+        String fileName = name.endsWith(".properties")? name : name+".properties";
+        return loadConfig(new File(configDir,fileName),required);
+    }
+
+    /**
+     * Parses a ';' separated list of configuration entries. Empty entries
+     * are ignored.
+     * @param config the configuration
+     * @return the parsed entries (in the order as configured)
+     */
+    private static List<ConfigEntry> parseConfigEntries(String config){
+        List<ConfigEntry> configs = new ArrayList<ConfigEntry>();
+        for(String configPart : config.split(";")){
+            if(!configPart.trim().isEmpty()){
+                configs.add(new ConfigEntry(configPart));
+            }
+        }
+        return configs;
+    }
+
+    /**
+     * A single component configuration entry with the syntax
+     * <code>{className},{param1}:{value1},{param2}:{value2}</code>.
+     * The class name, parameter names and values are trimmed. Parameters
+     * without (or with an empty) value are stored with a <code>null</code> value.
+     */
+    private static final class ConfigEntry {
+        private final String configString;
+        private final String className;
+        private final Map<String,String> params;
+
+        private ConfigEntry(String config){
+            configString = config;
+            String[] parts = config.split(",");
+            className = parts[0].trim();
+            params = new HashMap<String,String>();
+            for(int i=1;i<parts.length;i++){
+                String part = parts[i];
+                int separator = part.indexOf(':'); //TODO: maybe use also "=" there
+                if(separator < 0){
+                    params.put(part.trim(), null);
+                } else {
+                    String value = part.substring(separator+1).trim();
+                    params.put(part.substring(0,separator).trim(), value.isEmpty() ? null : value);
+                }
+            }
+        }
+        public String getConfigString() {
+            return configString;
+        }
+        public String getClassName() {
+            return className;
+        }
+        public Map<String,String> getParams() {
+            return params;
+        }
+    }
+
+    /**
+     * Can be used to look for a config within the configuration directory
+     * of the {@link IndexingConfig}.
+     * @param name the name of the configuration (".properties" is appended if
+     * missing)
+     * @param required if this is an required or optional configuration.
+     * @return the key value mappings as map
+     * @throws IllegalArgumentException if the configuration was not found and
+     * <code>true</code> was parsed for required
+     */
+    public Map<String,Object> getConfig(String name,boolean required) throws IllegalArgumentException {
+        return loadConfig(name, configDir, required);
+    }
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,30 @@
+package org.apache.stanbol.entityhub.indexing.core.config;
+
+import java.io.File;
+
+/**
+ * Constants defined/used for Indexing. The KEY_* constants are the keys
+ * used within the main indexing configuration ("indexing.properties" within
+ * the config folder).<p>
+ * Components are configured by using the syntax
+ * <pre>
+ * {key}={className},{param1}:{value1},{param2}:{value2}
+ * </pre>
+ * The optional <code>config</code> parameter defines the name of the
+ * configuration file of the component (".properties" is appended if missing)
+ * within the config folder. If it is not present the optional file
+ * <code>{SimpleClassName}.properties</code> is used.
+ * @author Rupert Westenthaler
+ *
+ */
+public interface IndexingConstants {
+
+    /**
+     * The name of the indexed dataset (required, MUST NOT be empty)
+     */
+    String KEY_NAME                  = "name";
+    /**
+     * The description of the indexed dataset (optional)
+     */
+    String KEY_DESCRIPTION           = "description";
+    String KEY_ENTITY_DATA_ITERABLE  = "entityDataIterable";
+    String KEY_ENTITY_DATA_PROVIDER  = "entityDataProvider";
+    /**
+     * Key for the entity id iterator. NOTE: the name of this constant is
+     * misspelled; it is kept for backward compatibility. New code should use
+     * {@link #KEY_ENTITY_ID_ITERATOR} (same value).
+     */
+    String KEY_ENTITY_ID_ITERATPR    = "entityIdIterator";
+    /**
+     * Correctly spelled alias for {@link #KEY_ENTITY_ID_ITERATPR}.
+     */
+    String KEY_ENTITY_ID_ITERATOR    = KEY_ENTITY_ID_ITERATPR;
+    String KEY_ENTITY_SCORE_PROVIDER = "entityScoreProvider";
+    String KEY_INDEXING_DESTINATION  = "indexingDestination";
+    /**
+     * The name of the file (within the config folder) that defines the
+     * fields/languages included/excluded in the index. If not present
+     * "indexFieldConfig.txt" is used.
+     */
+    String KEY_INDEX_FIELD_CONFIG    = "fieldConfiguration";
+    /**
+     * usage:<br>
+     * <pre>
+     * {class1},config:{name1};{class2},config:{name2};...
+     * </pre>
+     * The class implementing the normaliser and the name of the configuration
+     * file stored within the config folder as {name}.properties. If the
+     * <code>config</code> parameter is missing the optional file
+     * {SimpleClassName}.properties is used. The normalisers are chained: the
+     * first one gets the second one as chained normaliser and so on. If this
+     * key is not present a default normaliser is used.
+     */
+    String KEY_SCORE_NORMALIZER      = "scoreNormalizer";
+    String KEY_ENTITY_PROCESSOR      = "entityProcessor";
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,22 @@
+package org.apache.stanbol.entityhub.indexing.core.event;
+
+import java.util.EventObject;
+
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexerImpl;
+
+public class IndexingEvent extends EventObject {
+
+    private static final long serialVersionUID = 1L;
+    public IndexingEvent(IndexerImpl source) {
+        super(source);
+    }
+    
+    @Override
+    public IndexerImpl getSource() {
+        // TODO Auto-generated method stub
+        return (IndexerImpl)super.getSource();
+    }
+
+    
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,7 @@
+package org.apache.stanbol.entityhub.indexing.core.event;
+
+/**
+ * Listener interface for components interested in the progress of an
+ * indexing run. The indexer is available via {@link IndexingEvent#getSource()}.
+ */
+public interface IndexingListener {
+
+    /**
+     * Notifies that the state of the indexer changed.
+     * NOTE(review): the set of possible states is defined by IndexerImpl - confirm.
+     * @param event the event (its source is the indexer)
+     */
+    void stateChanged(IndexingEvent event);
+
+    /**
+     * Notifies that the indexing has completed.
+     * @param event the event (its source is the indexer)
+     */
+    void indexingCompleted(IndexingEvent event);
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,56 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Base class for the source daemons that read entities from the
+ * {@link IndexingComponent}s. Source daemons do not consume from a queue
+ * (<code>null</code> is parsed as consume queue); they only produce
+ * {@link QueueItem}s holding the {@link Representation} together with the
+ * (optionally normalised) score and timing metadata.
+ */
+public abstract class AbstractEntityIndexingDaemon extends IndexingDaemon<Object,Representation> {
+
+    /**
+     * Normaliser applied to scores before they are added to the produced
+     * items. May be <code>null</code> to use scores as provided.
+     */
+    private final ScoreNormaliser normaliser;
+    
+    /**
+     * @param name the name of this daemon
+     * @param scoreNormaliser the normaliser for scores or <code>null</code>
+     * @param produce the queue the produced Representations are added to
+     * @param error the queue used to report errors
+     */
+    protected AbstractEntityIndexingDaemon(String name,
+                                           ScoreNormaliser scoreNormaliser,
+                                           BlockingQueue<QueueItem<Representation>> produce,
+                                           BlockingQueue<QueueItem<IndexingError>> error) {
+        super(name, 
+            IndexerConstants.SEQUENCE_NUMBER_SOURCE_DAEMON,
+            null,produce, error);
+        this.normaliser = scoreNormaliser;
+    }
+    /**
+     * Used to produce Representations by both variants of EntityIndexingDaemons
+     * @param rep the {@link Representation} extracted from the 
+     *  {@link IndexingComponent}s. If <code>null</code> nothing is produced.
+     * @param score The score for the Representation or <code>null</code> if
+     *  not available. Only normalised scores &gt;= 0 are added to the item.
+     * @param started the time (ms) reading of this entity started or
+     *  <code>null</code> if not known. In that case no {@link IndexerConstants#SOURCE_STARTED}
+     *  and {@link IndexerConstants#SOURCE_DURATION} are set.
+     */
+    protected final void produce(Representation rep,Float score,Long started) {
+        if(rep == null){
+            return; //nothing to produce
+        }
+        //normalise the score if both score and a normaliser are present
+        if(score != null && normaliser != null){
+            score = normaliser.normalise(score);
+        }
+        QueueItem<Representation> item = new QueueItem<Representation>(rep);
+        //set the score as additional property to the QueueItem, because
+        //it needs to be added to the Representation after the processing completes
+        if(score != null && score.compareTo(ScoreNormaliser.ZERO) >= 0){
+            item.setProperty(SCORE_FIELD, score);
+        }
+        if(started != null){
+            item.setProperty(SOURCE_STARTED, started);
+        }
+        Long completed = Long.valueOf(System.currentTimeMillis());
+        item.setProperty(SOURCE_COMPLETE, completed);
+        //the duration can only be calculated if the start time is known
+        if(started != null){
+            Float duration = Float.valueOf((float)(completed.longValue()-started.longValue()));
+            item.setProperty(SOURCE_DURATION, duration);
+        }
+        produce(item);
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,67 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Daemon the extracts Entities from the {@link IndexingComponent}s by iterating
+ * over the entity data and making lookups for the entity scores
+ * @author Rupert Westenthaler
+ */
+public class EntityDataBasedIndexingDaemon extends AbstractEntityIndexingDaemon {
+    private final EntityDataIterable dataIterable;
+    private final EntityScoreProvider scoreProvider;
+    /** if <code>true</code> entities are indexed regardless of their score */
+    private final boolean indexAllEntitiesState;
+    /**
+     * Daemon that extracts Entities by iterating over the entity data and
+     * making lookups for the entity scores.
+     * @param name the name of the daemon
+     * @param produce the queue the Representations are added to
+     * @param error the queue used to report errors
+     * @param dataIterable provides the iterator over the entity data
+     * @param scoreProvider used to look up the score of entities
+     * @param normaliser the normaliser for scores or <code>null</code>
+     * @param indexAllEntitiesState if <code>true</code> entities with a
+     * negative score are indexed as well
+     * @throws IllegalArgumentException if dataIterable or scoreProvider is
+     * <code>null</code>
+     */
+    public EntityDataBasedIndexingDaemon(String name,
+                                         BlockingQueue<QueueItem<Representation>> produce,
+                                         BlockingQueue<QueueItem<IndexingError>> error,
+                                         EntityDataIterable dataIterable,
+                                         EntityScoreProvider scoreProvider,
+                                         ScoreNormaliser normaliser,
+                                         boolean indexAllEntitiesState) {
+        super(name, normaliser,produce, error);
+        if(dataIterable == null){
+            throw new IllegalArgumentException("The parsed EntityDataIterable MUST NOT be NULL");
+        }
+        if(scoreProvider == null){
+            throw new IllegalArgumentException("The parsed EntityScoreProvider MUST NOT be NULL");
+        }
+        this.dataIterable = dataIterable;
+        this.scoreProvider = scoreProvider;
+        this.indexAllEntitiesState = indexAllEntitiesState;
+    }
+
+    /**
+     * Iterates over all entity data, looks up the scores and produces the
+     * Representations of all entities that need to be indexed.
+     * {@link #setFinished()} is called even if an exception is thrown.
+     */
+    @Override
+    public void run() {
+        log.info("...start iterating over Entity data");
+        try {
+            EntityDataIterator dataIterator = dataIterable.entityDataIterator();
+            while(dataIterator.hasNext()){
+                Long start = Long.valueOf(System.currentTimeMillis());
+                String id = dataIterator.next();
+                Representation rep = null;
+                Float score;
+                if(!scoreProvider.needsData()){
+                    //the score can be looked up by the id only
+                    score = scoreProvider.process(id);
+                } else {
+                    rep = dataIterator.getRepresentation();
+                    score = scoreProvider.process(rep);
+                }
+                if(indexAllEntitiesState || //all entities are indexed anyway
+                        score == null || //no score available
+                        score.compareTo(ScoreNormaliser.ZERO) >= 0){ //score >= 0
+                    if(rep == null){ //load the data only for indexed entities
+                        rep = dataIterator.getRepresentation();
+                    }
+                    produce(rep,score,start);
+                } // else ignore this entity
+            }
+        } finally {
+            //Consumers loop until isQueueFinished(); mark this daemon as
+            //finished also on errors so that they can terminate.
+            //NOTE(review): assumes isQueueFinished() of consumers depends on
+            //setFinished() of this producer - confirm in IndexingDaemon
+            setFinished();
+        }
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,31 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.slf4j.Logger;
+
+public class EntityErrorLoggerDaemon extends IndexingDaemon<IndexingError,Object> {
+
+    private final Logger err;
+    public EntityErrorLoggerDaemon(BlockingQueue<QueueItem<IndexingError>> consume,
+                             Logger err) {
+        super("Indexer: Entity Error Logging Daemon",
+            IndexerConstants.SEQUENCE_NUMBER_ERROR_HANDLING_DAEMON,
+            consume, null, null);
+        this.err = err;
+    }
+
+    @Override
+    public void run() {
+        while(!isQueueFinished()){
+            QueueItem<IndexingError> errorItem = consume();
+            if(errorItem != null){
+                IndexingError error = errorItem.getItem();
+                err.error(String.format("Error while indexing %s: %s",
+                    error.getEntity(),error.getMessage()),error.getException());
+            }
+        }
+        setFinished();
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,51 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator.EntityScore;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Daemon that iterates over entity ids (with optional scores) and loads the
+ * data of the entities that need to be indexed from the
+ * {@link EntityDataProvider}.
+ */
+public class EntityIdBasedIndexingDaemon extends AbstractEntityIndexingDaemon {
+
+    private final EntityIterator entityIdIterator;
+    private final EntityDataProvider dataProvider;
+    /** if <code>true</code> entities are indexed regardless of their score */
+    private final boolean indexAllEntitiesState;
+    /**
+     * @param name the name of the daemon
+     * @param produce the queue the Representations are added to
+     * @param error the queue used to report errors
+     * @param entityIdIterator iterates over the ids and scores of the entities
+     * @param dataProvider used to load the data of the entities
+     * @param normaliser the normaliser for scores or <code>null</code>
+     * @param indexAllEntitiesState if <code>true</code> entities with a
+     * negative score are indexed as well
+     * @throws IllegalArgumentException if entityIdIterator or dataProvider
+     * is <code>null</code>
+     */
+    public EntityIdBasedIndexingDaemon(String name,
+                                          BlockingQueue<QueueItem<Representation>> produce,
+                                          BlockingQueue<QueueItem<IndexingError>> error,
+                                          EntityIterator entityIdIterator,
+                                          EntityDataProvider dataProvider,
+                                          ScoreNormaliser normaliser,
+                                          boolean indexAllEntitiesState) {
+        super(name, normaliser,produce, error);
+        if(entityIdIterator == null){
+            throw new IllegalArgumentException("The parsed EntityIterator MUST NOT be NULL");
+        }
+        if(dataProvider == null){
+            throw new IllegalArgumentException("The parsed EntityDataProvider MUST NOT be NULL");
+        }
+        this.entityIdIterator = entityIdIterator;
+        this.dataProvider = dataProvider;
+        this.indexAllEntitiesState = indexAllEntitiesState;
+    }
+
+    /**
+     * Iterates over all entity ids and produces the Representations of the
+     * entities that need to be indexed. {@link #setFinished()} is called
+     * even if an exception is thrown.
+     */
+    @Override
+    public void run() {
+        try {
+            while(entityIdIterator.hasNext()){
+                Long start = Long.valueOf(System.currentTimeMillis());
+                EntityScore entityScore = entityIdIterator.next();
+                if(indexAllEntitiesState || //all entities are indexed anyway
+                        entityScore.score == null || //no score available
+                        entityScore.score.compareTo(ScoreNormaliser.ZERO) >= 0){ //score >= 0
+                    Representation rep = dataProvider.getEntityData(entityScore.id);
+                    //produce(..) ignores null Representations
+                    produce(rep,entityScore.score,start);
+                } //else ignore this entity
+            }
+        } finally {
+            //Consumers loop until isQueueFinished(); mark this daemon as
+            //finished also on errors so that they can terminate.
+            setFinished();
+        }
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,105 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+import org.apache.stanbol.entityhub.servicesapi.yard.YardException;
+
+/**
+ * @author Rupert Westenthaler
+ */
+public class EntityPersisterRunnable extends IndexingDaemon<Representation,Representation> {
+
+    private int chunkSize;
+    private Yard yard;
+    public EntityPersisterRunnable(String name,
+                                   BlockingQueue<QueueItem<Representation>> consume, 
+                                   BlockingQueue<QueueItem<Representation>> produce,
+                                   BlockingQueue<QueueItem<IndexingError>> error,
+                                   int chunkSize, Yard yard){
+        super(name,IndexerConstants.SEQUENCE_NUMBER_PERSIT_DAEMON,
+            consume,produce,error);
+        this.chunkSize = chunkSize;
+        this.yard = yard;
+    }
+    @Override
+    public void run() {
+        Map<String,QueueItem<Representation>> toStore = new HashMap<String,QueueItem<Representation>>();
+        while(!isQueueFinished()){
+            QueueItem<Representation> item;
+            item = consume();
+            if(item != null){
+                if(item.getItem() != null){
+                    toStore.put(item.getItem().getId(),item);
+                }
+            }
+            if(toStore.size() >= chunkSize){
+                process(toStore);
+            }
+        }
+        //process the remaining
+        process(toStore);
+        setFinished();
+    }
+    /**
+     * processes the items within the parsed Map
+     * @param toStore the items to process
+     */
+    private void process(Map<String,QueueItem<Representation>> toStore) {
+        //keep the number of elements because store(..) will remove them!
+        int elements = toStore.size(); 
+        Long start = Long.valueOf(System.currentTimeMillis());
+        Collection<QueueItem<Representation>> stored = store(toStore);
+        Long completed = Long.valueOf(System.currentTimeMillis());
+        Float duration = Float.valueOf(((float)(completed.longValue()-start.longValue()))/elements);
+        for(QueueItem<Representation> storedItem : stored){
+            storedItem.setProperty(STORE_COMPLETE, completed);
+            storedItem.setProperty(STORE_DURATION, duration);
+            produce(storedItem);
+        }
+    }
+    /**
+     * Stores the parsed Representations to the {@link #yard} and 
+     * {@link #sendError(String, String, Exception)} for entities that could
+     * not be stored!
+     * @param toStore the Representations to store. This method removes all
+     * Elements of this map while doing the work
+     */
+    private Set<QueueItem<Representation>> store(Map<String,QueueItem<Representation>> toStore) {
+        String errorMsg;
+        YardException yardException = null;
+        Set<QueueItem<Representation>> stored = new HashSet<QueueItem<Representation>>();
+        Collection<Representation> reps = new ArrayList<Representation>(toStore.size());
+        for(QueueItem<Representation> item:toStore.values()){
+            reps.add(item.getItem());
+        }
+        try {
+            for(Representation r : yard.store(reps)){
+                QueueItem<Representation> old = toStore.remove(r.getId());
+                //create a new QueueItem and copy the metadata of the old one
+                stored.add(new QueueItem<Representation>(r,old));
+            }
+            errorMsg = "Entity %s was not indexed by the Yard %s";
+        } catch (YardException e) {
+            errorMsg = "Unable to store Entity %s to Yard %s because of an YardException";
+        }
+        //the remaining Items in to store have some errors
+        for(QueueItem<Representation> entry : toStore.values()){
+            sendError(entry.getItem().getId(),entry,
+                String.format(errorMsg,entry.getItem().getId(),yard.getId()),
+                yardException);
+        }
+        toStore.clear(); //clear the
+        return stored;
+    }
+    
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,70 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_STARTED;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Consumes Representations as created by the IndexingSource and processes
+ * it by using the configured {@link EntityProcessor}. In addition this
+ * components adds configured keys to the Representation.
+ * @author Rupert Westenthaler
+ *
+ */
+public class EntityProcessorRunnable extends IndexingDaemon<Representation,Representation> {
+    private final EntityProcessor processor;
+    /** keys of QueueItem properties moved to the processed Representation */
+    private final Set<String> keys;
+    /**
+     * @param name the name of the daemon
+     * @param consume the queue with the Representations to process
+     * @param produce the queue the processed Representations are added to
+     * @param error the queue used to report errors
+     * @param processor the processor applied to the Representations
+     * @param keys the keys of the QueueItem properties to add to the
+     * processed Representation, or <code>null</code> for none
+     * @throws IllegalArgumentException if the processor is <code>null</code>
+     */
+    public EntityProcessorRunnable(String name,
+                                   BlockingQueue<QueueItem<Representation>> consume,
+                                   BlockingQueue<QueueItem<Representation>> produce,
+                                   BlockingQueue<QueueItem<IndexingError>> error,
+                                   EntityProcessor processor,Set<String> keys) {
+        super(name,IndexerConstants.SEQUENCE_NUMBER_PROCESSOR_DAEMON,
+            consume,produce,error);
+        if(processor == null){
+            throw new IllegalArgumentException("The parsed EntityProcessor MUST NOT be NULL");
+        }
+        this.processor = processor;
+        if(keys == null){
+            this.keys = Collections.emptySet();
+        } else {
+            this.keys = keys;
+        }
+    }
+    /**
+     * Processes consumed items until the queue is finished. If the processor
+     * returns <code>null</code> an error is sent instead of producing an
+     * item. {@link #setFinished()} is called even if an exception is thrown.
+     */
+    @Override
+    public void run() {
+        try {
+            while(!isQueueFinished()){
+                QueueItem<Representation> item = consume();
+                if(item != null){
+                    processItem(item);
+                }
+            }
+        } finally {
+            //Consumers loop until isQueueFinished(); mark this daemon as
+            //finished also on errors so that they can terminate.
+            setFinished();
+        }
+    }
+    /**
+     * Processes a single item and produces the result (or sends an error).
+     * @param item the consumed item
+     */
+    private void processItem(QueueItem<Representation> item) {
+        Long start = Long.valueOf(System.currentTimeMillis());
+        item.setProperty(PROCESS_STARTED, start);
+        Representation processed = processor.process(item.getItem());
+        if(processed == null){
+            sendError(item.getItem().getId(),item, 
+                String.format("Processor %s returned NULL for Entity %s",
+                    processor,item.getItem().getId()), null);
+        } else {
+            for(String key : keys){
+                //consume the property and add it to the
+                //transformed representation
+                Object value = item.removeProperty(key);
+                if(value != null){
+                    processed.add(key, value);
+                }
+            }
+            //copies the remaining metadata (incl. PROCESS_STARTED) of the item
+            QueueItem<Representation> produced = new QueueItem<Representation>(processed,item);
+            Long completed = Long.valueOf(System.currentTimeMillis());
+            produced.setProperty(PROCESS_COMPLETE, completed);
+            produced.setProperty(PROCESS_DURATION, Float.valueOf(
+                (float)(completed.longValue()-start.longValue())));
+            produce(produced);
+        }
+    }
+}
\ No newline at end of file

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,199 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.slf4j.Logger;
+
+/**
+ * Consumes the Representations that completed the indexing pipeline and logs
+ * statistics to the parsed {@link Logger}: a short line every <i>minor</i>
+ * interval (major/10 items, at least 1), a detailed report every <i>major</i>
+ * interval and a summary when the queue is finished.<p>
+ * The statistics are based on the SOURCE/PROCESS/STORE metadata added by the
+ * other daemons. Items with missing metadata are counted but excluded from
+ * the averages.
+ */
+public class FinishedEntityDaemon extends IndexingDaemon<Representation,Object> {
+
+    private static final int DEFAULT_MAJOR_INTERVAL = 100000;
+    /**
+     * For now use an logger as output!
+     */
+    private final Logger out;
+
+    /** number of items between detailed reports */
+    private final int major;
+    /** number of items between short reports (always &gt;= 1) */
+    private final int minor;
+    private double sourceDurationAll;
+    private double processDurationAll;
+    private double storeDurationAll;
+    private double durationAll;
+    private double sourceDurationMajor;
+    private double processDurationMajor;
+    private double storeDurationMajor;
+    private double durationMajor;
+    private double durationMinor;
+    private long start;
+    private long startMajor;
+    private long startMinor;
+    
+    //sums of the time between the start of reading an entity from the source
+    //and the time it was consumed by this daemon
+    private double timeAll;
+    private double timeMajor;
+    private double timeMinor;
+    
+    /** number of consumed items */
+    private long count;
+    //number of items with complete metadata (used for averages)
+    private long countedAll;
+    private long countedMajor;
+    private long countedMinor;
+    
+    /**
+     * @param consume the queue with the finished Representations
+     * @param majorInterval the number of items between detailed reports.
+     * Values &lt;= 0 select the default ({@value #DEFAULT_MAJOR_INTERVAL})
+     * @param out the logger used to write the statistics
+     */
+    public FinishedEntityDaemon(BlockingQueue<QueueItem<Representation>> consume,
+                                  int majorInterval,
+                                  Logger out) {
+        super("Indexing: Finished Entity Logger Deamon",
+            IndexerConstants.SEQUENCE_NUMBER_FINISHED_DAEMON,
+            consume, null, null);
+        this.out = out;
+        if(majorInterval > 0){
+            this.major = majorInterval;
+        } else {
+            this.major = DEFAULT_MAJOR_INTERVAL;
+        }
+        //at least 1, because count%minor is evaluated for every item
+        this.minor = Math.max(1, major/10);
+    }
+
+    @Override
+    public void run() {
+        count = 0; //Elements indexed
+        //Elements with valid statistics
+        countedAll = 0; 
+        countedMajor = 0;
+        countedMinor = 0;
+        long current = System.currentTimeMillis();
+        while(!isQueueFinished()){
+            QueueItem<Representation> item = consume();
+            if(item != null){
+                current = System.currentTimeMillis();
+                if(count == 0){
+                    start = current; //default for the start!
+                }
+                if(startMajor == 0){
+                    startMajor = current;
+                }
+                if(startMinor == 0){
+                    startMinor = current;
+                }
+                count++;
+                collectStatistics(item, current);
+                if(count%major == 0){
+                    printMajor(current);
+                    resetMajor();
+                    resetMinor(); //reset also minor
+                } else if(count%minor == 0){
+                    printMinor(current);
+                    resetMinor();
+                }
+            }
+        }
+        printSummary(current);
+        setFinished();
+    }
+    /**
+     * Adds the timing metadata of the parsed item to the statistics. Items
+     * with missing (or wrongly typed) metadata do not change the averages.
+     * @param item the consumed item
+     * @param current the time the item was consumed
+     */
+    private void collectStatistics(QueueItem<Representation> item, long current) {
+        try {
+            long startSource = ((Long)item.getProperty(SOURCE_STARTED)).longValue();
+            if(count < minor){ //for the first few item
+                //try to get the correct start time for the indexing!
+                if(startSource < start){
+                    start = startSource;
+                }
+            }
+            float sourceDuration = ((Float)item.getProperty(SOURCE_DURATION)).floatValue();
+            float processDuration = ((Float)item.getProperty(PROCESS_DURATION)).floatValue();
+            float storeDuration = ((Float)item.getProperty(STORE_DURATION)).floatValue();
+            sourceDurationAll+=sourceDuration;
+            sourceDurationMajor+=sourceDuration;
+            processDurationAll+=processDuration;
+            processDurationMajor+=processDuration;
+            storeDurationAll+=storeDuration;
+            storeDurationMajor+=storeDuration;
+            double duration = sourceDuration+processDuration+storeDuration;
+            durationAll+=duration;
+            durationMajor+=duration;
+            durationMinor+=duration;
+            
+            long time = current-startSource;
+            timeAll+=time;
+            timeMajor+=time;
+            timeMinor+=time;
+            countedAll++;
+            countedMajor++;
+            countedMinor++;
+        }catch(Exception ignored){
+            //ignore NullPointer- and ClassCastExceptions that will be thrown
+            //on missing metadata!
+        }
+    }
+    /** Resets the statistics of the current major interval */
+    private void resetMajor() {
+        sourceDurationMajor = 0;
+        processDurationMajor = 0;
+        storeDurationMajor = 0;
+        durationMajor = 0;
+        timeMajor = 0;
+        countedMajor = 0;
+        startMajor = 0;
+    }
+    /** Resets the statistics of the current minor interval */
+    private void resetMinor() {
+        durationMinor = 0;
+        timeMinor = 0;
+        countedMinor = 0;
+        startMinor = 0;
+    }
+
+    private void printMinor(long current) {
+        long interval = current-start;
+        long intervalMinor = current-startMinor;
+        double itemDurationMinor = countedMinor>0?durationMinor/countedMinor:-1;
+        double itemTimeMinor = countedMinor>0?timeMinor/countedMinor:-1;
+        out.info(String.format("    - %d items in %fsec (last %d in %7.3fsec | %7.3fms/item | %7.3fms in queue)",
+            count,(float)interval/1000f,minor,(float)intervalMinor/1000f,itemDurationMinor,itemTimeMinor));
+    }
+
+    private void printMajor(long current) {
+        long interval = current-start;
+        long intervalMajor = current-startMajor;
+        double itemDurationAll = countedAll>0?durationAll/countedAll:-1;
+        double itemDurationMajor = countedMajor>0?durationMajor/countedMajor:-1;
+        double itemTimeAll = countedAll>0?timeAll/countedAll:-1;
+        double itemTimeMajor = countedMajor>0?timeMajor/countedMajor:-1;
+        
+        double itemSourceDurationAll = countedAll>0? sourceDurationAll/countedAll:-1;
+        double itemProcessingDurationAll = countedAll>0? processDurationAll/countedAll:-1;
+        double itemStoreDurationAll = countedAll>0? storeDurationAll/countedAll:-1;
+        
+        double itemSourceDurationMajor = countedMajor>0? sourceDurationMajor/countedMajor:-1;
+        double itemProcessingDurationMajor = countedMajor>0? processDurationMajor/countedMajor:-1;
+        double itemStoreDurationMajor = countedMajor>0? storeDurationMajor/countedMajor:-1;
+        out.info(String.format("+ %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            count,(float)interval/1000f,itemDurationAll,itemTimeAll));
+        out.info(String.format("  last %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            major,(float)intervalMajor/1000f,itemDurationMajor,itemTimeMajor));
+        out.info(String.format("  - source   : all: %7.3fms/item | current: %7.3fms/item",
+            itemSourceDurationAll,itemSourceDurationMajor));
+        out.info(String.format("  - processing: all: %7.3fms | current: %7.3fms/item",
+            itemProcessingDurationAll,itemProcessingDurationMajor));
+        out.info(String.format("  - store     : all: %7.3fms | current: %7.3fms/item",
+            itemStoreDurationAll,itemStoreDurationMajor));
+    }
+    private void printSummary(long current){
+        //start is only initialised when the first item is consumed
+        long interval = count > 0 ? current-start : 0;
+        double itemDurationAll = countedAll>0?durationAll/countedAll:-1;
+        double itemTimeAll = countedAll>0?timeAll/countedAll:-1;
+        double itemSourceDurationAll = countedAll>0? sourceDurationAll/countedAll:-1;
+        double itemProcessingDurationAll = countedAll>0? processDurationAll/countedAll:-1;
+        double itemStoreDurationAll = countedAll>0? storeDurationAll/countedAll:-1;
+        out.info(String.format("Indexed %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            count,(float)interval/1000f,itemDurationAll,itemTimeAll));
+        out.info(String.format("  - source   : %7.3fms/item",
+            itemSourceDurationAll));
+        out.info(String.format("  - processing: %7.3fms/item",
+            itemProcessingDurationAll));
+        out.info(String.format("  - store     : %7.3fms/item",
+            itemStoreDurationAll));
+        
+    }
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,94 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.servicesapi.model.rdf.RdfResourceEnum;
+/**
+ * Interface with the constants used by the {@link IndexerImpl} part of this
+ * package. These constants are to be considered private and SHOULD NOT be
+ * used by other components.
+ * @author Rupert Westenthaler
+ *
+ */
+public interface IndexerConstants {
+    /**
+     * The field used to store the score of an entity if not <code>null</code>
+     * and &gt;= 0.
+     */
+    String SCORE_FIELD = RdfResourceEnum.signRank.getUri();
+    
+    /**
+     * Key used to store the time when the reading from the source started
+     */
+    String SOURCE_STARTED = "entity.source.started";
+    /**
+     * Key used to store the time when the reading from the source completed
+     */
+    String SOURCE_COMPLETE = "entity.source.complete";
+    /**
+     * Key used to store the time needed to read the entity from the source.
+     * ({@link Float})
+     */
+    String SOURCE_DURATION = "entity.source.duration";
+    /**
+     * Key used to store the time when the processing of the entity started
+     */
+    String PROCESS_STARTED = "entity.process.started";
+    /**
+     * Key used to store the time when the processing of the entity completed
+     */
+    String PROCESS_COMPLETE = "entity.process.complete";
+    /**
+     * Key used to store the time needed for processing an entity. ({@link Float})
+     */
+    String PROCESS_DURATION = "entity.process.duration";
+    /**
+     * Key used to store the time when the storing of the entity started
+     */
+    String STORE_STARTED = "entity.store.started";
+    /**
+     * Key used to store the time when the storing of the entity completed
+     */
+    String STORE_COMPLETE = "entity.store.complete";
+    /**
+     * Key used to store the time needed to store the entity. ({@link Float})
+     */
+    String STORE_DURATION = "entity.store.duration";
+    /**
+     * Key used to store the time stamp when the error occurred
+     */
+    String ERROR_TIME = "entity.error.time";
+    /**
+     * Sentinel item used by the consumers to recognise that the queue has
+     * finished. See
+     * <a href="http://stackoverflow.com/questions/1956526/under-what-conditions-will-blockingqueue-take-throw-interrupted-exception">
+     * Under what conditions will BlockingQueue.take throw interrupted exception?</a>
+     * for an example.
+     */
+    //The raw type is used on purpose: this single sentinel wraps null and so
+    //carries no element of any specific type, which makes the unchecked
+    //constructor call harmless.
+    @SuppressWarnings("unchecked")
+    QueueItem INDEXING_COMPLETED_QUEUE_ITEM = new QueueItem(null);
+
+    /**
+     * The sequence number for {@link IndexingDaemon}s that read from the 
+     * {@link IndexingComponent}s 
+     */
+    Integer SEQUENCE_NUMBER_SOURCE_DAEMON = 0;
+    /**
+     * The sequence number for {@link IndexingDaemon}s that process Entities
+     */
+    Integer SEQUENCE_NUMBER_PROCESSOR_DAEMON = 1;
+    /**
+     * The sequence number for {@link IndexingDaemon}s that persist Entities to
+     * the {@link IndexingDestination}
+     */
+    Integer SEQUENCE_NUMBER_PERSIST_DAEMON = 2;
+    /**
+     * Misspelled alias of {@link #SEQUENCE_NUMBER_PERSIST_DAEMON} (same value),
+     * kept so that existing users still compile.
+     * @deprecated use {@link #SEQUENCE_NUMBER_PERSIST_DAEMON} instead
+     */
+    @Deprecated
+    Integer SEQUENCE_NUMBER_PERSIT_DAEMON = SEQUENCE_NUMBER_PERSIST_DAEMON;
+    /**
+     * The sequence number for {@link IndexingDaemon}s that indexed Entities
+     */
+    Integer SEQUENCE_NUMBER_FINISHED_DAEMON = 3;
+    /**
+     * The sequence number for {@link IndexingDaemon}s that handle errors
+     */
+    Integer SEQUENCE_NUMBER_ERROR_HANDLING_DAEMON = 4;
+
+}

Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain