You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by rw...@apache.org on 2012/06/30 14:03:52 UTC
svn commit: r1355717 - in /incubator/stanbol/trunk/integration-tests: ./
src/test/java/org/apache/stanbol/enhancer/it/ src/test/resources/
Author: rwesten
Date: Sat Jun 30 12:03:51 2012
New Revision: 1355717
URL: http://svn.apache.org/viewvc?rev=1355717&view=rev
Log:
STANBOL-670: First version of the multi threaded integration test & utility.
TODOs:
* Support for Enhancement Chains.
* Features other than used during the integration test are not yet tested
* Documentation on how to use this utility
Added:
incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java (with props)
incubator/stanbol/trunk/integration-tests/src/test/resources/10k_long_abstracts_en.nt.bz2 (with props)
Modified:
incubator/stanbol/trunk/integration-tests/pom.xml
incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java
Modified: incubator/stanbol/trunk/integration-tests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/integration-tests/pom.xml?rev=1355717&r1=1355716&r2=1355717&view=diff
==============================================================================
--- incubator/stanbol/trunk/integration-tests/pom.xml (original)
+++ incubator/stanbol/trunk/integration-tests/pom.xml Sat Jun 30 12:03:51 2012
@@ -141,11 +141,6 @@
</dependency>
<dependency>
<groupId>org.apache.stanbol</groupId>
- <artifactId>org.apache.stanbol.commons.testing.http</artifactId>
- <version>0.10.0-incubating-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.stanbol</groupId>
<artifactId>org.apache.stanbol.commons.testing.stanbol</artifactId>
<version>0.10.0-incubating-SNAPSHOT</version>
</dependency>
@@ -166,6 +161,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-osgi</artifactId>
</dependency>
@@ -198,6 +197,14 @@
<groupId>org.apache.clerezza</groupId>
<artifactId>rdf.jena.serializer</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.clerezza</groupId>
+ <artifactId>rdf.jena.parser</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.clerezza</groupId>
+ <artifactId>rdf.rdfjson</artifactId>
+ </dependency>
</dependencies>
</project>
Modified: incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java?rev=1355717&r1=1355716&r2=1355717&view=diff
==============================================================================
--- incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java (original)
+++ incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/EnhancerTestBase.java Sat Jun 30 12:03:51 2012
@@ -177,7 +177,7 @@ public class EnhancerTestBase extends St
@Override
protected void reportException(Throwable t) {
log.info("Exception in RetryLoop, will retry for up to "
- + getRemainingTimeSeconds() + " seconds: " + t);
+ + getRemainingTimeSeconds() + " seconds: ", t);
}
protected void onTimeout() {
Added: incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java?rev=1355717&view=auto
==============================================================================
--- incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java (added)
+++ incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java Sat Jun 30 12:03:51 2012
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.stanbol.enhancer.it;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.clerezza.rdf.core.Literal;
+import org.apache.clerezza.rdf.core.Resource;
+import org.apache.clerezza.rdf.core.Triple;
+import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.clerezza.rdf.core.impl.SimpleMGraph;
+import org.apache.clerezza.rdf.core.serializedform.Parser;
+import org.apache.clerezza.rdf.core.serializedform.SupportedFormat;
+import org.apache.clerezza.rdf.core.serializedform.UnsupportedFormatException;
+import org.apache.clerezza.rdf.jena.parser.JenaParserProvider;
+import org.apache.clerezza.rdf.rdfjson.parser.RdfJsonParsingProvider;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.stanbol.commons.testing.http.Request;
+import org.apache.stanbol.commons.testing.http.RequestExecutor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test that the default chain is called by requesting the "/enhancer" endpoint. */
+public class MultiThreadedTest extends EnhancerTestBase {
+
+
+ public static final String PROPERTY_TEST_DATA = "stanbol.it.multithreadtest.data";
+ public static final String PROPERTY_TEST_DATA_TYPE = "stanbol.it.multithreadtest.media-type";
+ public static final String PROPERTY_TEST_DATA_PROPERTY = "stanbol.it.multithreadtest.data-property";
+ public static final String PROPERTY_THREADS = "stanbol.it.multithreadtest.threads";
+ public static final String PROPERTY_REQUESTS = "stanbol.it.multithreadtest.requests";
+
+ private static final Logger log = LoggerFactory.getLogger(MultiThreadedTest.class);
+
+ public final static int DEFAULT_NUM_THREADS = 5;
+ public final static int DEFAULT_NUM_REQUESTS = 500;
+ public final static String DEFAULT_TEST_DATA = "10k_long_abstracts_en.nt.bz2";
+ public final static String DEFAULT_TEST_DATA_PROPERTY = "http://dbpedia.org/ontology/abstract";
+
+ private static Parser rdfParser;
+ private static Iterator<String> testDataIterator;
+ /*
+ * We need here a custom http client that uses a connection pool
+ */
+ protected DefaultHttpClient pooledHttpClient;
+ private BasicHttpParams httpParams;
+ private PoolingClientConnectionManager connectionManager;
+
+ public MultiThreadedTest(){
+ super(null,new String[]{});
+ }
+ protected MultiThreadedTest(String endpoint){
+ super(endpoint);
+ }
+ protected MultiThreadedTest(String endpoint,String...assertEngines){
+ super(endpoint,assertEngines);
+ }
+
+ @BeforeClass
+ public static void init() throws IOException {
+ //init the RDF parser
+ rdfParser = new Parser();
+ rdfParser.bindParsingProvider(new JenaParserProvider());
+ rdfParser.bindParsingProvider(new RdfJsonParsingProvider());
+ //init theTestData
+ initTestData();
+ }
+
+
+ @Before
+ public void initialiseHttpClient() {
+ if(this.pooledHttpClient == null){ //init for the first test
+ httpParams = new BasicHttpParams();
+ httpParams.setParameter(CoreProtocolPNames.USER_AGENT, "Stanbol Integration Test");
+ httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true);
+ httpParams.setIntParameter(ClientPNames.MAX_REDIRECTS,3);
+ httpParams.setBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE,true);
+
+ connectionManager = new PoolingClientConnectionManager();
+ connectionManager.setMaxTotal(20);
+ connectionManager.setDefaultMaxPerRoute(20);
+
+ pooledHttpClient = new DefaultHttpClient(connectionManager,httpParams);
+ }
+ }
+
+ @Test
+ public void testMultipleParallelRequests() throws Exception {
+ Integer maxRequests = Integer.getInteger(PROPERTY_REQUESTS,DEFAULT_NUM_REQUESTS);
+ if(maxRequests.intValue() <= 0){
+ maxRequests = Integer.MAX_VALUE;
+ }
+ Integer numThreads = Integer.getInteger(PROPERTY_THREADS,DEFAULT_NUM_THREADS);
+ if(numThreads <= 0){
+ numThreads = DEFAULT_NUM_THREADS;
+ }
+ log.info("Start Multi Thread testing of max. {} requests using {} threads",
+ maxRequests,numThreads);
+ ExcutionTracker tracker = new ExcutionTracker(Executors.newFixedThreadPool(numThreads));
+ int testNum;
+ for(testNum = 0;testDataIterator.hasNext() && testNum < maxRequests; testNum++){
+ String test = testDataIterator.next();
+ Request request = builder.buildPostRequest(getEndpoint())
+ .withHeader("Accept","text/rdf+nt")
+ .withContent(test);
+ tracker.register(request);
+ if(testNum%100 == 0){
+ log.info(" ... sent {} Requests ({} finished, {} pending, {} failed",
+ new Object[]{testNum,tracker.getNumCompleted(),
+ tracker.getPending().size(),tracker.getFailed().size()});
+ }
+ }
+ log.info("> All {} requests sent!",testNum);
+ log.info(" ... wait for all requests to complete");
+ while(!tracker.getPending().isEmpty()){
+ tracker.wait(3);
+ log.info(" ... {} finished, {} pending, {} failed",
+ new Object[]{tracker.getNumCompleted(),tracker.getPending().size(),tracker.getFailed().size()});
+ }
+ Assert.assertTrue(tracker.getFailed()+"/"+numThreads+" failed", tracker.getFailed().isEmpty());
+ }
+
+ /* -------------------------------------------------------------
+ * Utilities for reading the Test Data from the defined source
+ * -------------------------------------------------------------
+ */
+
+ private static void initTestData() throws IOException {
+ String testData = System.getProperty(PROPERTY_TEST_DATA, DEFAULT_TEST_DATA);
+ File testFile = new File(testData);
+ InputStream is = null;
+ if(testFile.isFile()){
+ is = new FileInputStream(testFile);
+ }
+ if(is == null) {
+ is = MultiThreadedTest.class.getClassLoader().getResourceAsStream(testData);
+ }
+ if(is == null){
+ is = ClassLoader.getSystemResourceAsStream(testData);
+ }
+ if(is == null){
+ try {
+ is = new URL(testData).openStream();
+ }catch (MalformedURLException e) {
+ //not a URL
+ }
+ }
+ Assert.assertNotNull("Unable to load the parsed TestData '"
+ +testData+"'!", is != null);
+
+ String name = FilenameUtils.getName(testData);
+ if ("gz".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+ is = new GZIPInputStream(is);
+ name = FilenameUtils.removeExtension(name);
+ log.debug(" - from GZIP Archive");
+ } else if ("bz2".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+ is = new BZip2CompressorInputStream(is);
+ name = FilenameUtils.removeExtension(name);
+ log.debug(" - from BZip2 Archive");
+ } else if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(name))) {
+ ZipArchiveInputStream zipin = new ZipArchiveInputStream(is);
+ ArchiveEntry entry = zipin.getNextEntry();
+ log.info("For ZIP archives only the 1st Entry will be processed!");
+ name = FilenameUtils.getName(entry.getName());
+ log.info(" - processed Entry: {}",entry.getName());
+ } // else uncompressed data ...
+
+ String mediaTypeString = System.getProperty(PROPERTY_TEST_DATA_TYPE);
+ MediaType mediaType;
+ if(mediaTypeString != null){
+ mediaType = MediaType.valueOf(mediaTypeString);
+ } else { //parse based on extension
+ String ext = FilenameUtils.getExtension(name);
+ if("txt".equalsIgnoreCase(ext)){
+ mediaType = MediaType.TEXT_PLAIN_TYPE;
+ } else if("rdf".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.RDF_XML);
+ } else if("xml".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.RDF_XML);
+ } else if("ttl".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.TURTLE);
+ } else if("n3".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.N3);
+ } else if("nt".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.N_TRIPLE);
+ } else if("json".equalsIgnoreCase(ext)){
+ mediaType = MediaType.valueOf(SupportedFormat.RDF_JSON);
+ } else if(name.indexOf('.')<0){ //no extension
+ mediaType = MediaType.TEXT_PLAIN_TYPE; //try plain text
+ } else {
+ log.info("Unkown File Extension {} for resource name {}",
+ ext,name);
+ mediaType = null;
+ }
+ }
+ Assert.assertNotNull("Unable to detect MediaType for Resource '"
+ + name+"'. Please use the property '"+PROPERTY_TEST_DATA_TYPE
+ + "' to manually parse the MediaType!", mediaType);
+
+ //now init the iterator for the test data
+ testDataIterator = mediaType.isCompatible(MediaType.TEXT_PLAIN_TYPE) ?
+ createTextDataIterator(is, mediaType) :
+ createRdfDataIterator(is,mediaType);
+ }
+ /**
+ * @param is
+ * @param mediaType
+ */
+ private static Iterator<String> createRdfDataIterator(InputStream is, MediaType mediaType) {
+ final SimpleMGraph graph = new SimpleMGraph();
+ try {
+ rdfParser.parse(graph, is, mediaType.toString());
+ } catch (UnsupportedFormatException e) {
+ Assert.fail("The MimeType '"+mediaType+"' of the parsed testData "
+ + "is not supported. This utility supports plain text files as "
+ + "as well as the RDF formats "+rdfParser.getSupportedFormats()
+ + "If your test data uses one of those formats but it was not "
+ + "correctly detected you can use the System property '"
+ + PROPERTY_TEST_DATA_TYPE + "' to manually parse the Media-Type!");
+ }
+ return new Iterator<String>() {
+ Iterator<Triple> it = null;
+ String next = null;
+ private String getNext(){
+ if(it == null){
+ UriRef property;
+ String propertyString = System.getProperty(PROPERTY_TEST_DATA_PROPERTY,DEFAULT_TEST_DATA_PROPERTY);
+ propertyString.trim();
+ if("*".equals(propertyString)){
+ property = null; //wildcard
+ } else {
+ property = new UriRef(propertyString);
+ }
+ it = graph.filter(null, property, null);
+ }
+ while(it.hasNext()){
+ Resource value = it.next().getObject();
+ if(value instanceof Literal){
+ return ((Literal)value).getLexicalForm();
+ }
+ }
+ return null; //no more data
+ }
+
+ @Override
+ public boolean hasNext() {
+ if(next == null){
+ next = getNext();
+ }
+ return next != null;
+ }
+
+ @Override
+ public String next() {
+ if(next == null){
+ next = getNext();
+ }
+ if(next == null){
+ throw new NoSuchElementException("No further testData available");
+ } else {
+ String elem = next;
+ next = null;
+ return elem;
+ }
+
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+ /**
+ * @param is
+ * @param mediaType
+ */
+ private static Iterator<String> createTextDataIterator(InputStream is, MediaType mediaType) {
+ String charsetString = mediaType.getParameters().get("charset");
+ Charset charset = Charset.forName(charsetString == null ? "UTF-8" : charsetString);
+ log.info(" ... using charset {} for parsing Text data",charset);
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(is, charset));
+ return new Iterator<String>() {
+ String next = null;
+ private String getNext(){
+ String line;
+ StringBuilder data = new StringBuilder();
+ int emtptyLines = 0;
+ try {
+ while((line = reader.readLine()) != null && emtptyLines < 2){
+ data.append(line).append('\n');
+ if(line.isEmpty()){
+ emtptyLines++;
+ } else {
+ emtptyLines = 0;
+ }
+ }
+ } catch (IOException e) {
+ log.warn("IOException while reading from Stream",e);
+ Assert.fail("IOException while reading from Stream");
+ }
+ return data.length() == 0 ? null : data.toString();
+ }
+ @Override
+ public boolean hasNext() {
+ if(next == null){
+ next = getNext();
+ }
+ return next != null;
+ }
+
+ @Override
+ public String next() {
+ if(next == null){
+ next = getNext();
+ }
+ if(next == null){
+ throw new NoSuchElementException("No further testData available");
+ } else {
+ String elem = next;
+ next = null;
+ return elem;
+ }
+
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ }
+
+
+
+ /* -------------------------------------------------------------
+ * Utilities for executing and tracking the concurrent Requests
+ * -------------------------------------------------------------
+ */
+
+ private class ExcutionTracker {
+
+ private int completed = 0;
+ private final Set<Request> registered = Collections.synchronizedSet(new HashSet<Request>());
+ private final List<Request> failed = Collections.synchronizedList(new ArrayList<Request>());
+ private ExecutorService executorService;
+
+ public ExcutionTracker(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public void register(Request request){
+ registered.add(request);
+ executorService.execute(new AsyncExecuter(request, this));
+ }
+
+ public void succeed(Request request) {
+ if(registered.remove(request)){
+ completed++;
+ }
+ }
+
+ public void failed(Request request) {
+ if(registered.remove(request)){
+ completed++;
+ }
+ failed.add(request);
+ }
+
+ public Set<Request> getPending(){
+ return registered;
+ }
+
+ public List<Request> getFailed(){
+ return failed;
+ }
+ public int getNumCompleted(){
+ return completed;
+ }
+ public void wait(int seconds){
+ try {
+ executorService.awaitTermination(seconds, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ }
+ private class AsyncExecuter implements Runnable{
+
+ private Request request;
+ private ExcutionTracker tracker;
+ protected AsyncExecuter(Request request, ExcutionTracker tracker){
+ this.request = request;
+ this.tracker = tracker;
+ }
+ @Override
+ public void run() {
+ try {
+ RequestExecutor executor = new RequestExecutor(pooledHttpClient);
+ executor.execute(request).assertStatus(200);
+ tracker.succeed(request);
+ } catch (Throwable e) {
+ log.warn("Error while sending Request ",e);
+ tracker.failed(request);
+ }
+ }
+
+ }
+
+}
Propchange: incubator/stanbol/trunk/integration-tests/src/test/java/org/apache/stanbol/enhancer/it/MultiThreadedTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/stanbol/trunk/integration-tests/src/test/resources/10k_long_abstracts_en.nt.bz2
URL: http://svn.apache.org/viewvc/incubator/stanbol/trunk/integration-tests/src/test/resources/10k_long_abstracts_en.nt.bz2?rev=1355717&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/stanbol/trunk/integration-tests/src/test/resources/10k_long_abstracts_en.nt.bz2
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream