You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/11/27 13:38:23 UTC

svn commit: r1414154 - in /stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it: EnhancerTestBase.java MultiThreadedTest.java MultiThreadedTestBase.java NlpProcessingStressTest.java ProperNounLinkingTest.java

Author: rwesten
Date: Tue Nov 27 12:38:21 2012
New Revision: 1414154

URL: http://svn.apache.org/viewvc?rev=1414154&view=rev
Log:
STANBOL-819: This adds the integration tests as described by this issue. Note that the MultiThreadedTest class was refactored to make it easier to add further tests to the integration tests.

Added:
    stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
    stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/NlpProcessingStressTest.java
    stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java   (with props)
Modified:
    stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java
    stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java

Modified: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java?rev=1414154&r1=1414153&r2=1414154&view=diff
==============================================================================
--- stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java (original)
+++ stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java Tue Nov 27 12:38:21 2012
@@ -18,6 +18,9 @@ package org.apache.stanbol.enhancer.it;
 
 import static org.junit.Assert.fail;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,20 +108,42 @@ public class EnhancerTestBase extends St
     }
     public EnhancerTestBase(String endpoint,String...assertEngines){
         super();
-        if(endpoint == null){
-            endpoint = DEFAULT_ENDPOINT;
-        }
-        if(endpoint.charAt(0) != '/')
-            this.endpoint = "/"+endpoint;
-        else{
-            this.endpoint = endpoint;
-        }
+        setEndpoint(endpoint);
         if(assertEngines == null){
             this.assertEngines = DEFAULT_ASSERT_ENGINES;
         } else {
             this.assertEngines = assertEngines;
         }
     }
+
+    /**
+     * Setter for the endpoint. Keeps care of leading '/' and supports optional query parameter
+     * @param endpoint the endpoint or <code>null</code> to use the default
+     * @param params optional query parameter(s) [key,value,key,value,...]
+     */
+    protected void setEndpoint(String endpoint,String...params) {
+        StringBuilder sb = new StringBuilder();
+        if(endpoint == null){
+            sb.append(DEFAULT_ENDPOINT);;
+        } else if(endpoint.charAt(0) != '/')
+            sb.append("/").append(endpoint);
+        else{
+            sb.append(endpoint);
+        }
+        if(params != null && params.length > 1){
+            for(int i=0;i<params.length-1;i++){
+                sb.append(i==0?'?':'&');
+                sb.append(params[i]).append('=');
+                i++;
+                try {
+                    sb.append(URLEncoder.encode(params[i], "UTF-8"));
+                } catch (UnsupportedEncodingException e) {
+                    throw new IllegalStateException(e.getMessage(),e);
+                }
+            }
+        }
+        this.endpoint = sb.toString();
+    }
     public String getEndpoint(){
         return endpoint;
     }

Modified: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java?rev=1414154&r1=1414153&r2=1414154&view=diff
==============================================================================
--- stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java (original)
+++ stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java Tue Nov 27 12:38:21 2012
@@ -16,758 +16,24 @@
  */
 package org.apache.stanbol.enhancer.it;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.clerezza.rdf.core.Literal;
-import org.apache.clerezza.rdf.core.NonLiteral;
-import org.apache.clerezza.rdf.core.Resource;
-import org.apache.clerezza.rdf.core.Triple;
-import org.apache.clerezza.rdf.core.TripleCollection;
-import org.apache.clerezza.rdf.core.UriRef;
-import org.apache.clerezza.rdf.core.impl.SimpleMGraph;
-import org.apache.clerezza.rdf.core.serializedform.Parser;
-import org.apache.clerezza.rdf.core.serializedform.SupportedFormat;
-import org.apache.clerezza.rdf.core.serializedform.UnsupportedFormatException;
-import org.apache.clerezza.rdf.jena.parser.JenaParserProvider;
-import org.apache.clerezza.rdf.rdfjson.parser.RdfJsonParsingProvider;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.params.ClientPNames;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.params.CoreProtocolPNames;
-import org.apache.stanbol.commons.indexedgraph.IndexedMGraph;
-import org.apache.stanbol.commons.testing.http.Request;
-import org.apache.stanbol.commons.testing.http.RequestExecutor;
-import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
-import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
-import org.apache.stanbol.enhancer.servicesapi.rdf.Properties;
-import org.apache.stanbol.entityhub.servicesapi.defaults.NamespaceEnum;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/** Test that the default chain is called by requesting the "/enhancer" endpoint. */
-public class MultiThreadedTest extends EnhancerTestBase {
-    
-    /**
-     * The name of the Enhancement Chain this test runs against. If not defined
-     * the default chain is used.
-     */
-    public static final String PROPERTY_CHAIN = "stanbol.it.multithreadtest.chain";
-    /**
-     * The reference to the test data. Can be a File, a Resource available via the
-     * Classpath or an URL. This also supports compressed files. In case of ZIP
-     * only the first entry is processed.
-     */
-    public static final String PROPERTY_TEST_DATA = "stanbol.it.multithreadtest.data";
-    /**
-     * Can be used to explicitly parse the Media-Type of the test data. If not set
-     * the Media-Type is parsed based on the file extension.
-     */
-    public static final String PROPERTY_TEST_DATA_TYPE = "stanbol.it.multithreadtest.media-type";
-    /**
-     * The RDF property used to filter triples their values are used as texts for
-     * Enhancer requests. Only used of test data are provided as RDF<p>
-     * Note:<ul>
-     * <li> Only triples where their Object are Literals are used
-     * <li> the default property is "http://dbpedia.org/ontology/abstract"
-     * <li> if set to "*" than all triples with literal values are used.
-     * </ul>
-     */
-    public static final String PROPERTY_TEST_DATA_PROPERTY = "stanbol.it.multithreadtest.data-property";
-    /**
-     * The maximum number of concurrent requests
-     */
-    public static final String PROPERTY_THREADS = "stanbol.it.multithreadtest.threads";
-    /**
-     * The maximum number of requests. Can be used to limit the number of requests if
-     * the provided data do contain more samples.
-     */
-    public static final String PROPERTY_REQUESTS = "stanbol.it.multithreadtest.requests";
-    /**
-     * The RDF serialisation used as Accept header for Stanbol Enhancer requests
-     */
-    public static final String PROPERTY_RDF_FORMAT = "stanbol.it.multithreadtest.rdf-format";
-    
-    private static final Logger log = LoggerFactory.getLogger(MultiThreadedTest.class);
-    
-    public final static int DEFAULT_NUM_THREADS = 5;
-    public final static int DEFAULT_NUM_REQUESTS = 500;
-    public final static String DEFAULT_RDF_FORMAT = SupportedFormat.RDF_JSON;
-    public final static String DEFAULT_TEST_DATA = "10k_long_abstracts_en.nt.bz2";
-    public final static String DEFAULT_TEST_DATA_PROPERTY = "http://dbpedia.org/ontology/abstract";
-    
-    private static Parser rdfParser;
-    private static Iterator<String> testDataIterator;
-    /*
-     * We need here a custom http client that uses a connection pool 
-     */
-    protected DefaultHttpClient pooledHttpClient;
-    private BasicHttpParams httpParams;
-    private PoolingClientConnectionManager connectionManager;
+/**
+ * Default MultiThreadedTest tool. Supports the use of System properties to
+ * configure the test. See the <a href="http://stanbol.apache.org/docs/trunk/utils/enhancerstresstest">
+ * Stanbol Enhancer Stress Test Utility</a> documentation for details
+ * @author Rupert Westenthaler
+ *
+ */
+public final class MultiThreadedTest extends MultiThreadedTestBase {
     
     public MultiThreadedTest(){
-        this(System.getProperty(PROPERTY_CHAIN),new String[]{});
-    }
-    protected MultiThreadedTest(String chain){
-        this(chain,new String[]{});
-    }
-    protected MultiThreadedTest(String chain,String...assertEngines){
-        super(null,assertEngines);
-        if(chain != null && !chain.isEmpty()){
-            log.info("Testing with Enhancement Chain '{}'",chain);
-            this.endpoint = endpoint+"/chain/"+chain;
-        } else { //else no chain configured ... use default
-            log.info("Testing default Enhancement Chain");
-        }
-        //add the parameter for the execution metadata
-        this.endpoint = this.endpoint+"?executionmetadata=true";
+        super();
     }
 
-    @BeforeClass
-    public static void init() throws IOException {
-        //init the RDF parser
-        rdfParser = new Parser();
-        rdfParser.bindParsingProvider(new JenaParserProvider());
-        rdfParser.bindParsingProvider(new RdfJsonParsingProvider());
-        //init theTestData
-        initTestData();
-    }
-    
-    
-    @Before
-    public void initialiseHttpClient() {
-        if(this.pooledHttpClient == null){ //init for the first test
-            httpParams = new BasicHttpParams();
-            httpParams.setParameter(CoreProtocolPNames.USER_AGENT, "Stanbol Integration Test");
-            httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true);
-            httpParams.setIntParameter(ClientPNames.MAX_REDIRECTS,3);
-            httpParams.setBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE,true);
-    
-            connectionManager = new PoolingClientConnectionManager();
-            connectionManager.setMaxTotal(20);
-            connectionManager.setDefaultMaxPerRoute(20);
-    
-            pooledHttpClient = new DefaultHttpClient(connectionManager,httpParams);
-        }
-    }
-    
     @Test
     public void testMultipleParallelRequests() throws Exception {
-        Integer maxRequests = Integer.getInteger(PROPERTY_REQUESTS,DEFAULT_NUM_REQUESTS);
-        if(maxRequests.intValue() <= 0){
-            maxRequests = Integer.MAX_VALUE;
-        }
-        Integer numThreads = Integer.getInteger(PROPERTY_THREADS,DEFAULT_NUM_THREADS);
-        if(numThreads <= 0){
-            numThreads = DEFAULT_NUM_THREADS;
-        }
-        log.info("Start Multi Thread testing of max. {} requests using {} threads",
-            maxRequests,numThreads);
-        ExcutionTracker tracker = new ExcutionTracker(
-            Executors.newFixedThreadPool(numThreads),
-            Math.max(100, numThreads*5));
-        String rdfFormat = System.getProperty(PROPERTY_RDF_FORMAT,DEFAULT_RDF_FORMAT);
-        int testNum;
-        for(testNum = 0;testDataIterator.hasNext() && testNum < maxRequests; testNum++){
-            String test = testDataIterator.next();
-            Request request = builder.buildPostRequest(getEndpoint())
-                    .withHeader("Accept",rdfFormat)
-                    .withContent(test);
-            tracker.register(request);
-            if(testNum%100 == 0){
-                log.info("  ... sent {} Requests ({} finished, {} pending, {} failed",
-                    new Object[]{testNum,tracker.getNumCompleted(),
-                                 tracker.getNumPending(),tracker.getFailed().size()});
-            }
-        }
-        log.info("> All {} requests sent!",testNum);
-        log.info("  ... wait for all requests to complete");
-        while(tracker.getNumPending() > 0){
-            tracker.wait(3);
-            log.info("  ... {} finished, {} pending, {} failed",
-                new Object[]{tracker.getNumCompleted(),tracker.getNumPending(),tracker.getFailed().size()});
-        }
-        log.info("Multi Thread testing of {} requests (failed: {}) using {} threads completed",
-            new Object[]{tracker.getNumCompleted(),tracker.getFailed().size(),numThreads});
-        tracker.printStatistics();
-        Assert.assertTrue(tracker.getFailed()+"/"+numThreads+" failed", tracker.getFailed().isEmpty());
-        tracker = null;
-    }
-    
-    @After
-    public final void close(){
-        httpParams = null;
-        pooledHttpClient = null;
-        connectionManager.shutdown();
-        connectionManager = null;
-    }
-    
-    @AfterClass
-    public static final void cleanup(){
-        testDataIterator = null;
-    }
-    
-    /* -------------------------------------------------------------
-     * Utilities for reading the Test Data from the defined source
-     * -------------------------------------------------------------
-     */
-    
-    private static void initTestData() throws IOException {
-        String testData = System.getProperty(PROPERTY_TEST_DATA, DEFAULT_TEST_DATA);
-        log.info("Read Testdata from '{}'",testData);
-        File testFile = new File(testData);
-        InputStream is = null;
-        if(testFile.isFile()){
-            log.info(" ... init from File");
-            is = new FileInputStream(testFile);
-        } 
-        if(is == null) {
-            is = MultiThreadedTest.class.getClassLoader().getResourceAsStream(testData);
-        }
-        if(is == null){
-           is = ClassLoader.getSystemResourceAsStream(testData);
-        }
-        if(is == null){
-            try {
-              is = new URL(testData).openStream();
-              log.info(" ... init from URL");
-            }catch (MalformedURLException e) {
-                //not a URL
-            }
-        } else {
-            log.info(" ... init via Classpath");
-        }
-        Assert.assertNotNull("Unable to load the parsed TestData '"
-            +testData+"'!", is);
-        log.info("  - InputStream: {}", is == null ? null: is.getClass().getSimpleName());
-        
-        String name = FilenameUtils.getName(testData);
-        if ("gz".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
-            is = new GZIPInputStream(is);
-            name = FilenameUtils.removeExtension(name);
-            log.debug("   - from GZIP Archive");
-        } else if ("bz2".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
-            is = new BZip2CompressorInputStream(is);
-            name = FilenameUtils.removeExtension(name);
-            log.debug("   - from BZip2 Archive");
-        } else if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
-            ZipArchiveInputStream zipin = new ZipArchiveInputStream(is);
-            ArchiveEntry entry = zipin.getNextEntry();
-            log.info("For ZIP archives only the 1st Entry will be processed!");
-            name = FilenameUtils.getName(entry.getName());
-            log.info("  - processed Entry: {}",entry.getName());
-        } else { // else uncompressed data ...
-            log.info("  - uncompressed source: {}",name);
-        }
-        String mediaTypeString = System.getProperty(PROPERTY_TEST_DATA_TYPE);
-        MediaType mediaType;
-        if(mediaTypeString != null){
-            mediaType = MediaType.valueOf(mediaTypeString);
-        } else { //parse based on extension
-            String ext = FilenameUtils.getExtension(name);
-            if("txt".equalsIgnoreCase(ext)){
-                mediaType = MediaType.TEXT_PLAIN_TYPE;
-            } else if("rdf".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.RDF_XML);
-            } else if("xml".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.RDF_XML);
-            } else if("ttl".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.TURTLE);
-            } else if("n3".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.N3);
-            } else if("nt".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.N_TRIPLE);
-            } else if("json".equalsIgnoreCase(ext)){
-                mediaType = MediaType.valueOf(SupportedFormat.RDF_JSON);
-            } else if(name.indexOf('.')<0){ //no extension
-                mediaType = MediaType.TEXT_PLAIN_TYPE; //try plain text
-            } else {
-                log.info("Unkown File Extension {} for resource name {}",
-                    ext,name);
-                mediaType = null;
-            }
-        }
-        Assert.assertNotNull("Unable to detect MediaType for Resource '"
-            + name+"'. Please use the property '"+PROPERTY_TEST_DATA_TYPE
-            + "' to manually parse the MediaType!", mediaType);
-        
-        log.info("  - Media-Type: {}", mediaType);
-        //now init the iterator for the test data
-        testDataIterator = mediaType.isCompatible(MediaType.TEXT_PLAIN_TYPE) ?
-            createTextDataIterator(is, mediaType) :
-            createRdfDataIterator(is, mediaType);
-    }
-    /**
-     * Iterator implementation that parses an RDF graph from the parsed
-     * {@link InputStream}. The RDF data are loaded in-memory. Because of this
-     * only test data that fit in-memory can be used. <p>
-     * Literal values (objects) of the {@link #PROPERTY_TEST_DATA_PROPERTY} are
-     * used as data. If this property is not present {@link #DEFAULT_TEST_DATA_PROPERTY}
-     * is used. If {@link #PROPERTY_TEST_DATA_PROPERTY} is set to '*' than all
-     * Triples with Literal values are used.<p>
-     * This supports all RDF-formats supported by the {@link JenaParserProvider} and
-     * {@link RdfJsonParsingProvider}. The charset is expected to be UTF-8.
-     * @param is the input stream providing the RDF test data.
-     * @param mediaType the Media-Type of the stream. MUST BE supported by
-     * the Apache Clerezza RDF parsers.
-     */
-    private static Iterator<String> createRdfDataIterator(InputStream is, MediaType mediaType) {
-        final SimpleMGraph graph = new SimpleMGraph();
-        try {
-            rdfParser.parse(graph, is, mediaType.toString());
-        } catch (UnsupportedFormatException e) {
-            Assert.fail("The MimeType '"+mediaType+"' of the parsed testData "
-                + "is not supported. This utility supports plain text files as "
-                + "as well as the RDF formats "+rdfParser.getSupportedFormats()
-                + "If your test data uses one of those formats but it was not "
-                + "correctly detected you can use the System property '"
-                + PROPERTY_TEST_DATA_TYPE + "' to manually parse the Media-Type!");
-        }
-        IOUtils.closeQuietly(is);
-        return new Iterator<String>() {
-            Iterator<Triple> it = null;
-            String next = null;
-            private String getNext(){
-                if(it == null){
-                    UriRef property;
-                    String propertyString = System.getProperty(PROPERTY_TEST_DATA_PROPERTY,DEFAULT_TEST_DATA_PROPERTY);
-                    propertyString.trim();
-                    if("*".equals(propertyString)){
-                        property = null; //wildcard
-                        log.info("Iterate over values of all Triples");
-                    } else {
-                        propertyString = NamespaceEnum.getFullName(propertyString);
-                        property = new UriRef(propertyString);
-                        log.info("Iterate over values of property {}", propertyString);
-                    }
-                    it = graph.filter(null, property, null);
-                }
-                while(it.hasNext()){
-                    Resource value = it.next().getObject();
-                    if(value instanceof Literal){
-                        return ((Literal)value).getLexicalForm();
-                    }
-                }
-                return null; //no more data
-            }
-            
-            @Override
-            public boolean hasNext() {
-                if(next == null){
-                    next = getNext();
-                }
-                return next != null;
-            }
-
-            @Override
-            public String next() {
-                if(next == null){
-                    next = getNext();
-                }
-                if(next == null){
-                    throw new NoSuchElementException("No further testData available");
-                } else {
-                    String elem = next;
-                    next = null;
-                    return elem;
-                }
-                
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-            
-        };
+        performTest(TestSettings.fromSystemProperties());
     }
-    /**
-     * Iterator reading Content elements from the input stream. Two (ore more)
-     * empty lines are used to separate multiple content items.<p>
-     * NOTE: This iterator does not keep the whole text in-memory. Therefore
-     * it can be possible used to process test data that would not fit
-     * in-memory.
-     * @param is The input stream to read the data from
-     * @param mediaType the Media-Type - only used to parse the charset from. If
-     * no charset is specified UTF-8 is uses as default.
-     */
-    private static Iterator<String> createTextDataIterator(InputStream is, MediaType mediaType) {
-        String charsetString = mediaType.getParameters().get("charset");
-        Charset charset = Charset.forName(charsetString == null ? "UTF-8" : charsetString);
-        log.info("  ... using charset {} for parsing Text data",charset);
-        final BufferedReader reader = new BufferedReader(new InputStreamReader(is, charset));
-        return new Iterator<String>() {
-            String next = null;
-            private String getNext(){
-                String line;
-                StringBuilder data = new StringBuilder();
-                int emtptyLines = 0;
-                try {
-                    while((line = reader.readLine()) != null && emtptyLines < 2){
-                        if(line.isEmpty()){
-                            if(data.length() != 0){
-                                emtptyLines++;
-                            } //do not count empty lines at the beginning!
-                        } else {
-                            emtptyLines = 0;
-                        }
-                        data.append(line).append('\n');
-                    }
-                } catch (IOException e) {
-                    log.warn("IOException while reading from Stream",e);
-                    Assert.fail("IOException while reading from Stream");
-                }
-                return data.length() == 0 ? null : data.toString();
-            }
-            @Override
-            public boolean hasNext() {
-                if(next == null){
-                    next = getNext();
-                }
-                return next != null;
-            }
-
-            @Override
-            public String next() {
-                if(next == null){
-                    next = getNext();
-                }
-                if(next == null){
-                    throw new NoSuchElementException("No further testData available");
-                } else {
-                    String elem = next;
-                    next = null;
-                    return elem;
-                }
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-            
-        };
-    }
-    
-    
-    
-    /* -------------------------------------------------------------
-     * Utilities for executing and tracking the concurrent Requests
-     * -------------------------------------------------------------
-     */
-
-    private class ExcutionTracker {
-        
-        
-        
-        private int maxRegistered;
-
-        
-        private int completed = 0;
-        private final Set<Request> registered = new HashSet<Request>();
-        private final List<HttpResponse> failed = Collections.synchronizedList(new ArrayList<HttpResponse>());
-        
-        private ExecutionStatistics statistics = new ExecutionStatistics();
-        
-        private ExecutorService executorService;
-        
-        protected ExcutionTracker(ExecutorService executorService){
-            this(executorService,100);
-        }
-        public ExcutionTracker(ExecutorService executorService,int maxRegistered) {
-            this.executorService = executorService;
-            this.maxRegistered = maxRegistered <= 0 ? Integer.MAX_VALUE : maxRegistered;
-        }
-        
-        public void register(Request request){
-            synchronized (registered) {
-                while(registered.size() >= maxRegistered){
-                    try {
-                        registered.wait();
-                    } catch (InterruptedException e) {
-                        //interrupped
-                    }
-                }
-                registered.add(request);
-                executorService.execute(new AsyncExecuter(request, this));
-            }
-        }
-
-        void succeed(Request request, UriRef contentItemUri, TripleCollection results,Long rtt) {
-            ExecutionMetadata em = ExecutionMetadata.parseFrom(results, (UriRef)contentItemUri);
-            results.clear(); //we no longer need the results
-            if(em != null){
-                synchronized (statistics) {
-                    statistics.addResult(em,rtt);
-                }
-            } //no executionData available ... unable to collect statistics
-            synchronized (registered) {
-                if(registered.remove(request)){
-                    completed++;
-                    registered.notifyAll();
-                }
-            }
-        }
-
-        void failed(Request request, RequestExecutor executor) {
-            synchronized (registered) {
-                failed.add(executor.getResponse());
-                if(registered.remove(request)){
-                    completed++;
-                    registered.notifyAll();
-                }
-            }
-        }
-        
-        public int getNumPending(){
-            synchronized (registered) {
-                return registered.size();
-            }
-        }
-        /**
-         * Live list of the failed requests. Non basic access MUST BE
-         * syncronized on the list while the requests are still pending as newly
-         * failed requests will modify this list
-         * @return
-         */
-        public List<HttpResponse> getFailed(){
-            return failed;
-        }
-        public int getNumCompleted(){
-            return completed;
-        }
-        public void wait(int seconds){
-            try {
-                executorService.awaitTermination(seconds, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-            }
-        }
-        public void printStatistics(){
-            log.info("Statistics:");
-            synchronized (statistics) {
-                log.info("Chain:");
-                log.info("  Round Trip Time (Server + Transfer + Client):");
-                if(statistics.getNumRtt() < 1){
-                    log.info("    - not available");
-                } else {
-                    log.info("     max: {}ms | min: {}ms | avr: {}ms over {} requests",
-                        new Object[]{statistics.getMaxRtt(),
-                                     statistics.getMinRtt(),
-                                     statistics.getAverageRtt(),
-                                     statistics.getNumRtt()});
-                }
-                log.info("  processing time (server side)");
-                if(statistics.getNumSamples() < 1){
-                    log.info("    - not available. Make shure the used "
-                        + "EnhancementJobManager supports ExecutionMetadata!");
-                } else {
-                    log.info("     max: {}ms | min: {}ms | avr: {}ms over {} requests",
-                        new Object[]{statistics.getMaxDuration(),
-                                     statistics.getMinDuration(),
-                                     statistics.getAverageDuration(),
-                                     statistics.getNumSamples()});
-                    log.info("Enhancement Engines");
-                    for(String name :statistics.getEngineNames()){
-                        log.info("  {}: max: {}ms | min: {}ms | avr: {}ms over {} requests",
-                            new Object[]{name,
-                                         statistics.getMaxDuration(name),
-                                         statistics.getMinDuration(name),
-                                         statistics.getAverage(name),
-                                         statistics.getNumSamples(name)});
-                    }
-                }
-            }
-        }
-    }
-    private class AsyncExecuter implements Runnable{
-
-        private Request request;
-        private ExcutionTracker tracker;
-        protected AsyncExecuter(Request request, ExcutionTracker tracker){
-            this.request = request;
-            this.tracker = tracker;
-        }
-        @Override
-        public void run() {
-            RequestExecutor executor = new RequestExecutor(pooledHttpClient);
-            long start = System.currentTimeMillis();
-            Long rtt;
-            try {
-                executor.execute(request).assertStatus(200);
-                rtt = System.currentTimeMillis()-start;
-            } catch (Throwable e) {
-                log.warn("Error while sending Request ",e);
-                tracker.failed(request,executor);
-                rtt = null;
-            }
-            IndexedMGraph graph = new IndexedMGraph();
-            try {
-                rdfParser.parse(graph,executor.getStream(), executor.getContentType().getMimeType());
-            }catch (Exception e) {
-                Assert.fail("Unable to parse RDF data from Response with Content-Type "
-                    + executor.getContentType().getMimeType()+" ( "+e.getClass().getSimpleName()
-                    + ": "+e.getMessage()+")");
-            }
-//            log.info("Content:\n{}",executor.getContent());
-//            
-//            log.info("Triples");
-//            for(Triple t : graph){
-//                log.info(t.toString());
-//            }
-            Iterator<Triple> ciIt = graph.filter(null, Properties.ENHANCER_EXTRACTED_FROM, null);
-            Assert.assertTrue("Enhancement Results do not caontain a single Enhancement",ciIt.hasNext());
-            Resource contentItemUri = ciIt.next().getObject();
-            Assert.assertTrue("ContentItem URI is not an UriRef but an instance of "
-                    + contentItemUri.getClass().getSimpleName(), contentItemUri instanceof UriRef);
-            tracker.succeed(request,(UriRef)contentItemUri,graph,rtt);
-        }
-    }
-
-    private class ExecutionStatistics {
-        private int numSamples;
-        private long maxDuration = -1;
-        private long minDuration = Long.MAX_VALUE;
-        private long sumDuration = 0;
-        private int numRtt;
-        private long maxRtt = -1;
-        private long minRtt = Long.MAX_VALUE;
-        private long sumRtt = 0;
-        
-        private Map<String, long[]> engineStats = new TreeMap<String,long[]>();
-        
-        void addResult(ExecutionMetadata em,Long roundTripTime){
-            Long durationNumber = em.getChainExecution().getDuration();
-            long duration;
-            if(durationNumber != null){
-                duration = durationNumber.longValue();
-                if(duration > maxDuration){
-                    maxDuration = duration;
-                }
-                if(duration < minDuration){
-                    minDuration = duration;
-                }
-                sumDuration = sumDuration+duration;
-                numSamples++;
-            }
-            if(roundTripTime != null){
-                long rtt = roundTripTime;
-                if(rtt > maxRtt){
-                    maxRtt = rtt;
-                }
-                if(rtt < minRtt){
-                    minRtt = rtt;
-                }
-                sumRtt = sumRtt+rtt;
-                numRtt++;
-            }
-            for(Entry<String,Execution> ex : em.getEngineExecutions().entrySet()){
-                long[] stats = engineStats.get(ex.getKey());
-                if(stats == null){
-                    stats = new long[]{-1L,Long.MAX_VALUE,0L,0L};
-                    engineStats.put(ex.getKey(), stats);
-                }
-                durationNumber = ex.getValue().getDuration();
-                if(durationNumber != null){
-                    duration = durationNumber.longValue();
-                    if(duration > stats[0]){ //max duration
-                        stats[0] = duration;
-                    }
-                    if(duration < stats[1]){ //min duration
-                        stats[1] = duration;
-                    }
-                    stats[2] = stats[2]+duration; //sum duration
-                    stats[3]++; //num Samples
-                }
-            }
-        }
-        
-        
-        public Set<String> getEngineNames(){
-            return engineStats.keySet();
-        }
-        public Long getMaxDuration(){
-            return maxDuration < 0 ? null : maxDuration;
-        }
-        public Long getMinDuration(){
-            return minDuration == Long.MAX_VALUE ? null : minDuration;
-        }
-        public Long getAverageDuration(){
-            return sumDuration <= 0 && numSamples <= 0 ? null : Math.round((double)sumDuration/(double)numSamples);
-        }
-        public int getNumSamples(){
-            return numSamples;
-        }
-        public Long getMaxRtt(){
-            return maxRtt < 0 ? null : maxRtt;
-        }
-        public Long getMinRtt(){
-            return minRtt == Long.MAX_VALUE ? null : minRtt;
-        }
-        public Long getAverageRtt(){
-            return sumRtt <= 0 && numRtt <= 0 ? null : Math.round((double)sumRtt/(double)numRtt);
-        }
-        public int getNumRtt(){
-            return numRtt;
-        }
-        public Long getMaxDuration(String engine){
-            long[] stats = engineStats.get(engine);
-            return stats == null ? null : stats[0];
-        }
-        public Long getMinDuration(String engine){
-            long[] stats = engineStats.get(engine);
-            return stats == null ? null : stats[1];
-        }
-        public Long getAverage(String engine){
-            long[] stats = engineStats.get(engine);
-            return stats == null && stats[2] <= 0 && stats[3] <= 0 ? 
-                    null : Math.round((double)stats[2]/(double)stats[3]);
-        }
-        public int getNumSamples(String engine){
-            long[] stats = engineStats.get(engine);
-            return stats == null ? null : (int)stats[3];
-        }
-        
-    }
-    
     
 }

Added: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java?rev=1414154&view=auto
==============================================================================
--- stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java (added)
+++ stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTestBase.java Tue Nov 27 12:38:21 2012
@@ -0,0 +1,833 @@
+package org.apache.stanbol.enhancer.it;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.clerezza.rdf.core.Literal;
+import org.apache.clerezza.rdf.core.Resource;
+import org.apache.clerezza.rdf.core.Triple;
+import org.apache.clerezza.rdf.core.TripleCollection;
+import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.clerezza.rdf.core.impl.SimpleMGraph;
+import org.apache.clerezza.rdf.core.serializedform.Parser;
+import org.apache.clerezza.rdf.core.serializedform.SupportedFormat;
+import org.apache.clerezza.rdf.core.serializedform.UnsupportedFormatException;
+import org.apache.clerezza.rdf.jena.parser.JenaParserProvider;
+import org.apache.clerezza.rdf.rdfjson.parser.RdfJsonParsingProvider;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.stanbol.commons.indexedgraph.IndexedMGraph;
+import org.apache.stanbol.commons.testing.http.Request;
+import org.apache.stanbol.commons.testing.http.RequestExecutor;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
+import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
+import org.apache.stanbol.enhancer.servicesapi.rdf.Properties;
+import org.apache.stanbol.entityhub.servicesapi.defaults.NamespaceEnum;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for multi threaded tests
+ * @author westei
+ *
+ */
+public abstract class MultiThreadedTestBase extends EnhancerTestBase {
+
+    /**
+     * The name of the Enhancement Chain this test runs against. If not defined
+     * the default chain is used.
+     */
+    public static final String PROPERTY_CHAIN = "stanbol.it.multithreadtest.chain";
+    /**
+     * The reference to the test data. Can be a File, a Resource available via the
+     * Classpath or an URL. This also supports compressed files (gz, bz2 and zip).
+     * In case of ZIP only the first entry is processed.
+     */
+    public static final String PROPERTY_TEST_DATA = "stanbol.it.multithreadtest.data";
+    /**
+     * Can be used to explicitly set the Media-Type of the test data. If not set
+     * the Media-Type is detected based on the file extension.
+     */
+    public static final String PROPERTY_TEST_DATA_TYPE = "stanbol.it.multithreadtest.media-type";
+    /**
+     * The RDF property used to filter triples whose values are used as texts for
+     * Enhancer requests. Only used if test data are provided as RDF<p>
+     * Note:<ul>
+     * <li> Only triples where the Object is a Literal are used
+     * <li> the default property is "http://dbpedia.org/ontology/abstract"
+     * <li> if set to "*" than all triples with literal values are used.
+     * </ul>
+     */
+    public static final String PROPERTY_TEST_DATA_PROPERTY = "stanbol.it.multithreadtest.data-property";
+    /**
+     * The maximum number of concurrent requests (the size of the thread pool
+     * used to send the requests)
+     */
+    public static final String PROPERTY_THREADS = "stanbol.it.multithreadtest.threads";
+    /**
+     * The maximum number of requests. Can be used to limit the number of requests if
+     * the provided data do contain more samples.
+     */
+    public static final String PROPERTY_REQUESTS = "stanbol.it.multithreadtest.requests";
+    /**
+     * The RDF serialisation used as Accept header for Stanbol Enhancer requests
+     */
+    public static final String PROPERTY_RDF_FORMAT = "stanbol.it.multithreadtest.rdf-format";
+    /**
+     * Logger shared with subclasses. Uses this class as logging category as
+     * the code logging here lives in this base class.
+     */
+    protected static final Logger log = LoggerFactory.getLogger(MultiThreadedTestBase.class);
+    public static final int DEFAULT_NUM_THREADS = 5;
+    public static final int DEFAULT_NUM_REQUESTS = 500;
+    public static final String DEFAULT_RDF_FORMAT = SupportedFormat.RDF_JSON;
+    /** Default test data: a bzip2 compressed N-Triples resource */
+    public static final String DEFAULT_TEST_DATA = "10k_long_abstracts_en.nt.bz2";
+    public static final String DEFAULT_TEST_DATA_PROPERTY = "http://dbpedia.org/ontology/abstract";
+    /**
+     * Passed to setEndpoint(..) together with the endpoint (presumably adds the
+     * "executionmetadata=true" request parameter so that the Enhancer includes
+     * ExecutionMetadata in its responses - TODO confirm in EnhancerTestBase).
+     */
+    private static final String[] ENABLE_EXECUTION_METADATA = new String[]{"executionmetadata","true"};
+    /** The RDF parser created by {@link #init()} */
+    protected static Parser rdfParser;
+    /** The pooled HTTP client created by {@link #initialiseHttpClient()} */
+    protected DefaultHttpClient pooledHttpClient;
+    private BasicHttpParams httpParams;
+    private PoolingClientConnectionManager connectionManager;
+
+    /**
+     * Creates a multi threaded test without any engines forwarded to the
+     * super constructor.
+     */
+    protected MultiThreadedTestBase() {
+        this(new String[0]);
+    }
+
+    /**
+     * Creates a multi threaded test.
+     * @param assertEngines the engine names forwarded to the
+     * {@link EnhancerTestBase} constructor (presumably asserted to be active)
+     */
+    protected MultiThreadedTestBase(String... assertEngines) {
+        super(null, assertEngines);
+        // start with the default endpoint (null) with ExecutionMetadata enabled
+        setEndpoint(null, ENABLE_EXECUTION_METADATA);
+    }
+    
+    @BeforeClass
+    public static void init() throws IOException {
+        //init the RDF parser
+        rdfParser = new Parser();
+        rdfParser.bindParsingProvider(new JenaParserProvider());
+        rdfParser.bindParsingProvider(new RdfJsonParsingProvider());
+        //init theTestData
+    }
+
+    /**
+     * Class level clean-up hook. Intentionally does nothing at the moment.
+     */
+    @AfterClass
+    public static final void cleanup() {
+        // nothing to release
+    }
+
+    /**
+     * Helper method that initialises the test data based on the parsed settings.
+     * The data are loaded from a File, the Classpath or an URL, decompressed if
+     * the name has the extension "gz", "bz2" or "zip" (for ZIP archives only
+     * the 1st entry is used) and finally read as plain text or RDF depending
+     * on the Media-Type.
+     * @param settings the settings of the Test.
+     * @return the Iterator over the contents in the test data
+     * @throws IOException on any error while accessing the parsed test data
+     */
+    private static Iterator<String> initTestData(TestSettings settings) throws IOException {
+        String testData = settings.getTestData();
+        log.info("Read Testdata from '{}'",testData);
+        InputStream is = openTestDataStream(testData);
+        Assert.assertNotNull("Unable to load the parsed TestData '"
+            +testData+"'!", is);
+        log.info("  - InputStream: {}", is.getClass().getSimpleName());
+        
+        String name = FilenameUtils.getName(testData);
+        String extension = FilenameUtils.getExtension(name);
+        if ("gz".equalsIgnoreCase(extension)) {
+            is = new GZIPInputStream(is);
+            name = FilenameUtils.removeExtension(name);
+            log.debug("   - from GZIP Archive");
+        } else if ("bz2".equalsIgnoreCase(extension)) {
+            is = new BZip2CompressorInputStream(is);
+            name = FilenameUtils.removeExtension(name);
+            log.debug("   - from BZip2 Archive");
+        } else if ("zip".equalsIgnoreCase(extension)) {
+            ZipArchiveInputStream zipin = new ZipArchiveInputStream(is);
+            ArchiveEntry entry = zipin.getNextEntry();
+            if(entry == null){
+                IOUtils.closeQuietly(zipin);
+                Assert.fail("The ZIP archive '"+testData+"' does not contain any entry!");
+            }
+            log.info("For ZIP archives only the 1st Entry will be processed!");
+            name = FilenameUtils.getName(entry.getName());
+            log.info("  - processed Entry: {}",entry.getName());
+            //read the data of the 1st entry and NOT the raw bytes of the archive
+            is = zipin;
+        } else { // else uncompressed data ...
+            log.info("  - uncompressed source: {}",name);
+        }
+        MediaType mediaType = detectTestDataMediaType(name, settings.getTestDataMediaType());
+        if(mediaType == null){
+            IOUtils.closeQuietly(is); //the assertion below will fail
+        }
+        Assert.assertNotNull("Unable to detect MediaType for Resource '"
+            + name+"'. Please use the property '"+PROPERTY_TEST_DATA_TYPE
+            + "' to manually parse the MediaType!", mediaType);
+        
+        log.info("  - Media-Type: {}", mediaType);
+        //now init the iterator for the test data
+        return mediaType.isCompatible(MediaType.TEXT_PLAIN_TYPE) ?
+            createTextDataIterator(is, mediaType) :
+            createRdfDataIterator(is, mediaType,settings.getContentProperty());
+    }
+
+    /**
+     * Opens the referenced test data. Tries (in that order) the file system,
+     * the class loader of the test, the system class loader and finally
+     * interprets the reference as URL.
+     * @param testData the reference to the test data
+     * @return the opened stream or <code>null</code> if the reference could
+     * not be resolved
+     * @throws IOException if an existing File or URL can not be opened
+     */
+    private static InputStream openTestDataStream(String testData) throws IOException {
+        File testFile = new File(testData);
+        if(testFile.isFile()){
+            log.info(" ... init from File");
+            return new FileInputStream(testFile);
+        }
+        InputStream is = MultiThreadedTest.class.getClassLoader().getResourceAsStream(testData);
+        if(is == null){
+           is = ClassLoader.getSystemResourceAsStream(testData);
+        }
+        if(is != null){
+            log.info(" ... init via Classpath");
+            return is;
+        }
+        try {
+            is = new URL(testData).openStream();
+            log.info(" ... init from URL");
+            return is;
+        } catch (MalformedURLException e) {
+            return null; //not a URL
+        }
+    }
+
+    /**
+     * Determines the Media-Type of the test data.
+     * @param name the file name of the (decompressed) test data
+     * @param explicitMediaType the Media-Type set by the user or <code>null</code>
+     * to detect it based on the file extension of the parsed name
+     * @return the Media-Type or <code>null</code> if the extension is not known
+     */
+    private static MediaType detectTestDataMediaType(String name, String explicitMediaType) {
+        if(explicitMediaType != null){
+            return MediaType.valueOf(explicitMediaType);
+        }
+        String ext = FilenameUtils.getExtension(name);
+        if("txt".equalsIgnoreCase(ext)){
+            return MediaType.TEXT_PLAIN_TYPE;
+        } else if("rdf".equalsIgnoreCase(ext) || "xml".equalsIgnoreCase(ext)){
+            return MediaType.valueOf(SupportedFormat.RDF_XML);
+        } else if("ttl".equalsIgnoreCase(ext)){
+            return MediaType.valueOf(SupportedFormat.TURTLE);
+        } else if("n3".equalsIgnoreCase(ext)){
+            return MediaType.valueOf(SupportedFormat.N3);
+        } else if("nt".equalsIgnoreCase(ext)){
+            return MediaType.valueOf(SupportedFormat.N_TRIPLE);
+        } else if("json".equalsIgnoreCase(ext)){
+            return MediaType.valueOf(SupportedFormat.RDF_JSON);
+        } else if(name.indexOf('.')<0){ //no extension
+            return MediaType.TEXT_PLAIN_TYPE; //try plain text
+        } else {
+            log.info("Unkown File Extension {} for resource name {}",
+                ext,name);
+            return null;
+        }
+    }
+
+    /**
+     * Iterator implementation that parses an RDF graph from the parsed
+     * {@link InputStream}. The RDF data are loaded in-memory. Because of this
+     * only test data that fit in-memory can be used. <p>
+     * Literal values (objects) of the {@link #PROPERTY_TEST_DATA_PROPERTY} are
+     * used as data. If this property is not present {@link #DEFAULT_TEST_DATA_PROPERTY}
+     * is used. If {@link #PROPERTY_TEST_DATA_PROPERTY} is set to '*' than all
+     * Triples with Literal values are used.<p>
+     * This supports all RDF-formats supported by the {@link JenaParserProvider} and
+     * {@link RdfJsonParsingProvider}. The charset is expected to be UTF-8.
+     * @param is the input stream providing the RDF test data. The stream is
+     * closed by this method (also if parsing fails).
+     * @param mediaType the Media-Type of the stream. MUST BE supported by
+     * the Apache Clerezza RDF parsers.
+     * @param propertyString the property whose literal values are used as test
+     * data (expanded via NamespaceEnum.getFullName(..)) or "*" for all literal
+     * values. Leading and trailing whitespace is ignored. <code>null</code>
+     * is interpreted as {@link #DEFAULT_TEST_DATA_PROPERTY}.
+     */
+    private static Iterator<String> createRdfDataIterator(InputStream is, MediaType mediaType, final String propertyString) {
+        final SimpleMGraph graph = new SimpleMGraph();
+        try {
+            rdfParser.parse(graph, is, mediaType.toString());
+        } catch (UnsupportedFormatException e) {
+            Assert.fail("The MimeType '"+mediaType+"' of the parsed testData "
+                + "is not supported. This utility supports plain text files as "
+                + "as well as the RDF formats "+rdfParser.getSupportedFormats()
+                + "If your test data uses one of those formats but it was not "
+                + "correctly detected you can use the System property '"
+                + PROPERTY_TEST_DATA_TYPE + "' to manually parse the Media-Type!");
+        } finally {
+            IOUtils.closeQuietly(is);
+        }
+        //NOTE: String#trim() returns a new String - its result MUST BE used
+        final String property = propertyString == null ?
+                DEFAULT_TEST_DATA_PROPERTY : propertyString.trim();
+        return new Iterator<String>() {
+            Iterator<Triple> it = null;
+            String next = null;
+            private String getNext(){
+                if(it == null){
+                    UriRef filterProperty;
+                    if("*".equals(property)){
+                        filterProperty = null; //wildcard
+                        log.info("Iterate over values of all Triples");
+                    } else {
+                        filterProperty = new UriRef(NamespaceEnum.getFullName(property));
+                        log.info("Iterate over values of property {}", property);
+                    }
+                    it = graph.filter(null, filterProperty, null);
+                }
+                while(it.hasNext()){
+                    Resource value = it.next().getObject();
+                    if(value instanceof Literal){
+                        return ((Literal)value).getLexicalForm();
+                    }
+                }
+                return null; //no more data
+            }
+            
+            @Override
+            public boolean hasNext() {
+                if(next == null){
+                    next = getNext();
+                }
+                return next != null;
+            }
+    
+            @Override
+            public String next() {
+                if(next == null){
+                    next = getNext();
+                }
+                if(next == null){
+                    throw new NoSuchElementException("No further testData available");
+                } else {
+                    String elem = next;
+                    next = null;
+                    return elem;
+                }
+            }
+    
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+
+    @Before
+    public void initialiseHttpClient() {
+        if(this.pooledHttpClient == null){ //init for the first test
+            httpParams = new BasicHttpParams();
+            httpParams.setParameter(CoreProtocolPNames.USER_AGENT, "Stanbol Integration Test");
+            httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true);
+            httpParams.setIntParameter(ClientPNames.MAX_REDIRECTS,3);
+            httpParams.setBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE,true);
+    
+            connectionManager = new PoolingClientConnectionManager();
+            connectionManager.setMaxTotal(20);
+            connectionManager.setDefaultMaxPerRoute(20);
+    
+            pooledHttpClient = new DefaultHttpClient(connectionManager,httpParams);
+        }
+    }
+
+    /**
+     * Runs the multi threaded test as configured by the parsed settings.
+     * <p>
+     * Up to {@link TestSettings#getMaxRequests()} test data items are sent as
+     * concurrent requests, using {@link TestSettings#getNumThreads()} threads,
+     * to the Enhancer endpoint. The method then waits until no request is
+     * pending, logs the collected statistics and asserts that no request failed.
+     * @param settings the settings of the test
+     * @throws Exception on any error while reading the test data or if the
+     * thread is interrupted while waiting for pending requests
+     */
+    protected void performTest(TestSettings settings) throws Exception {
+        Iterator<String> testDataIterator = initTestData(settings);
+        if(settings.getChain() != null){
+            setEndpoint(getChainEndpoint(settings.getChain()),ENABLE_EXECUTION_METADATA);
+        }
+        log.info("Start Multi Thread testing of max. {} requests using {} threads "
+            + "on Endpoint {}", new Object[]{
+            settings.getMaxRequests(),settings.getNumThreads(),getEndpoint()});
+        ExecutorService executorService = Executors.newFixedThreadPool(settings.getNumThreads());
+        try {
+            ExcutionTracker tracker = new ExcutionTracker(executorService,
+                Math.max(100, settings.getNumThreads()*5));
+            //an explicitly set System property takes precedence over the settings
+            String rdfFormat = System.getProperty(PROPERTY_RDF_FORMAT,settings.getRdfFormat());
+            int testNum;
+            for(testNum = 0;testDataIterator.hasNext() && testNum < settings.getMaxRequests(); testNum++){
+                String test = testDataIterator.next();
+                Request request = builder.buildPostRequest(getEndpoint())
+                        .withHeader("Accept",rdfFormat)
+                        .withContent(test);
+                tracker.register(request,test);
+                if(testNum%100 == 0){
+                    log.info("  ... sent {} Requests ({} finished, {} pending, {} failed",
+                        new Object[]{testNum,tracker.getNumCompleted(),
+                                     tracker.getNumPending(),tracker.getFailed().size()});
+                }
+            }
+            log.info("> All {} requests sent!",testNum);
+            log.info("  ... wait for all requests to complete");
+            while(tracker.getNumPending() > 0){
+                //NOTE: tracker.wait(..) requires the caller to own the monitor of
+                //the tracker (IllegalMonitorStateException otherwise). Just poll.
+                TimeUnit.SECONDS.sleep(3);
+                log.info("  ... {} finished, {} pending, {} failed",
+                    new Object[]{tracker.getNumCompleted(),tracker.getNumPending(),tracker.getFailed().size()});
+            }
+            log.info("Multi Thread testing of {} requests (failed: {}) using {} threads completed",
+                new Object[]{tracker.getNumCompleted(),tracker.getFailed().size(),settings.getNumThreads()});
+            tracker.printStatistics();
+            if(!tracker.getFailed().isEmpty()){
+                log.warn("Content(s) of Faild tests:");
+                int i=1;
+                for(Entry<HttpResponse,String> failed :tracker.getFailed().entrySet()){
+                    log.warn("Failed ({}):",i);
+                    log.warn("  > Status: {}",failed.getKey().getStatusLine());
+                    log.warn("  > Content: {}",failed.getValue());
+                    i++;
+                }
+            }
+            Assert.assertTrue(tracker.getFailed().size()+"/"+tracker.getNumCompleted()
+                + " requests failed", tracker.getFailed().isEmpty());
+        } finally {
+            //do not leak the worker threads of the pool
+            executorService.shutdown();
+        }
+    }
+    
+    /**
+     * Resets the endpoint to the default and releases the pooled HTTP client.
+     * <p>
+     * Null-safe: if {@link #initialiseHttpClient()} failed before the
+     * connection manager was created, this does not add a
+     * NullPointerException on top of the original failure.
+     */
+    @After
+    public final void close() {
+        setEndpoint(null,ENABLE_EXECUTION_METADATA); //reset the endpoint to the default
+        httpParams = null;
+        pooledHttpClient = null;
+        if(connectionManager != null){
+            connectionManager.shutdown();
+            connectionManager = null;
+        }
+    }
+
+    /**
+     * The settings of a multi threaded test.
+     * <p>
+     * Setters that receive <code>null</code> (or an invalid value) fall back to
+     * the defaults. The exception is {@link #setChain(String)}, where
+     * <code>null</code> means that the default chain is used.
+     */
+    public static class TestSettings {
+        private String chain = null;
+        private Integer maxRequests = DEFAULT_NUM_REQUESTS;
+        private Integer numThreads = DEFAULT_NUM_THREADS;
+        private String rdfFormat = DEFAULT_RDF_FORMAT;
+        private String testData = DEFAULT_TEST_DATA;
+        private String testDataMediaType = null;
+        private String propertyString = DEFAULT_TEST_DATA_PROPERTY;
+        
+        /**
+         * Creates settings initialised from the System properties defined by
+         * the <code>PROPERTY_*</code> constants of {@link MultiThreadedTestBase}.
+         * Properties that are not set keep their default values.
+         * @return the settings
+         */
+        public static TestSettings fromSystemProperties(){
+            TestSettings settings = new TestSettings();
+            settings.setMaxRequests(Integer.getInteger(PROPERTY_REQUESTS));
+            settings.setNumThreads(Integer.getInteger(PROPERTY_THREADS));
+            settings.setTestData(System.getProperty(PROPERTY_TEST_DATA), 
+                System.getProperty(PROPERTY_TEST_DATA_TYPE));
+            settings.setChain(System.getProperty(PROPERTY_CHAIN));
+            //also honour the documented RDF format and content property
+            settings.setRdfFormat(System.getProperty(PROPERTY_RDF_FORMAT));
+            settings.setContentProperty(System.getProperty(PROPERTY_TEST_DATA_PROPERTY));
+            return settings;
+        }
+        /** @return the name of the chain or <code>null</code> for the default chain */
+        public String getChain() {
+            return chain;
+        }
+        /** @param chain the name of the chain or <code>null</code> for the default chain */
+        public void setChain(String chain) {
+            this.chain = chain;
+        }
+        public Integer getMaxRequests() {
+            return maxRequests;
+        }
+        /** @param maxRequests the maximum number of requests; <code>null</code> or &lt; 1 for the default */
+        public void setMaxRequests(Integer maxRequests) {
+            if(maxRequests == null || maxRequests < 1){
+                this.maxRequests = DEFAULT_NUM_REQUESTS;
+            } else {
+                this.maxRequests = maxRequests;
+            }
+        }
+        /** @param numThreads the number of threads; <code>null</code> or &lt; 1 for the default */
+        public void setNumThreads(Integer numThreads) {
+            if(numThreads == null || numThreads < 1){
+                this.numThreads = DEFAULT_NUM_THREADS;
+            } else {
+                this.numThreads = numThreads;
+            }
+        }
+        public Integer getNumThreads() {
+            return numThreads;
+        }
+        public String getRdfFormat() {
+            return rdfFormat;
+        }
+        /** @param rdfFormat the RDF format requested from the Enhancer; <code>null</code> for the default */
+        public void setRdfFormat(String rdfFormat) {
+            if(rdfFormat == null){
+                this.rdfFormat = DEFAULT_RDF_FORMAT;
+            } else {
+                this.rdfFormat = rdfFormat;
+            }
+        }
+        public String getTestData() {
+            return testData;
+        }
+        /**
+         * setter for the test data
+         * @param testData source for the test. A file, URL or Classpath resource
+         * @param testdataMediaType the media type or <code>null</code> to detect
+         * it based on the name of the testData resource.
+         */
+        public void setTestData(String testData, String testdataMediaType) {
+            if(testData == null){
+                this.testData = DEFAULT_TEST_DATA;
+                this.testDataMediaType = null;
+            } else {
+                this.testData = testData;
+                this.testDataMediaType = testdataMediaType;
+            }
+        }
+        public String getTestDataMediaType() {
+            return testDataMediaType;
+        }
+        /** @param propertyString the RDF property providing the texts; <code>null</code> or empty for the default */
+        public void setContentProperty(String propertyString) {
+            if(propertyString == null || propertyString.isEmpty()){
+                this.propertyString = DEFAULT_TEST_DATA_PROPERTY;
+            } else {
+                this.propertyString = propertyString;
+            }
+        }
+        public String getContentProperty() {
+            return propertyString;
+        }
+    }
+    /* -------------------------------------------------------------
+     * Utilities for reading the Test Data from the defined source
+     * -------------------------------------------------------------
+     */
+
+    /**
+     * Iterator reading Content elements from the input stream. Two (or more)
+     * empty lines are used to separate multiple content items. Empty lines at
+     * the beginning of a content item are ignored.<p>
+     * NOTE: This iterator does not keep the whole text in-memory. Therefore
+     * it can be used to process test data that would not fit in-memory. The
+     * reader is closed as soon as the end of the stream is reached.
+     * @param is The input stream to read the data from
+     * @param mediaType the Media-Type - only used to parse the charset from. If
+     * no charset is specified UTF-8 is used as default.
+     */
+    private static Iterator<String> createTextDataIterator(InputStream is, MediaType mediaType) {
+        String charsetString = mediaType.getParameters().get("charset");
+        Charset charset = Charset.forName(charsetString == null ? "UTF-8" : charsetString);
+        log.info("  ... using charset {} for parsing Text data",charset);
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(is, charset));
+        return new Iterator<String>() {
+            String next = null;
+            /** <code>true</code> as soon as the end of the stream was reached */
+            boolean eof = false;
+            private String getNext(){
+                if(eof){
+                    return null;
+                }
+                StringBuilder data = new StringBuilder();
+                int emptyLines = 0;
+                try {
+                    //NOTE: check the number of empty lines BEFORE reading the next
+                    //line. Otherwise the first line of the following content item
+                    //would be read and silently dropped.
+                    while(emptyLines < 2){
+                        String line = reader.readLine();
+                        if(line == null){
+                            eof = true;
+                            IOUtils.closeQuietly(reader);
+                            break;
+                        }
+                        if(line.isEmpty()){
+                            if(data.length() == 0){
+                                continue; //ignore empty lines at the beginning
+                            }
+                            emptyLines++;
+                        } else {
+                            emptyLines = 0;
+                        }
+                        data.append(line).append('\n');
+                    }
+                } catch (IOException e) {
+                    log.warn("IOException while reading from Stream",e);
+                    Assert.fail("IOException while reading from Stream");
+                }
+                return data.length() == 0 ? null : data.toString();
+            }
+            @Override
+            public boolean hasNext() {
+                if(next == null){
+                    next = getNext();
+                }
+                return next != null;
+            }
+    
+            @Override
+            public String next() {
+                if(next == null){
+                    next = getNext();
+                }
+                if(next == null){
+                    throw new NoSuchElementException("No further testData available");
+                } else {
+                    String elem = next;
+                    next = null;
+                    return elem;
+                }
+            }
+    
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
+    /* -------------------------------------------------------------
+     * Utilities for executing and tracking the concurrent Requests
+     * -------------------------------------------------------------
+     */
+
+    protected class ExcutionTracker {
+        
+        
+        
+        /** max. number of registered requests before register(..) blocks */
+        private int maxRegistered;
+        /** number of requests that succeeded or failed */
+        private int completed;
+        /** the pending requests; also used as monitor for register/complete */
+        private final Set<Request> registered = new HashSet<Request>();
+        /** responses and contents of failed requests (in insertion order) */
+        private final Map<HttpResponse,String> failed =
+                Collections.synchronizedMap(new LinkedHashMap<HttpResponse,String>());
+        /** collected statistics; guarded by synchronizing on itself */
+        private ExecutionStatistics statistics = new ExecutionStatistics();
+        /** executes the registered requests */
+        private ExecutorService executorService;
+        
+        /**
+         * Creates a tracker that allows up to 100 registered (pending) requests.
+         * @param executorService the executor used to run the requests
+         */
+        protected ExcutionTracker(ExecutorService executorService){
+            this(executorService, 100);
+        }
+        /**
+         * @param executorService the executor used to run the requests
+         * @param maxRegistered the maximum number of registered (pending)
+         * requests; register(..) blocks while it is reached. Values
+         * <code>&lt;= 0</code> mean no limit.
+         */
+        public ExcutionTracker(ExecutorService executorService,int maxRegistered) {
+            this.executorService = executorService;
+            if(maxRegistered > 0){
+                this.maxRegistered = maxRegistered;
+            } else {
+                this.maxRegistered = Integer.MAX_VALUE; //no limit
+            }
+        }
+        
+        public void register(Request request,String content){
+            synchronized (registered) {
+                while(registered.size() >= maxRegistered){
+                    try {
+                        registered.wait();
+                    } catch (InterruptedException e) {
+                        //interrupped
+                    }
+                }
+                registered.add(request);
+                executorService.execute(new AsyncExecuter(content,request, this));
+            }
+        }
+
+        void succeed(Request request, UriRef contentItemUri, TripleCollection results,Long rtt) {
+            ExecutionMetadata em = ExecutionMetadata.parseFrom(results, (UriRef)contentItemUri);
+            results.clear(); //we no longer need the results
+            if(em != null){
+                synchronized (statistics) {
+                    statistics.addResult(em,rtt);
+                }
+            } //no executionData available ... unable to collect statistics
+            synchronized (registered) {
+                if(registered.remove(request)){
+                    completed++;
+                    registered.notifyAll();
+                }
+            }
+        }
+
+        void failed(Request request, String content,RequestExecutor executor) {
+            synchronized (registered) {
+                failed.put(executor.getResponse(),content);
+                if(registered.remove(request)){
+                    completed++;
+                    registered.notifyAll();
+                }
+            }
+        }
+        
+        public int getNumPending(){
+            synchronized (registered) {
+                return registered.size();
+            }
+        }
+        /**
+         * Live list of the failed requests. Non basic access MUST BE
+         * syncronized on the list while the requests are still pending as newly
+         * failed requests will modify this list
+         * @return
+         */
+        public Map<HttpResponse,String> getFailed(){
+            return failed;
+        }
+        public int getNumCompleted(){
+            return completed;
+        }
+        public void wait(int seconds){
+            try {
+                executorService.awaitTermination(seconds, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+            }
+        }
+        public void printStatistics(){
+            log.info("Statistics:");
+            synchronized (statistics) {
+                log.info("Chain:");
+                log.info("  Round Trip Time (Server + Transfer + Client):");
+                if(statistics.getNumRtt() < 1){
+                    log.info("    - not available");
+                } else {
+                    log.info("     max: {}ms | min: {}ms | avr: {}ms over {} requests",
+                        new Object[]{statistics.getMaxRtt(),
+                                     statistics.getMinRtt(),
+                                     statistics.getAverageRtt(),
+                                     statistics.getNumRtt()});
+                }
+                log.info("  processing time (server side)");
+                if(statistics.getNumSamples() < 1){
+                    log.info("    - not available. Make shure the used "
+                        + "EnhancementJobManager supports ExecutionMetadata!");
+                } else {
+                    log.info("     max: {}ms | min: {}ms | avr: {}ms over {} requests",
+                        new Object[]{statistics.getMaxDuration(),
+                                     statistics.getMinDuration(),
+                                     statistics.getAverageDuration(),
+                                     statistics.getNumSamples()});
+                    log.info("Enhancement Engines");
+                    for(String name :statistics.getEngineNames()){
+                        log.info("  {}: max: {}ms | min: {}ms | avr: {}ms over {} requests",
+                            new Object[]{name,
+                                         statistics.getMaxDuration(name),
+                                         statistics.getMinDuration(name),
+                                         statistics.getAverage(name),
+                                         statistics.getNumSamples(name)});
+                    }
+                }
+            }
+        }
+    }
+    /**
+     * Sends a single enhancement request and reports the outcome to an
+     * {@link ExcutionTracker}. Failures are reported via
+     * {@link ExcutionTracker#failed}: a non-200 response, an error while
+     * sending, or results that cannot be parsed or validated. Successful
+     * results are reported via {@link ExcutionTracker#succeed}.
+     */
+    protected class AsyncExecuter implements Runnable{
+
+        private final Request request;
+        private final ExcutionTracker tracker;
+        /** Sent content. Only kept for reporting failed requests. */
+        private String content;
+        protected AsyncExecuter(String content, Request request, ExcutionTracker tracker){
+            this.content = content;
+            this.request = request;
+            this.tracker = tracker;
+        }
+        @Override
+        public void run() {
+            RequestExecutor executor = new RequestExecutor(pooledHttpClient);
+            long start = System.currentTimeMillis();
+            Long rtt;
+            try {
+                executor.execute(request).assertStatus(200);
+                rtt = System.currentTimeMillis()-start;
+            } catch (Throwable e) {
+                log.warn("Error while sending Request ",e);
+                tracker.failed(request,content,executor);
+                return; //a failed request has no enhancement results to parse
+            }
+            IndexedMGraph graph = new IndexedMGraph();
+            UriRef contentItemUri;
+            try {
+                contentItemUri = parseContentItemUri(executor, graph);
+            } catch (Throwable e) {
+                //an AssertionError thrown in a worker thread does not fail the
+                //test and would leave the request pending forever, so report it
+                log.warn("Unable to process Enhancement Results ",e);
+                tracker.failed(request,content,executor);
+                return;
+            }
+            content = null; //do not store content for successfull results
+            tracker.succeed(request,contentItemUri,graph,rtt);
+        }
+
+        /**
+         * Parses the RDF enhancement results of the executed request into
+         * {@code graph} and returns the ContentItem URI. That URI is the object
+         * of a {@code Properties.ENHANCER_EXTRACTED_FROM} triple.
+         * @throws AssertionError if the response cannot be parsed, contains no
+         * enhancement, or the ContentItem URI is not an {@link UriRef}
+         */
+        private UriRef parseContentItemUri(RequestExecutor executor, IndexedMGraph graph){
+            try {
+                rdfParser.parse(graph,executor.getStream(), executor.getContentType().getMimeType());
+            }catch (Exception e) {
+                Assert.fail("Unable to parse RDF data from Response with Content-Type "
+                    + executor.getContentType().getMimeType()+" ( "+e.getClass().getSimpleName()
+                    + ": "+e.getMessage()+")");
+            }
+            Iterator<Triple> ciIt = graph.filter(null, Properties.ENHANCER_EXTRACTED_FROM, null);
+            Assert.assertTrue("Enhancement Results do not caontain a single Enhancement",ciIt.hasNext());
+            Resource contentItemUri = ciIt.next().getObject();
+            Assert.assertTrue("ContentItem URI is not an UriRef but an instance of "
+                    + contentItemUri.getClass().getSimpleName(), contentItemUri instanceof UriRef);
+            return (UriRef)contentItemUri;
+        }
+    }
+
+    /**
+     * Collects max/min/average statistics for the round trip time and for the
+     * server-side processing time of the chain and of each engine.
+     * <p>
+     * This class does no synchronization itself. Concurrent callers must
+     * synchronize on the instance.
+     * <p>
+     * All {@code Long} getters return {@code null} if no sample is available.
+     */
+    protected class ExecutionStatistics {
+        private int numSamples;
+        private long maxDuration = -1;
+        private long minDuration = Long.MAX_VALUE;
+        private long sumDuration = 0;
+        private int numRtt;
+        private long maxRtt = -1;
+        private long minRtt = Long.MAX_VALUE;
+        private long sumRtt = 0;
+        
+        /**
+         * Per-engine statistics as {max, min, sum, numSamples}. They start
+         * with the sentinels {-1, Long.MAX_VALUE, 0, 0}.
+         */
+        private final Map<String, long[]> engineStats = new TreeMap<String,long[]>();
+        
+        /**
+         * Adds the durations from the ExecutionMetadata of one request.
+         * @param em the parsed execution metadata
+         * @param roundTripTime the client-side round trip time in ms, or
+         * {@code null} if not available
+         */
+        void addResult(ExecutionMetadata em,Long roundTripTime){
+            Long durationNumber = em.getChainExecution().getDuration();
+            long duration;
+            if(durationNumber != null){
+                duration = durationNumber.longValue();
+                if(duration > maxDuration){
+                    maxDuration = duration;
+                }
+                if(duration < minDuration){
+                    minDuration = duration;
+                }
+                sumDuration += duration;
+                numSamples++;
+            }
+            if(roundTripTime != null){
+                long rtt = roundTripTime;
+                if(rtt > maxRtt){
+                    maxRtt = rtt;
+                }
+                if(rtt < minRtt){
+                    minRtt = rtt;
+                }
+                sumRtt += rtt;
+                numRtt++;
+            }
+            for(Entry<String,Execution> ex : em.getEngineExecutions().entrySet()){
+                long[] stats = engineStats.get(ex.getKey());
+                if(stats == null){
+                    stats = new long[]{-1L,Long.MAX_VALUE,0L,0L};
+                    engineStats.put(ex.getKey(), stats);
+                }
+                durationNumber = ex.getValue().getDuration();
+                if(durationNumber != null){
+                    duration = durationNumber.longValue();
+                    if(duration > stats[0]){ //max duration
+                        stats[0] = duration;
+                    }
+                    if(duration < stats[1]){ //min duration
+                        stats[1] = duration;
+                    }
+                    stats[2] += duration; //sum duration
+                    stats[3]++; //num Samples
+                }
+            }
+        }
+        
+        /** @return a read-only view of the names of all engines seen so far */
+        public Set<String> getEngineNames(){
+            return Collections.unmodifiableSet(engineStats.keySet());
+        }
+        public Long getMaxDuration(){
+            return maxDuration < 0 ? null : maxDuration;
+        }
+        public Long getMinDuration(){
+            return minDuration == Long.MAX_VALUE ? null : minDuration;
+        }
+        public Long getAverageDuration(){
+            return numSamples <= 0 ? null : Math.round((double)sumDuration/(double)numSamples);
+        }
+        public int getNumSamples(){
+            return numSamples;
+        }
+        public Long getMaxRtt(){
+            return maxRtt < 0 ? null : maxRtt;
+        }
+        public Long getMinRtt(){
+            return minRtt == Long.MAX_VALUE ? null : minRtt;
+        }
+        public Long getAverageRtt(){
+            return numRtt <= 0 ? null : Math.round((double)sumRtt/(double)numRtt);
+        }
+        public int getNumRtt(){
+            return numRtt;
+        }
+        /** @return the max duration of the engine, or {@code null} if unknown or no sample */
+        public Long getMaxDuration(String engine){
+            long[] stats = engineStats.get(engine);
+            return stats == null || stats[0] < 0 ? null : stats[0];
+        }
+        /** @return the min duration of the engine, or {@code null} if unknown or no sample */
+        public Long getMinDuration(String engine){
+            long[] stats = engineStats.get(engine);
+            return stats == null || stats[1] == Long.MAX_VALUE ? null : stats[1];
+        }
+        /** @return the average duration of the engine, or {@code null} if unknown or no sample */
+        public Long getAverage(String engine){
+            long[] stats = engineStats.get(engine);
+            //'||' (not '&&'): avoids the NPE for unknown engines and 0/0 for engines without samples
+            return stats == null || stats[3] <= 0 ? 
+                    null : Math.round((double)stats[2]/(double)stats[3]);
+        }
+        /** @return the number of duration samples of the engine; {@code 0} if unknown */
+        public int getNumSamples(String engine){
+            long[] stats = engineStats.get(engine);
+            return stats == null ? 0 : (int)stats[3];
+        }
+        
+    }
+}
\ No newline at end of file

Added: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/NlpProcessingStressTest.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/NlpProcessingStressTest.java?rev=1414154&view=auto
==============================================================================
--- stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/NlpProcessingStressTest.java (added)
+++ stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/NlpProcessingStressTest.java Tue Nov 27 12:38:21 2012
@@ -0,0 +1,21 @@
+package org.apache.stanbol.enhancer.it;
+
+import org.junit.Test;
+
+/**
+ * Stress test that runs {@link MultiThreadedTestBase#performTest} against the
+ * {@value #PROPER_NOUN_LINKING_CHAIN} enhancement chain. Apart from the chain,
+ * it uses the default {@link TestSettings}.
+ */
+public class NlpProcessingStressTest extends MultiThreadedTestBase {
+
+    /** Name of the enhancement chain exercised by this stress test. */
+    public static final String PROPER_NOUN_LINKING_CHAIN = "dbpedia-proper-noun";
+
+    //no explicit constructor: the implicit public no-arg one calls super()
+
+    @Test
+    public void testProperNounLinking() throws Exception {
+        //only the chain is customized; all other settings keep their defaults
+        TestSettings chainOnlySettings = new TestSettings();
+        chainOnlySettings.setChain(PROPER_NOUN_LINKING_CHAIN);
+        performTest(chainOnlySettings);
+    }
+
+}

Added: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java
URL: http://svn.apache.org/viewvc/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java?rev=1414154&view=auto
==============================================================================
--- stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java (added)
+++ stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java Tue Nov 27 12:38:21 2012
@@ -0,0 +1,58 @@
+package org.apache.stanbol.enhancer.it;
+
+import org.junit.Test;
+
+public class ProperNounLinkingTest extends EnhancerTestBase {
+
+    
+    public static final String TEST_TEXT = "The ProperNoun linking Chain can not "
+            + "only detect famous cities such as London and people such as Bob "
+            + "Marley but also books like the Theory of Relativity, events like "
+            + "the French Revolution or the Paris Peace Conference and even "
+            + "prices such as the Nobel Prize in Literature.";
+    
+    /**
+     * 
+     */
+    public ProperNounLinkingTest() {
+        super(getChainEndpoint("dbpedia-proper-noun"), 
+            "langdetect"," LanguageDetectionEnhancementEngine",
+            "opennlp-sentence"," OpenNlpSentenceDetectionEngine",
+            "opennlp-token"," OpenNlpTokenizerEngine",
+            "opennlp-pos","OpenNlpPosTaggingEngine",
+            "opennlp-chunker","OpenNlpChunkingEngine",
+            "dbpedia-proper-noun-extraction","EntityLinkingEngine");
+    }
+    
+    
+    @Test
+    public void testSimpleEnhancement() throws Exception {
+        executor.execute(
+            builder.buildPostRequest(getEndpoint())
+            .withHeader("Accept","text/rdf+nt")
+            .withContent(TEST_TEXT)
+        )
+        .assertStatus(200)
+        .assertContentRegexp( // it MUST detect the language
+                "http://purl.org/dc/terms/creator.*LanguageDetectionEnhancementEngine",
+                "http://purl.org/dc/terms/language.*en",
+                //and the entityLinkingEngine
+                "http://purl.org/dc/terms/creator.*EntityLinkingEngine",
+                //needs to suggest the following Entities
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/London",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Bob_Marley",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/French_Revolution",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Nobel_Prize_in_Literature",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Nobel_Prize",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Paris_Peace_Conference,_1919",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Theory_of_relativity",
+                "http://fise.iks-project.eu/ontology/entity-reference.*http://dbpedia.org/resource/Theory",
+                //for the following sections within the text
+                "http://fise.iks-project.eu/ontology/selected-text.*Theory of Relativity",
+                "http://fise.iks-project.eu/ontology/selected-text.*Nobel Prize in Literature",
+                "http://fise.iks-project.eu/ontology/selected-text.*Paris Peace Conference",
+                "http://fise.iks-project.eu/ontology/selected-text.*French Revolution");
+    }
+
+    
+}

Propchange: stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/ProperNounLinkingTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain