You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2011/04/29 11:20:38 UTC
svn commit: r1097740 [2/10] - in /incubator/stanbol/trunk: entityhub/
entityhub/generic/core/src/main/java/org/apache/stanbol/entityhub/core/mapping/
entityhub/generic/servicesapi/src/main/java/org/apache/stanbol/entityhub/servicesapi/mapping/
entityhu...
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,601 @@
+package org.apache.stanbol.entityhub.indexing.core.config;
+
+import static org.apache.stanbol.entityhub.indexing.core.config.IndexingConstants.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.stanbol.entityhub.core.mapping.FieldMappingUtils;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.DefaultNormaliser;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.mapping.FieldMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexingConfig {
+ /** Name of the indexing root folder; appended to the base path resolved by the constructor. */
+ private static final String DEFAULT_ROOT_PATH = "indexing";
+ /** Sub-folder of the root folder holding the configuration files. */
+ private static final String CONFIG_PATH = "config";
+ /** Sub-folder of the root folder holding the (local) resources used for indexing. */
+ private static final String SOURCE_PATH = "resources";
+ /** Sub-folder of the root folder that receives the files created by the indexing. */
+ private static final String DESTINATION_PATH = "destination";
+ /** Sub-folder of the root folder used for the distribution. */
+ private static final String DISTRIBUTION_PATH = "dist";
+ /** File name of the main indexing configuration within the config folder. */
+ private static final String INDEXING_PROERTIES = "indexing.properties";
+ /** Name of the component parameter that refers to a component specific configuration file. */
+ private static final String CONFIG_PARAM = "config";
+ /** Key used to add this {@link IndexingConfig} to the configuration map of each component. */
+ public static final String KEY_INDEXING_CONFIG = "indexingConfig";
+
+ /**
+ * Internally used to explain the syntax in the configuration file to parse parameters
+ */
+ private static final String SYNTAX_ERROR_MESSAGE = "{key}={value1},{param1}:{value1},{param2}:{value2};{value2},{param1}:{value1} ...";
+
+ private static final Logger log = LoggerFactory.getLogger(IndexingConfig.class);
+ /** Field configuration file used if the main configuration does not name one. */
+ private static final String DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME = "indexFieldConfig.txt";
+
+ // the folders resolved (and partly created) by the constructor
+ private final File rootDir;
+ private final File configDir;
+ private final File sourceDir;
+ private final File destinationDir;
+ private final File distributionDir;
+ /** The key/value pairs parsed from the main indexing configuration file. */
+ private final Map<String,Object> configuration;
+
+ /** The name read from the main configuration; checked by the constructor to be not empty. */
+ private String name;
+
+ // The indexing components are created lazily by their getters and cached in
+ // the following fields (null until the first successful request)
+ private EntityDataIterable entityDataIterable = null;
+ private EntityDataProvider entityDataProvider = null;
+
+ private EntityIterator entityIdIterator = null;
+ private EntityScoreProvider entityScoreProvider = null;
+
+ private ScoreNormaliser scoreNormaliser = null;
+
+ private EntityProcessor entityProcessor = null;
+
+ private IndexingDestination indexingDestination = null;
+ /**
+ * The configuration of the fields/languages included/excluded in the index.
+ */
+ private Collection<FieldMapping> fieldMappings;
+
+ /**
+ * Creates the configuration for the "indexing" folder located in the
+ * working directory ("user.dir"). Same as calling
+ * {@link #IndexingConfig(String)} with <code>null</code>.
+ */
+ public IndexingConfig(){
+ this(null);
+ }
+
+ /**
+  * Creates the configuration for the "indexing" folder located below the
+  * parsed path, validates the folder layout and loads the main indexing
+  * configuration as well as the index field configuration.
+  * @param rootPath the folder containing the "indexing" folder. Relative
+  * paths are resolved against the working directory ("user.dir");
+  * <code>null</code> selects the working directory itself.
+  * @throws IllegalArgumentException if the root or config folder does not
+  * exist, the main configuration is missing or has no (or an empty) name,
+  * or the index field configuration file is not found
+  * @throws IllegalStateException if the destination or distribution folder
+  * can not be created or the index field configuration can not be read
+  */
+ public IndexingConfig(String rootPath){
+     //first get the root
+     File root = new File(System.getProperty("user.dir"));
+     if(rootPath != null){
+         File parsed = new File(rootPath);
+         if(!parsed.isAbsolute()){
+             root = new File(root,rootPath); //add parsed to "user.dir"
+         } else {
+             root = parsed; //use the parsed absolute path
+         }
+     }
+     //now we need to add the name of the root folder
+     root = new File(root,DEFAULT_ROOT_PATH);
+     //check if root exists
+     if(!root.isDirectory()){
+         throw new IllegalArgumentException(
+             "The root folder for the indexing '"+root.getAbsolutePath()+
+             "' does not exist!");
+     }
+     this.rootDir = root;
+     //check also for the config
+     this.configDir = new File(root,CONFIG_PATH);
+     if(!configDir.isDirectory()){
+         //report the config folder (not the root) as the missing one
+         throw new IllegalArgumentException(
+             "The root folder for the indexing configuration '"+
+             configDir.getAbsolutePath()+"' does not exist!");
+     }
+     this.sourceDir = new File(root,SOURCE_PATH);
+     if(!sourceDir.exists()){
+         log.info("The resource folder '"+sourceDir.getAbsolutePath()+
+             "' (typically containing the sources used for indexing) does not exist");
+         log.info(" - this might be OK if no (local) resources are needed for the indexing");
+     }
+     //isDirectory() also detects a plain file blocking the folder name
+     //(mkdirs() returns false in that case)
+     this.destinationDir = new File(root,DESTINATION_PATH);
+     if(!destinationDir.isDirectory() && !destinationDir.mkdirs()){
+         throw new IllegalStateException(
+             "Unable to create target folder for indexing '"+
+             destinationDir.getAbsolutePath()+"'!");
+     }
+     this.distributionDir = new File(root,DISTRIBUTION_PATH);
+     if(!distributionDir.isDirectory() && !distributionDir.mkdirs()){
+         throw new IllegalStateException(
+             "Unable to create distribution folder for indexing '"+
+             distributionDir.getAbsolutePath()+"'!");
+     }
+     //check the main configuration
+     File indexingConfigFile = new File(this.configDir,INDEXING_PROERTIES);
+     this.configuration = loadConfig(indexingConfigFile,true);
+     Object value = configuration.get(KEY_NAME);
+     if(value == null){
+         throw new IllegalArgumentException("Indexing Configuration '"+
+             indexingConfigFile+"' is missing the required key "+KEY_NAME+"!");
+     }
+     this.name = value.toString();
+     if(name.isEmpty()){
+         throw new IllegalArgumentException("Invalid Indexing Configuration '"+
+             indexingConfigFile+"': The value for the parameter '"+KEY_NAME+"' MUST NOT be empty!");
+     }
+     value = configuration.get(KEY_INDEX_FIELD_CONFIG);
+     if(value == null || value.toString().isEmpty()){
+         value = DEFAULT_INDEX_FIELD_CONFIG_FILE_NAME;
+     }
+     final File indexFieldConfig = new File(configDir,value.toString());
+     if(!indexFieldConfig.isFile()){
+         throw new IllegalArgumentException("Invalid Indexing Configuration: " +
+             "IndexFieldConfiguration '"+indexFieldConfig+"' not found. " +
+             "Provide the missing file or use the '"+KEY_INDEX_FIELD_CONFIG+
+             "' in the '"+indexingConfigFile+"' to configure a different one!");
+     }
+     //read all lines up front so that the stream can be closed reliably,
+     //regardless of how the parser consumes the iterator
+     List<String> fieldConfigLines = new ArrayList<String>();
+     FileInputStream in = null;
+     try {
+         in = new FileInputStream(indexFieldConfig);
+         LineIterator it = IOUtils.lineIterator(in, "UTF-8");
+         while(it.hasNext()){
+             fieldConfigLines.add(it.nextLine());
+         }
+     } catch (IOException e) {
+         throw new IllegalStateException("Unable to read Index Field Configuration form '"
+             +indexFieldConfig+"'!",e);
+     } finally {
+         IOUtils.closeQuietly(in);
+     }
+     this.fieldMappings = FieldMappingUtils.parseFieldMappings(fieldConfigLines.iterator());
+ }
+
+ /**
+  * Loads a key=value configuration from the parsed file and returns it as
+  * Map.<p>
+  * An own parser is used instead of {@link Properties}, because the file is
+  * read as UTF-8 and some configurations might want to use URLs as keys.
+  * Lines starting with '#' or '!' are comments. Empty lines and lines
+  * without a key are ignored. Keys are trimmed; a key without a value is
+  * mapped to <code>null</code>.
+  * @param configFile the file
+  * @param required if <code>true</code> an {@link IllegalArgumentException}
+  * will be thrown if the config was not present otherwise an empty map will
+  * be returned
+  * @return The configuration as Map
+  * @throws IllegalArgumentException if the file is required but not found
+  * @throws IllegalStateException if the file is required but can not be read
+  */
+ private Map<String,Object> loadConfig(File configFile, boolean required) {
+     Map<String,Object> configMap = new HashMap<String,Object>();
+     FileInputStream in = null;
+     try {
+         in = new FileInputStream(configFile);
+         LineIterator lines = IOUtils.lineIterator(in, "UTF-8");
+         while(lines.hasNext()){
+             String line = lines.nextLine();
+             int indexOfEquals = line.indexOf('=');
+             String key = indexOfEquals > 0 ?
+                 line.substring(0,indexOfEquals).trim() :
+                 line.trim();
+             //skip empty/blank lines, lines without a key and comments; the
+             //isEmpty() check protects charAt(0) for whitespace only lines
+             if(key.isEmpty() || key.charAt(0) == '#' || key.charAt(0) == '!'){
+                 continue;
+             }
+             String value = null;
+             if(indexOfEquals > 0 && indexOfEquals < line.length()-1){
+                 value = line.substring(indexOfEquals+1);
+             }
+             configMap.put(key,value);
+         }
+     } catch (FileNotFoundException e) {
+         if(required){
+             throw new IllegalArgumentException(
+                 "Unable to find configuration file '"+
+                 configFile.getAbsolutePath()+"'!");
+         }
+     } catch (IOException e) {
+         if(required){
+             throw new IllegalStateException(
+                 "Unable to read configuration file '"+
+                 configFile.getAbsolutePath()+"'!",e);
+         }
+     } finally {
+         //the stream was never closed before
+         IOUtils.closeQuietly(in);
+     }
+     return configMap;
+ }
+ /**
+ * Getter for the root folder used for the Indexing
+ * @return the root folder (containing the config, resources, destination and dist folders)
+ */
+ public final File getRootFolder() {
+ return rootDir;
+ }
+
+ /**
+ * The root folder for the configuration. Guaranteed to exist, because the
+ * constructor fails if it is not a directory.
+ * @return the root folder for the configuration
+ */
+ public final File getConfigFolder() {
+ return configDir;
+ }
+
+ /**
+ * The root folder containing the resources used as input for the
+ * indexing process. Might not exist if no resources are available (the
+ * constructor only logs a missing resource folder).
+ * @return the root folder for the resources
+ */
+ public final File getSourceFolder() {
+ return sourceDir;
+ }
+
+ /**
+ * The root folder containing the files created by the indexing process.
+ * Guaranteed to exist, because the constructor creates it if missing.
+ * @return the destination folder
+ */
+ public final File getDestinationFolder() {
+ return destinationDir;
+ }
+ /**
+ * The root folder for the distribution. Guaranteed to exist, because the
+ * constructor creates it if missing.
+ * @return the distribution folder
+ */
+ public final File getDistributionFolder() {
+ return distributionDir;
+ }
+ /**
+ * Getter for the name as configured by the {@link IndexingConstants#KEY_NAME}
+ * by the main indexing configuration. The constructor rejects
+ * configurations with a missing or empty name.
+ * @return the name of this data source to index
+ */
+ public String getName() {
+ return name;
+ }
+ /**
+  * Looks up the description stored under
+  * {@link IndexingConstants#KEY_DESCRIPTION} in the main indexing
+  * configuration.
+  * @return the description of the data source to index or <code>null</code>
+  * if the configuration does not provide one
+  */
+ public String getDescription(){
+     Object description = configuration.get(KEY_DESCRIPTION);
+     if(description == null){
+         return null;
+     }
+     return description.toString();
+ }
+ /**
+ * The {@link ScoreNormaliser} as configured by the {@link IndexingConstants#KEY_SCORE_NORMALIZER}
+ * by the main indexing configuration. The normaliser is created on the
+ * first call and reused by later calls.
+ * @return the configured {@link ScoreNormaliser} or a {@link DefaultNormaliser} if
+ * this configuration is missing.
+ */
+ public ScoreNormaliser getNormaliser(){
+ // lazy initialisation: only create the normaliser (chain) when first requested
+ if(scoreNormaliser == null){
+ initNormaliser();
+ }
+ return scoreNormaliser;
+ }
+ /**
+  * The {@link EntityDataIterable} as configured by the {@link IndexingConstants#KEY_ENTITY_DATA_ITERABLE}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link EntityDataIterable} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link EntityDataIterable}
+  */
+ public EntityDataIterable getDataInterable(){
+     if(entityDataIterable != null){
+         return entityDataIterable;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_ENTITY_DATA_ITERABLE);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     EntityDataIterable instance;
+     try {
+         instance = (EntityDataIterable)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid EntityDataIterable configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     entityDataIterable = instance;
+     return instance;
+ }
+ /**
+  * The {@link EntityIterator} as configured by the {@link IndexingConstants#KEY_ENTITY_ID_ITERATPR}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link EntityIterator} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link EntityIterator}
+  */
+ public EntityIterator getEntityIdIterator() {
+     if(entityIdIterator != null){
+         return entityIdIterator;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_ENTITY_ID_ITERATPR);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     EntityIterator instance;
+     try {
+         instance = (EntityIterator)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid EntityIterator configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     entityIdIterator = instance;
+     return instance;
+ }
+ /**
+  * The {@link EntityDataProvider} as configured by the {@link IndexingConstants#KEY_ENTITY_DATA_PROVIDER}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link EntityDataProvider} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link EntityDataProvider}
+  */
+ public EntityDataProvider getEntityDataProvider() {
+     if(entityDataProvider != null){
+         return entityDataProvider;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_ENTITY_DATA_PROVIDER);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     EntityDataProvider instance;
+     try {
+         instance = (EntityDataProvider)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid EntityDataProvider configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     entityDataProvider = instance;
+     return instance;
+ }
+ /**
+  * The {@link EntityScoreProvider} as configured by the {@link IndexingConstants#KEY_ENTITY_SCORE_PROVIDER}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link EntityScoreProvider} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link EntityScoreProvider}
+  */
+ public EntityScoreProvider getEntityScoreProvider() {
+     if(entityScoreProvider != null){
+         return entityScoreProvider;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_ENTITY_SCORE_PROVIDER);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     EntityScoreProvider instance;
+     try {
+         instance = (EntityScoreProvider)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid EntityScoreProvider configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     entityScoreProvider = instance;
+     return instance;
+ }
+ /**
+ * The fields and languages included/excluded in the created index.<p>
+ * NOTE: Currently this uses the {@link FieldMapping} class that was initially
+ * defined to be used as configuration for the {@code FieldMapper}. In
+ * future this might change to an Interface that is more tailored to
+ * defining the fields and languages included/excluded in the index and does
+ * not allow to define mappings and data type conversions as the current one does.
+ * @return the field mappings parsed by the constructor from the index
+ * field configuration file
+ */
+ public Collection<FieldMapping> getIndexFieldConfiguration(){
+ return fieldMappings;
+ }
+ /**
+  * The {@link EntityProcessor} as configured by the {@link IndexingConstants#KEY_ENTITY_PROCESSOR}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link EntityProcessor} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link EntityProcessor}
+  */
+ public EntityProcessor getEntityProcessor() {
+     if(entityProcessor != null){
+         return entityProcessor;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_ENTITY_PROCESSOR);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     EntityProcessor instance;
+     try {
+         instance = (EntityProcessor)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid EntityProcessor configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     entityProcessor = instance;
+     return instance;
+ }
+ /**
+  * The {@link IndexingDestination} as configured by the {@link IndexingConstants#KEY_INDEXING_DESTINATION}
+  * by the main indexing configuration. The component is instantiated and
+  * configured on the first call and cached for later calls.
+  * @return the configured {@link IndexingDestination} or <code>null</code> if
+  * this configuration is not present (or has no value).
+  * @throws IllegalArgumentException if the configured class can not be
+  * instantiated as {@link IndexingDestination}
+  */
+ public IndexingDestination getIndexingDestination() {
+     if(indexingDestination != null){
+         return indexingDestination;
+     }
+     //a key without a value is stored as null by the configuration parser,
+     //so the value (and not only the presence of the key) needs to be checked
+     Object value = configuration.get(KEY_INDEXING_DESTINATION);
+     if(value == null){
+         return null;
+     }
+     ConfigEntry config = parseConfigEntry(value.toString());
+     IndexingDestination instance;
+     try {
+         instance = (IndexingDestination)Class.forName(config.getClassName()).newInstance();
+     } catch (Exception e) {
+         throw new IllegalArgumentException("Invalid IndexingDestination configuration '"+config.getConfigString()+"'!",e);
+     }
+     //add the configuration
+     Map<String,Object> configMap = getComponentConfig(config, instance.getClass().getSimpleName(), false);
+     //add also the directly provided parameters
+     configMap.putAll(config.getParams());
+     instance.setConfiguration(configMap);
+     //cache the component only after it was configured successfully
+     indexingDestination = instance;
+     return instance;
+ }
+
+ /**
+  * Initialises {@link #scoreNormaliser} based on the value of
+  * {@link IndexingConstants#KEY_SCORE_NORMALIZER}. Without a configured
+  * value a {@link DefaultNormaliser} is used. Otherwise the ';' separated
+  * entries are instantiated back to front, so that each normaliser gets
+  * the one configured after it as
+  * {@link ScoreNormaliser#CHAINED_SCORE_NORMALISER}.
+  */
+ private void initNormaliser() {
+     Object configured = configuration.get(IndexingConstants.KEY_SCORE_NORMALIZER);
+     if(configured == null){
+         this.scoreNormaliser = new DefaultNormaliser();
+         return;
+     }
+     List<ConfigEntry> entries = parseConfigEntries(configured.toString());
+     ScoreNormaliser head = null;
+     for(int pos = entries.size()-1; pos >= 0; pos--){
+         ConfigEntry entry = entries.get(pos);
+         //the normaliser created by the previous iteration follows this one
+         ScoreNormaliser successor = head;
+         ScoreNormaliser current;
+         try {
+             current = (ScoreNormaliser)Class.forName(entry.getClassName()).newInstance();
+         } catch (Exception e) {
+             throw new IllegalArgumentException("Invalid Normaliser configuration '"+entry.getConfigString()+"'!",e);
+         }
+         //an explicitly referenced configuration file is required to exist
+         boolean configRequired = entry.getParams().containsKey(CONFIG_PARAM);
+         Map<String,Object> settings = getComponentConfig(entry,current.getClass().getSimpleName(),configRequired);
+         //the directly provided parameters are added on top
+         settings.putAll(entry.getParams());
+         if(successor != null){
+             settings.put(ScoreNormaliser.CHAINED_SCORE_NORMALISER, successor);
+         }
+         current.setConfiguration(settings);
+         head = current;
+     }
+     //the first configured normaliser is the entry point of the chain
+     this.scoreNormaliser = head;
+ }
+
+ /**
+  * Loads the configuration of a component from the config folder. The
+  * name of the configuration is taken from the {@link #CONFIG_PARAM}
+  * parameter of the parsed {@link ConfigEntry}; the default name is used
+  * if this parameter is not present. The loaded map additionally contains
+  * this {@link IndexingConfig} under the key {@link #KEY_INDEXING_CONFIG}.
+  * @param configEntry the parsed configuration entry of the component
+  * @param defaultName the name used if the entry does not define one
+  * @param required if the configuration file needs to be present
+  * @return the (modifiable) configuration for the component
+  */
+ private Map<String,Object> getComponentConfig(ConfigEntry configEntry,String defaultName, boolean required) {
+     String configured = configEntry.getParams().get(CONFIG_PARAM);
+     String configName = configured != null ? configured : defaultName;
+     Map<String,Object> componentConfig = loadConfig(configName, configDir, required);
+     //components use this key to get access to (this) indexing config
+     componentConfig.put(KEY_INDEXING_CONFIG, this);
+     return componentConfig;
+ }
+
+ /**
+  * Loads the config with the given name from the parsed directory and throwing
+  * an {@link IllegalArgumentException} if the configuration is required but
+  * not found
+  * @param name the name (".properties" is appended if missing). If
+  * <code>null</code> an empty map is returned for optional configurations
+  * @param configDir the directory to look for the config
+  * @param required if this config is required or optional
+  * @return the key value mappings as map
+  * @throws IllegalArgumentException if the config is required and the name
+  * is <code>null</code> or the file is not found
+  */
+ private Map<String,Object> loadConfig(String name, File configDir, boolean required) {
+     //the name needs to be checked before it is used to build the file
+     //name; otherwise a null name fails with a NullPointerException
+     if(name == null){
+         if(required){
+             throw new IllegalArgumentException("Missing required parameter'"+
+                 CONFIG_PARAM+"' Syntax: '"+SYNTAX_ERROR_MESSAGE +"'!");
+         } else {
+             return new HashMap<String,Object>();
+         }
+     }
+     String fileName = name.endsWith(".properties") ? name : name+".properties";
+     return loadConfig(new File(configDir,fileName),required);
+ }
+
+ /**
+ * Parses a single component configuration
+ * ("{class},{param1}:{value1},{param2}:{value2}") by creating a
+ * {@link ConfigEntry} for the parsed string.
+ * @param config the configuration string of a single component
+ * @return the parsed entry
+ */
+ private ConfigEntry parseConfigEntry(String config){
+ return new ConfigEntry(config);
+ }
+ /**
+  * Splits the parsed value at ';' and parses each part as a
+  * {@link ConfigEntry}, keeping the order of the parts.
+  * @param config the ';' separated component configurations
+  * @return the parsed entries
+  */
+ private List<ConfigEntry> parseConfigEntries(String config){
+     String[] parts = config.split(";");
+     List<ConfigEntry> entries = new ArrayList<ConfigEntry>(parts.length);
+     for(int i = 0; i < parts.length; i++){
+         entries.add(parseConfigEntry(parts[i]));
+     }
+     return entries;
+ }
+ private class ConfigEntry {
+ private String configString;
+ private String className;
+ private Map<String,String> params;
+
+ private ConfigEntry(String config){
+ configString = config;
+ String[] parts = config.split(",");
+ className = parts[0];
+ params = new HashMap<String,String>();
+ if(parts.length>1){
+ for(int i=1;i<parts.length;i++){
+ String[] param = parts[i].split(":"); //TODO: maybe use also "=" there
+ String value = null;
+ if(param.length>1){
+ value = parts[i].substring(parts[i].indexOf(':')+1);
+ }
+ params.put(param[0], value);
+ }
+ }
+ }
+ public final String getConfigString() {
+ return configString;
+ }
+ public final String getClassName() {
+ return className;
+ }
+ public final Map<String,String> getParams() {
+ return params;
+ }
+ }
+ /**
+ * Can be used to look for a config within the configuration directory
+ * of the {@link IndexingConfig}.
+ * @param name the name of the configuration (".properties" is appended if
+ * missing)
+ * @param required if this is an required or optional configuration.
+ * @return the key value mappings as map
+ * @throws IllegalArgumentException if the configuration was not found and
+ * <code>true</code> was parsed for required
+ */
+ public Map<String,Object> getConfig(String name,boolean required) throws IllegalArgumentException {
+ return loadConfig(name, configDir, required);
+ }
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConfig.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,30 @@
+package org.apache.stanbol.entityhub.indexing.core.config;
+
+import java.io.File;
+
+/**
+ * Constants defined/used for Indexing. The keys are looked up by
+ * {@link IndexingConfig} in the main indexing configuration.
+ * @author Rupert Westenthaler
+ *
+ */
+public interface IndexingConstants {
+
+ /** Key for the (required, not empty) name of the data source to index */
+ String KEY_NAME = "name";
+ /** Key for the optional description of the data source to index */
+ String KEY_DESCRIPTION = "description";
+ /** Key for the class used as EntityDataIterable */
+ String KEY_ENTITY_DATA_ITERABLE = "entityDataIterable";
+ /** Key for the class used as EntityDataProvider */
+ String KEY_ENTITY_DATA_PROVIDER = "entityDataProvider";
+ /** Key for the class used as EntityIterator (NOTE the typo in the constant name) */
+ String KEY_ENTITY_ID_ITERATPR = "entityIdIterator";
+ /** Key for the class used as EntityScoreProvider */
+ String KEY_ENTITY_SCORE_PROVIDER = "entityScoreProvider";
+ /** Key for the class used as IndexingDestination */
+ String KEY_INDEXING_DESTINATION = "indexingDestination";
+ /**
+ * Key for the name of the file (within the config folder) that defines
+ * the fields/languages included/excluded in the index
+ */
+ String KEY_INDEX_FIELD_CONFIG = "fieldConfiguration";
+ /**
+ * usage:<br>
+ * <pre>
+ * {class1},config:{name1};{class2},config:{name2};...
+ * </pre>
+ * The class implementing the normaliser and the name of the configuration
+ * file. {@link IndexingConfig} loads the file {name}.properties from the
+ * config folder and uses the simple name of the class if no "config"
+ * parameter is present.
+ */
+ String KEY_SCORE_NORMALIZER = "scoreNormalizer";
+ /** Key for the class used as EntityProcessor */
+ String KEY_ENTITY_PROCESSOR = "entityProcessor";
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/config/IndexingConstants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,22 @@
+package org.apache.stanbol.entityhub.indexing.core.event;
+
+import java.util.EventObject;
+
+import org.apache.stanbol.entityhub.indexing.core.impl.IndexerImpl;
+
+/**
+ * Event passed to {@link IndexingListener}s. The source of the event is
+ * always an {@link IndexerImpl}.
+ */
+public class IndexingEvent extends EventObject {
+
+ private static final long serialVersionUID = 1L;
+ /**
+ * Creates an event for the parsed indexer
+ * @param source the indexer this event is about
+ */
+ public IndexingEvent(IndexerImpl source) {
+ super(source);
+ }
+
+ /**
+ * Narrows the return type to {@link IndexerImpl}. The cast is safe because
+ * the only constructor requires an {@link IndexerImpl} as source.
+ * @return the indexer this event is about
+ */
+ @Override
+ public IndexerImpl getSource() {
+ return (IndexerImpl)super.getSource();
+ }
+
+
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingEvent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,7 @@
+package org.apache.stanbol.entityhub.indexing.core.event;
+
+/**
+ * Listener interface for {@link IndexingEvent}s.
+ * NOTE(review): the exact conditions that trigger the callbacks are defined
+ * by the indexer implementation and are not visible here - confirm there.
+ */
+public interface IndexingListener {
+
+ /**
+ * Presumably called when the state of the indexer changes (TODO confirm)
+ * @param event the event providing access to the indexer
+ */
+ void stateChanged(IndexingEvent event);
+ /**
+ * Presumably called when the indexing has completed (TODO confirm)
+ * @param event the event providing access to the indexer
+ */
+ void indexingCompleted(IndexingEvent event);
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/event/IndexingListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,56 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SCORE_FIELD;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+public abstract class AbstractEntityIndexingDaemon extends IndexingDaemon<Object,Representation> {
+
+ private final ScoreNormaliser normaliser;
+
+ protected AbstractEntityIndexingDaemon(String name,
+ ScoreNormaliser scoreNormaliser,
+ BlockingQueue<QueueItem<Representation>> produce,
+ BlockingQueue<QueueItem<IndexingError>> error) {
+ super(name,
+ IndexerConstants.SEQUENCE_NUMBER_SOURCE_DAEMON,
+ null,produce, error);
+ this.normaliser = scoreNormaliser;
+ }
+ /**
+ * Used to produce Representations by both variants of EnityIndexingDeamons
+ * @param rep the {@link Representation} extracted from the
+ * {@link IndexingComponent}s
+ * @param score The score for the Representation
+ */
+ protected final void produce(Representation rep,Float score,Long started) {
+ if(rep == null){
+ return;
+ }
+ //normalise the score if both score and a normaliser are present
+ if(score != null && normaliser != null){
+ score = normaliser.normalise(score);
+ }
+ //first set the score of the representation
+ QueueItem<Representation> item = new QueueItem<Representation>(rep);
+ //set the score as additional property to the QueueItem, because
+ //it needs to be added to the Representation after the processing completes
+ if(score != null && score.compareTo(ScoreNormaliser.ZERO) >= 0){
+ item.setProperty(SCORE_FIELD, score);
+ }
+ item.setProperty(SOURCE_STARTED, started);
+ Long completed = Long.valueOf(System.currentTimeMillis());
+ item.setProperty(SOURCE_COMPLETE, completed);
+ Float duration = Float.valueOf((float)(completed.longValue()-started.longValue()));
+ item.setProperty(SOURCE_DURATION, duration);
+ produce(item);
+ }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/AbstractEntityIndexingDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,67 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterable;
+import org.apache.stanbol.entityhub.indexing.core.EntityDataIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityScoreProvider;
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Daemon that extracts Entities from the {@link IndexingComponent}s by iterating
+ * over the entity data and looking up the score for each entity.
+ * <p>
+ * If the {@link EntityScoreProvider} does not need the entity data, the
+ * {@link Representation} is only requested from the iterator for entities
+ * that are actually indexed.
+ * @author Rupert Westenthaler
+ */
+public class EntityDataBasedIndexingDaemon extends AbstractEntityIndexingDaemon {
+    private final EntityDataIterable dataIterable;
+    private final EntityScoreProvider scoreProvider;
+    /** If <code>true</code> entities are indexed regardless of their score */
+    private final boolean indexAllEntitiesState;
+    /**
+     * Creates the daemon.
+     * @param name the name passed to the super class
+     * @param produce the queue passed to the super class for produced entities
+     * @param error the queue passed to the super class for errors
+     * @param dataIterable used to iterate over the entity data
+     * @param scoreProvider used to look up the score for each entity
+     * @param normaliser the score normaliser passed to the super class
+     * @param indexAllEntitiesState if <code>true</code> also entities with a
+     * score &lt; 0 are indexed
+     * @throws IllegalArgumentException if the parsed data iterable or the
+     * parsed score provider is <code>null</code>
+     */
+    public EntityDataBasedIndexingDaemon(String name,
+                                         BlockingQueue<QueueItem<Representation>> produce,
+                                         BlockingQueue<QueueItem<IndexingError>> error,
+                                         EntityDataIterable dataIterable,
+                                         EntityScoreProvider scoreProvider,
+                                         ScoreNormaliser normaliser,
+                                         boolean indexAllEntitiesState) {
+        super(name, normaliser,produce, error);
+        if(dataIterable == null){
+            //the message names the type that is actually validated here
+            throw new IllegalArgumentException("The parsed EntityDataIterable MUST NOT be NULL");
+        }
+        if(scoreProvider == null){
+            throw new IllegalArgumentException("The parsed EntityScoreProvider MUST NOT be NULL");
+        }
+        this.dataIterable = dataIterable;
+        this.scoreProvider = scoreProvider;
+        this.indexAllEntitiesState = indexAllEntitiesState;
+    }
+
+    /**
+     * Iterates over all entities of the data iterable, determines the score
+     * of each entity and produces those that are accepted for indexing.
+     */
+    @Override
+    public void run() {
+        log.info("...start iterating over Entity data");
+        EntityDataIterator dataIterator = dataIterable.entityDataIterator();
+        while(dataIterator.hasNext()){
+            Long start = Long.valueOf(System.currentTimeMillis());
+            String id = dataIterator.next();
+            Representation rep = null;
+            Float score;
+            if(!scoreProvider.needsData()){
+                //the id is sufficient: the data are loaded lazily below
+                score = scoreProvider.process(id);
+            } else {
+                rep = dataIterator.getRepresentation();
+                score = scoreProvider.process(rep);
+            }
+            if(indexAllEntitiesState || //all entities are indexed anyway
+                    score == null || //no score available
+                    score.compareTo(ScoreNormaliser.ZERO) >= 0){ //score >= 0
+                if(rep == null){
+                    //not yet loaded, or the iterator returned no data
+                    rep = dataIterator.getRepresentation();
+                }
+                produce(rep,score,start);
+            } // else ignore this entity
+        }
+        setFinished();
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityDataBasedIndexingDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,31 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.slf4j.Logger;
+
+public class EntityErrorLoggerDaemon extends IndexingDaemon<IndexingError,Object> {
+
+ private final Logger err;
+ public EntityErrorLoggerDaemon(BlockingQueue<QueueItem<IndexingError>> consume,
+ Logger err) {
+ super("Indexer: Entity Error Logging Daemon",
+ IndexerConstants.SEQUENCE_NUMBER_ERROR_HANDLING_DAEMON,
+ consume, null, null);
+ this.err = err;
+ }
+
+ @Override
+ public void run() {
+ while(!isQueueFinished()){
+ QueueItem<IndexingError> errorItem = consume();
+ if(errorItem != null){
+ IndexingError error = errorItem.getItem();
+ err.error(String.format("Error while indexing %s: %s",
+ error.getEntity(),error.getMessage()),error.getException());
+ }
+ }
+ setFinished();
+ }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityErrorLoggerDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,51 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityDataProvider;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator;
+import org.apache.stanbol.entityhub.indexing.core.EntityIterator.EntityScore;
+import org.apache.stanbol.entityhub.indexing.core.normaliser.ScoreNormaliser;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Daemon that iterates over the ids and scores provided by an
+ * {@link EntityIterator} and requests the data of each accepted entity from
+ * an {@link EntityDataProvider}.
+ */
+public class EntityIdBasedIndexingDaemon extends AbstractEntityIndexingDaemon {
+
+    private final EntityIterator entityIdIterator;
+    private final EntityDataProvider dataProvider;
+    /** If <code>true</code> entities are indexed regardless of their score */
+    private final boolean indexAllEntitiesState;
+
+    /**
+     * Creates the daemon.
+     * @param name the name passed to the super class
+     * @param produce the queue passed to the super class for produced entities
+     * @param error the queue passed to the super class for errors
+     * @param entityIdIterator provides the ids and scores of the entities
+     * @param dataProvider provides the data for an entity id
+     * @param normaliser the score normaliser passed to the super class
+     * @param indexAllEntitiesState if <code>true</code> also entities with a
+     * score &lt; 0 are indexed
+     * @throws IllegalArgumentException if the parsed iterator or the parsed
+     * data provider is <code>null</code>
+     */
+    public EntityIdBasedIndexingDaemon(String name,
+                                       BlockingQueue<QueueItem<Representation>> produce,
+                                       BlockingQueue<QueueItem<IndexingError>> error,
+                                       EntityIterator entityIdIterator,
+                                       EntityDataProvider dataProvider,
+                                       ScoreNormaliser normaliser,
+                                       boolean indexAllEntitiesState) {
+        super(name, normaliser,produce, error);
+        if(entityIdIterator == null){
+            throw new IllegalArgumentException("The parsed EntityIterator MUST NOT be NULL");
+        }
+        if(dataProvider == null){
+            throw new IllegalArgumentException("The parsed EntityDataProvider MUST NOT be NULL");
+        }
+        this.entityIdIterator = entityIdIterator;
+        this.dataProvider = dataProvider;
+        this.indexAllEntitiesState = indexAllEntitiesState;
+    }
+
+    /**
+     * Produces the data of all accepted entities of the iterator.
+     */
+    @Override
+    public void run() {
+        while(entityIdIterator.hasNext()){
+            Long begin = Long.valueOf(System.currentTimeMillis());
+            EntityScore candidate = entityIdIterator.next();
+            if(!isIndexed(candidate)){
+                continue; //ignore this entity
+            }
+            Representation data = dataProvider.getEntityData(candidate.id);
+            produce(data,candidate.score,begin);
+        }
+        setFinished();
+    }
+
+    /**
+     * Checks if the parsed entity needs to be indexed.
+     * @param candidate the id and score of the entity
+     * @return <code>true</code> if all entities are indexed, no score is
+     * available or the score is &gt;= 0
+     */
+    private boolean isIndexed(EntityScore candidate) {
+        if(indexAllEntitiesState){ //all entities are indexed anyway
+            return true;
+        }
+        if(candidate.score == null){ //no score available
+            return true;
+        }
+        return candidate.score.compareTo(ScoreNormaliser.ZERO) >= 0;
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityIdBasedIndexingDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,105 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.apache.stanbol.entityhub.servicesapi.yard.Yard;
+import org.apache.stanbol.entityhub.servicesapi.yard.YardException;
+
+/**
+ * Daemon that collects the consumed {@link Representation}s in chunks and
+ * stores every chunk with a single call to the {@link Yard}. Stored entities
+ * are produced, entities that could not be stored are reported as errors.
+ * @author Rupert Westenthaler
+ */
+public class EntityPersisterRunnable extends IndexingDaemon<Representation,Representation> {
+
+    /** The number of collected entities that triggers a store operation */
+    private final int chunkSize;
+    /** The Yard the entities are stored to */
+    private final Yard yard;
+    /**
+     * Creates the daemon.
+     * @param name the name passed to the super class
+     * @param consume the queue with the entities to store
+     * @param produce the queue for the stored entities
+     * @param error the queue for the errors
+     * @param chunkSize the number of entities stored with a single call
+     * @param yard the Yard used to store the entities
+     */
+    public EntityPersisterRunnable(String name,
+                                   BlockingQueue<QueueItem<Representation>> consume,
+                                   BlockingQueue<QueueItem<Representation>> produce,
+                                   BlockingQueue<QueueItem<IndexingError>> error,
+                                   int chunkSize, Yard yard){
+        super(name,IndexerConstants.SEQUENCE_NUMBER_PERSIT_DAEMON,
+            consume,produce,error);
+        this.chunkSize = chunkSize;
+        this.yard = yard;
+    }
+    /**
+     * Collects consumed entities by id and stores them as soon as
+     * {@link #chunkSize} entities are available. The entities remaining after
+     * the consumed queue has finished are stored at the end.
+     */
+    @Override
+    public void run() {
+        Map<String,QueueItem<Representation>> toStore = new HashMap<String,QueueItem<Representation>>();
+        while(!isQueueFinished()){
+            QueueItem<Representation> item;
+            item = consume();
+            if(item != null){
+                if(item.getItem() != null){
+                    toStore.put(item.getItem().getId(),item);
+                }
+            }
+            if(toStore.size() >= chunkSize){
+                process(toStore);
+            }
+        }
+        //process the remaining
+        process(toStore);
+        setFinished();
+    }
+    /**
+     * Processes the items within the parsed Map: stores them, adds the store
+     * metadata to the stored items and produces them. Does nothing if the
+     * parsed map is empty.
+     * @param toStore the items to process. The map is empty when this method
+     * returns
+     */
+    private void process(Map<String,QueueItem<Representation>> toStore) {
+        if(toStore.isEmpty()){
+            //nothing to do: avoids a needless call to the Yard and a
+            //division by zero when calculating the duration per element
+            return;
+        }
+        //keep the number of elements because store(..) will remove them!
+        int elements = toStore.size();
+        Long start = Long.valueOf(System.currentTimeMillis());
+        Collection<QueueItem<Representation>> stored = store(toStore);
+        Long completed = Long.valueOf(System.currentTimeMillis());
+        //the duration of the chunk is split up equally to all its elements
+        Float duration = Float.valueOf(((float)(completed.longValue()-start.longValue()))/elements);
+        for(QueueItem<Representation> storedItem : stored){
+            storedItem.setProperty(STORE_COMPLETE, completed);
+            storedItem.setProperty(STORE_DURATION, duration);
+            produce(storedItem);
+        }
+    }
+    /**
+     * Stores the parsed Representations to the {@link #yard} and sends an
+     * error (including the {@link YardException}, if one was thrown) for all
+     * entities that could not be stored!
+     * @param toStore the Representations to store. This method removes all
+     * Elements of this map while doing the work
+     * @return the items for the Representations returned by the Yard
+     */
+    private Set<QueueItem<Representation>> store(Map<String,QueueItem<Representation>> toStore) {
+        String errorMsg;
+        YardException yardException = null;
+        Set<QueueItem<Representation>> stored = new HashSet<QueueItem<Representation>>();
+        Collection<Representation> reps = new ArrayList<Representation>(toStore.size());
+        for(QueueItem<Representation> item:toStore.values()){
+            reps.add(item.getItem());
+        }
+        try {
+            for(Representation r : yard.store(reps)){
+                QueueItem<Representation> old = toStore.remove(r.getId());
+                //create a new QueueItem and copy the metadata of the old one
+                stored.add(new QueueItem<Representation>(r,old));
+            }
+            errorMsg = "Entity %s was not indexed by the Yard %s";
+        } catch (YardException e) {
+            errorMsg = "Unable to store Entity %s to Yard %s because of an YardException";
+            //keep the cause so that it is reported with every failed entity
+            yardException = e;
+        }
+        //the remaining Items in to store have some errors
+        for(QueueItem<Representation> entry : toStore.values()){
+            sendError(entry.getItem().getId(),entry,
+                String.format(errorMsg,entry.getItem().getId(),yard.getId()),
+                yardException);
+        }
+        toStore.clear(); //clear the map so that it can be reused for the next chunk
+        return stored;
+    }
+
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityPersisterRunnable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,70 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_COMPLETE;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_STARTED;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.indexing.core.EntityProcessor;
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+
+/**
+ * Consumes Representations as created by the IndexingSource and processes
+ * it by using the configured {@link EntityProcessor}. In addition this
+ * components adds configured keys to the Representation.
+ * @author Rupert Westenthaler
+ *
+ */
+public class EntityProcessorRunnable extends IndexingDaemon<Representation,Representation> {
+ private final EntityProcessor processor;
+ private final Set<String> keys;
+ public EntityProcessorRunnable(String name,
+ BlockingQueue<QueueItem<Representation>> consume,
+ BlockingQueue<QueueItem<Representation>> produce,
+ BlockingQueue<QueueItem<IndexingError>> error,
+ EntityProcessor processor,Set<String> keys) {
+ super(name,IndexerConstants.SEQUENCE_NUMBER_PROCESSOR_DAEMON,
+ consume,produce,error);
+ this.processor = processor;
+ if(keys == null){
+ this.keys = Collections.emptySet();
+ } else {
+ this.keys = keys;
+ }
+ }
+ @Override
+ public void run() {
+ while(!isQueueFinished()){
+ QueueItem<Representation> item = consume();
+ if(item != null){
+ Long start = Long.valueOf(System.currentTimeMillis());
+ item.setProperty(PROCESS_STARTED, start);
+ Representation processed = processor.process(item.getItem());
+ if(processed == null){
+ sendError(item.getItem().getId(),item,
+ String.format("Processor %s returned NULL for Entity %s",
+ processor,item.getItem().getId()), null);
+ } else {
+ for(String key : keys){
+ //consume the property and add it to the
+ //transformed representation
+ Object value = item.removeProperty(key);
+ if(value != null){
+ processed.add(key, value);
+ }
+ }
+ QueueItem<Representation> produced = new QueueItem<Representation>(processed,item);
+ Long completed = Long.valueOf(System.currentTimeMillis());
+ produced.setProperty(PROCESS_COMPLETE, completed);
+ produced.setProperty(PROCESS_DURATION, Float.valueOf(
+ (float)(completed.longValue()-start.longValue())));
+ produce(produced);
+ }
+ }
+ }
+ setFinished();
+ }
+}
\ No newline at end of file
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/EntityProcessorRunnable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,199 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.PROCESS_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_DURATION;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.SOURCE_STARTED;
+import static org.apache.stanbol.entityhub.indexing.core.impl.IndexerConstants.STORE_DURATION;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stanbol.entityhub.servicesapi.model.Representation;
+import org.slf4j.Logger;
+
+/**
+ * Daemon that consumes the finished entities and logs statistics about the
+ * indexing process: a short entry after every minor interval, a detailed
+ * entry after every major interval and a summary as soon as the consumed
+ * queue is finished. Durations are averaged over the items that provide all
+ * timing metadata.
+ */
+public class FinishedEntityDaemon extends IndexingDaemon<Representation,Object> {
+
+    private static final int DEFAULT_MAJOR_INTERVAL = 100000;
+    /**
+     * For now use an logger as output!
+     */
+    private final Logger out;
+
+    /** Number of items between two detailed log entries */
+    private final int major;
+    /** Number of items between two short log entries (always &gt;= 1) */
+    private final int minor;
+    private double sourceDurationAll;
+    private double processDurationAll;
+    private double storeDurationAll;
+    private double durationAll;
+    private double sourceDurationMajor;
+    private double processDurationMajor;
+    private double storeDurationMajor;
+    private double durationMajor;
+    private double durationMinor;
+    private long start;
+    private long startMajor;
+    private long startMinor;
+
+    private double timeAll;
+    private double timeMajor;
+    private double timeMinor;
+
+    /** Number of all consumed items */
+    private long count;
+    /** Number of items with valid statistics */
+    private long countedAll;
+    private long countedMajor;
+    private long countedMinor;
+
+
+    /**
+     * Creates the daemon.
+     * @param consume the queue with the finished entities
+     * @param majorInterval the number of items between two detailed log
+     * entries. Values &lt;= 0 select the default (100000). The minor
+     * interval is a tenth of this value, but at least 1
+     * @param out the logger used to write the statistics
+     */
+    public FinishedEntityDaemon(BlockingQueue<QueueItem<Representation>> consume,
+                                int majorInterval,
+                                Logger out) {
+        super("Indexing: Finished Entity Logger Deamon",
+            IndexerConstants.SEQUENCE_NUMBER_FINISHED_DAEMON,
+            consume, null, null);
+        this.out = out;
+        if(majorInterval > 0){
+            this.major = majorInterval;
+        } else {
+            this.major = DEFAULT_MAJOR_INTERVAL;
+        }
+        //at least 1, because count%minor would otherwise throw an
+        //ArithmeticException for major intervals < 10
+        this.minor = Math.max(1, major/10);
+    }
+
+    /**
+     * Collects the timing metadata of the consumed items and prints the
+     * statistics.
+     */
+    @Override
+    public void run() {
+        count = 0; //Elements indexed
+        //Elements with valid statistics
+        countedAll = 0;
+        countedMajor = 0;
+        countedMinor = 0;
+        long current = System.currentTimeMillis();
+        //default that keeps the interval of the summary meaningful if no
+        //item is consumed at all
+        start = current;
+        while(!isQueueFinished()){
+            QueueItem<Representation> item = consume();
+            if(item != null){
+                current = System.currentTimeMillis();
+                if(count == 0){
+                    start = current; //default for the start!
+                }
+                if(startMajor == 0){
+                    startMajor = current;
+                }
+                if(startMinor == 0){
+                    startMinor = current;
+                }
+                count++;
+                try {
+                    long startSource = ((Long)item.getProperty(SOURCE_STARTED)).longValue();
+                    if(count < minor){ //for the first few item
+                        //try to get the correct start time for the indexing!
+                        if(startSource < start){
+                            start = startSource;
+                        }
+                    }
+                    //read all values before updating any sum, so that items
+                    //with missing metadata do not change the statistics
+                    float sourceDuration = ((Float)item.getProperty(SOURCE_DURATION)).floatValue();
+                    float processDuration = ((Float)item.getProperty(PROCESS_DURATION)).floatValue();
+                    float storeDuration = ((Float)item.getProperty(STORE_DURATION)).floatValue();
+                    sourceDurationAll+=sourceDuration;
+                    sourceDurationMajor+=sourceDuration;
+                    processDurationAll+=processDuration;
+                    processDurationMajor+=processDuration;
+                    storeDurationAll+=storeDuration;
+                    storeDurationMajor+=storeDuration;
+                    double duration = sourceDuration+processDuration+storeDuration;
+                    durationAll+=duration;
+                    durationMajor+=duration;
+                    durationMinor+=duration;
+
+                    long time = current-startSource;
+                    timeAll+=time;
+                    timeMajor+=time;
+                    timeMinor+=time;
+                    countedAll++;
+                    countedMajor++;
+                    countedMinor++;
+                }catch(Exception e){
+                    //ignore NullpointerExceptions that will be thrown on missing
+                    //metadata!
+                }
+                if(count%major == 0){
+                    printMajor(current);
+                    resetMajor();
+                    //reset also minor
+                    resetMinor();
+                } else if(count%minor == 0){
+                    printMinor(current);
+                    resetMinor();
+                }
+
+            }
+        }
+        printSummary(current);
+        setFinished();
+    }
+
+    /**
+     * Resets the statistics collected for the current major interval.
+     */
+    private void resetMajor() {
+        sourceDurationMajor = 0;
+        processDurationMajor = 0;
+        storeDurationMajor = 0;
+        durationMajor = 0;
+        timeMajor = 0;
+        countedMajor = 0;
+        startMajor = 0;
+    }
+
+    /**
+     * Resets the statistics collected for the current minor interval.
+     */
+    private void resetMinor() {
+        durationMinor = 0;
+        timeMinor = 0;
+        countedMinor = 0;
+        startMinor = 0;
+    }
+
+    /**
+     * Calculates the average value per counted item.
+     * @param sum the sum of the values
+     * @param counted the number of items that contributed to the sum
+     * @return the average or <code>-1</code> if no item was counted
+     */
+    private static double average(double sum, long counted) {
+        return counted > 0 ? sum/counted : -1;
+    }
+
+    /**
+     * Prints the short statistics for the current minor interval.
+     * @param current the time the last item was consumed
+     */
+    private void printMinor(long current) {
+        long interval = current-start;
+        long intervalMinor = current-startMinor;
+        double itemDurationMinor = average(durationMinor, countedMinor);
+        double itemTimeMinor = average(timeMinor, countedMinor);
+        out.info(String.format("    - %d items in %fsec (last %d in %7.3fsec | %7.3fms/item | %7.3fms in queue)",
+            count,(float)interval/1000f,minor,(float)intervalMinor/1000f,itemDurationMinor,itemTimeMinor));
+    }
+
+    /**
+     * Prints the detailed statistics for the current major interval.
+     * @param current the time the last item was consumed
+     */
+    private void printMajor(long current) {
+        long interval = current-start;
+        long intervalMajor = current-startMajor;
+        double itemDurationAll = average(durationAll, countedAll);
+        //uses the sum of the major interval (and not the one of the minor)
+        double itemDurationMajor = average(durationMajor, countedMajor);
+        double itemTimeAll = average(timeAll, countedAll);
+        double itemTimeMajor = average(timeMajor, countedMajor);
+
+        double itemSourceDurationAll = average(sourceDurationAll, countedAll);
+        double itemProcessingDurationAll = average(processDurationAll, countedAll);
+        double itemStoreDurationAll = average(storeDurationAll, countedAll);
+
+        double itemSourceDurationMajor = average(sourceDurationMajor, countedMajor);
+        double itemProcessingDurationMajor = average(processDurationMajor, countedMajor);
+        double itemStoreDurationMajor = average(storeDurationMajor, countedMajor);
+        out.info(String.format("+ %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            count,(float)interval/1000f,itemDurationAll,itemTimeAll));
+        out.info(String.format("  last %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            major,(float)intervalMajor/1000f,itemDurationMajor,itemTimeMajor));
+        out.info(String.format("  - source   : all: %7.3fms/item | current: %7.3fms/item",
+            itemSourceDurationAll,itemSourceDurationMajor));
+        out.info(String.format("  - processing: all: %7.3fms | current: %7.3fms/item",
+            itemProcessingDurationAll,itemProcessingDurationMajor));
+        out.info(String.format("  - store     : all: %7.3fms | current: %7.3fms/item",
+            itemStoreDurationAll,itemStoreDurationMajor));
+    }
+    /**
+     * Prints the summary for all consumed items.
+     * @param current the time the last item was consumed
+     */
+    private void printSummary(long current){
+        long interval = current-start;
+        double itemDurationAll = average(durationAll, countedAll);
+        double itemTimeAll = average(timeAll, countedAll);
+        double itemSourceDurationAll = average(sourceDurationAll, countedAll);
+        double itemProcessingDurationAll = average(processDurationAll, countedAll);
+        double itemStoreDurationAll = average(storeDurationAll, countedAll);
+        out.info(String.format("Indexed %d items in %fsec | %7.3fms/item | %7.3fms in queue",
+            count,(float)interval/1000f,itemDurationAll,itemTimeAll));
+        out.info(String.format("  - source   : %7.3fms/item",
+            itemSourceDurationAll));
+        out.info(String.format("  - processing: %7.3fms/item",
+            itemProcessingDurationAll));
+        out.info(String.format("  - store     : %7.3fms/item",
+            itemStoreDurationAll));
+
+    }
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/FinishedEntityDaemon.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java?rev=1097740&view=auto
==============================================================================
--- incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java (added)
+++ incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java Fri Apr 29 09:20:31 2011
@@ -0,0 +1,94 @@
+package org.apache.stanbol.entityhub.indexing.core.impl;
+
+import org.apache.stanbol.entityhub.indexing.core.IndexingComponent;
+import org.apache.stanbol.entityhub.indexing.core.IndexingDestination;
+import org.apache.stanbol.entityhub.servicesapi.model.rdf.RdfResourceEnum;
+/**
+ * Interface with the constants used by the {@link IndexerImpl} part of this
+ * package. These constants can be assumed as private and SHOULD NOT be used by
+ * other components.
+ * @author Rupert Westenthaler
+ *
+ */
+public interface IndexerConstants {
+ /**
+ * The field used to store the score of an entity if not <code>null</code>
+ * and &gt;= 0.
+ */
+ String SCORE_FIELD = RdfResourceEnum.signRank.getUri();
+
+ /**
+ * Key used to store the time when the reading from the source started
+ * ({@link Long})
+ */
+ String SOURCE_STARTED = "entity.source.started";
+ /**
+ * Key used to store the time when the reading from the source completed
+ * ({@link Long})
+ */
+ String SOURCE_COMPLETE = "entity.source.complete";
+ /**
+ * Key used to store the time needed to read the entity from the source.
+ * ({@link Float})
+ */
+ String SOURCE_DURATION = "entity.source.duration";
+ /**
+ * Key used to store the time when the processing of the entity started
+ * ({@link Long})
+ */
+ String PROCESS_STARTED = "entity.process.started";
+ /**
+ * Key used to store the time when the processing of the entity completed
+ * ({@link Long})
+ */
+ String PROCESS_COMPLETE = "entity.process.complete";
+ /**
+ * Key used to store the time needed for processing an entity. ({@link Float})
+ */
+ String PROCESS_DURATION = "entity.process.duration";
+ /**
+ * Key used to store the time when the storing of the entity started
+ */
+ String STORE_STARTED = "entity.store.started";
+ /**
+ * Key used to store the time when the storing of the entity completed
+ * ({@link Long})
+ */
+ String STORE_COMPLETE = "entity.store.complete";
+ /**
+ * Key used to store the time needed to store the entity. ({@link Float})
+ */
+ String STORE_DURATION = "entity.store.duration";
+ /**
+ * Key used to store the time stamp when the error occurred
+ */
+ String ERROR_TIME = "entity.error.time";
+ /**
+ * Item used by the consumers to recognise that the Queue has finished.
+ * See http://stackoverflow.com/questions/1956526/under-what-conditions-will-blockingqueue-take-throw-interrupted-exception
+ * for an Example.
+ */
+ //The raw type is used so that this single instance can be used for
+ //queues of any item type. Type safety can be ignored, because the item
+ //wrapped by INDEXING_COMPLETED_QUEUE_ITEM is null anyway.
+ @SuppressWarnings("unchecked")
+ QueueItem INDEXING_COMPLETED_QUEUE_ITEM = new QueueItem(null);
+
+ /**
+ * The sequence number for {@link IndexingDaemon}s that read from the
+ * {@link IndexingComponent}s
+ */
+ Integer SEQUENCE_NUMBER_SOURCE_DAEMON = 0;
+ /**
+ * The sequence number for {@link IndexingDaemon}s that process Entities
+ */
+ Integer SEQUENCE_NUMBER_PROCESSOR_DAEMON = 1;
+ /**
+ * The sequence number for {@link IndexingDaemon}s that persist Entities to
+ * the {@link IndexingDestination}
+ */
+ Integer SEQUENCE_NUMBER_PERSIT_DAEMON = 2;
+ /**
+ * The sequence number for {@link IndexingDaemon}s that consume the
+ * finished (indexed) Entities
+ */
+ Integer SEQUENCE_NUMBER_FINISHED_DAEMON = 3;
+ /**
+ * The sequence number for {@link IndexingDaemon}s that handle errors
+ */
+ Integer SEQUENCE_NUMBER_ERROR_HANDLING_DAEMON = 4;
+
+}
Propchange: incubator/stanbol/trunk/entityhub/indexing/core/src/main/java/org/apache/stanbol/entityhub/indexing/core/impl/IndexerConstants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain