You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/06/21 10:42:43 UTC
git commit: improved concurrency of new triplestore implementation
Updated Branches:
refs/heads/develop a29e8a6f2 -> 85915f99d
improved concurrency of new triplestore implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/85915f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/85915f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/85915f99
Branch: refs/heads/develop
Commit: 85915f99d8c15deaf5c8b924ae23f9ab3579cca7
Parents: a29e8a6
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Jun 21 10:42:34 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Jun 21 10:42:34 2013 +0200
----------------------------------------------------------------------
libraries/kiwi/kiwi-triplestore/pom.xml | 6 +
.../kiwi/persistence/KiWiConnection.java | 60 ++-----
.../marmotta/kiwi/persistence/KiWiDialect.java | 19 +-
.../kiwi/persistence/KiWiPersistence.java | 65 ++++++-
.../apache/marmotta/kiwi/sail/KiWiStore.java | 2 +-
.../marmotta/kiwi/test/H2ConcurrencyTest.java | 176 +++++++++++++++++++
.../kiwi/test/MySQLConcurrencyTest.java | 174 ++++++++++++++++++
.../kiwi/test/PostgreSQLConcurrencyTest.java | 174 ++++++++++++++++++
8 files changed, 617 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/pom.xml
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/pom.xml b/libraries/kiwi/kiwi-triplestore/pom.xml
index 28defe4..bbb69a9 100644
--- a/libraries/kiwi/kiwi-triplestore/pom.xml
+++ b/libraries/kiwi/kiwi-triplestore/pom.xml
@@ -176,6 +176,12 @@
<artifactId>sesame-repository-sail</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.tempus-fugit</groupId>
+ <artifactId>tempus-fugit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 46b44ed..e0f7522 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -45,6 +45,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -1617,51 +1618,9 @@ public class KiWiConnection {
* @throws SQLException
*/
public long getNextSequence(String sequenceName) throws SQLException {
- if(batchCommit && persistence.getConfiguration().isMemorySequences()) {
- // use in-memory sequences
- if(persistence.getMemorySequences() == null) {
- persistence.setMemorySequences(new HashMap<String,Long>());
- }
-
- synchronized (persistence.getMemorySequences()) {
- Long sequence = persistence.getMemorySequences().get(sequenceName);
- if(sequence == null) {
- requireJDBCConnection();
-
- // load sequence value from database
- // if there is a preparation needed to update the transaction, run it first
- if(dialect.hasStatement(sequenceName+".prep")) {
- PreparedStatement prepNodeId = getPreparedStatement(sequenceName+".prep");
- prepNodeId.executeUpdate();
- }
-
- PreparedStatement queryNodeId = getPreparedStatement(sequenceName);
- ResultSet resultNodeId = queryNodeId.executeQuery();
- try {
- if(resultNodeId.next()) {
- sequence = resultNodeId.getLong(1);
- } else {
- throw new SQLException("the sequence did not return a new value");
- }
- } finally {
- resultNodeId.close();
- }
-
- // this is a very ugly hack needed by MySQL because the sequences are not atomic
- // it is only necessary for the nodes sequence, because the value factory always keeps
- // a connection open to it; we could solve it maybe differently by remembering the
- // connection responsible for a in-memory sequence...
- if(sequenceName.equals("seq.nodes") && dialect instanceof MySQLDialect) {
- connection.commit();
- }
-
- } else {
- sequence += 1;
- }
- persistence.getMemorySequences().put(sequenceName,sequence);
- return sequence;
- }
-
+ if(batchCommit && persistence.getConfiguration().isMemorySequences() && persistence.getMemorySequences().containsKey(sequenceName)) {
+ AtomicLong sequence = persistence.getMemorySequences().get(sequenceName);
+ return sequence.incrementAndGet();
} else {
requireJDBCConnection();
@@ -1879,10 +1838,10 @@ public class KiWiConnection {
requireJDBCConnection();
try {
- synchronized (persistence.getMemorySequences()) {
- for(Map.Entry<String,Long> entry : persistence.getMemorySequences().entrySet()) {
+ for(Map.Entry<String,AtomicLong> entry : persistence.getMemorySequences().entrySet()) {
+ if( entry.getValue().get() > 0) {
PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
- updateSequence.setLong(1, entry.getValue());
+ updateSequence.setLong(1, entry.getValue().get());
if(updateSequence.execute()) {
updateSequence.getResultSet().close();
} else {
@@ -2028,8 +1987,11 @@ public class KiWiConnection {
}
insertTriple.executeBatch();
- } catch (Throwable ex) {
+ } catch (SQLException ex) {
+ System.err.println("main exception:");
ex.printStackTrace();
+ System.err.println("next exception:");
+ ex.getNextException().printStackTrace();
throw ex;
} finally {
commitLock.unlock();
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
index bd4879b..dfde5cb 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
@@ -22,9 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
/**
* A dialect provides the SQL statements necessary to access the different types of database systems. Each
@@ -159,6 +157,21 @@ public abstract class KiWiDialect {
return statements.stringPropertyNames();
}
+ /**
+ * Return the names of all sequences that have been configured in the system, i.e. all statements starting with "seq."
+ * @return
+ */
+ public Set<String> listSequences() {
+ Set<String> names = new HashSet<String>();
+ Enumeration e = statements.propertyNames();
+ while(e.hasMoreElements()) {
+ String[] keys = e.nextElement().toString().split("\\.");
+ if(keys[0].equals("seq")) {
+ names.add(keys[0] + "." + keys[1]);
+ }
+ }
+ return names;
+ }
/**
* Return the database specific operator for matching a text against a regular expression.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
index b1f4a4d..eab9729 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
@@ -22,6 +22,7 @@ import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
import org.apache.marmotta.kiwi.model.rdf.KiWiResource;
import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
import org.apache.marmotta.kiwi.persistence.util.ScriptRunner;
import org.apache.marmotta.kiwi.sail.KiWiValueFactory;
import org.apache.tomcat.jdbc.pool.DataSource;
@@ -35,11 +36,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Add file description here!
@@ -73,7 +78,7 @@ public class KiWiPersistence {
* A map holding in-memory sequences to be used for sequence caching in case the appropriate configuration option
* is configued and batched commits are enabled.
*/
- private Map<String,Long> memorySequences;
+ private Map<String,AtomicLong> memorySequences;
/**
@@ -138,13 +143,13 @@ public class KiWiPersistence {
if(configuration.isQueryLoggingEnabled()) {
poolConfig.setJdbcInterceptors(
"org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;" +
- "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;" +
- "org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport"
+ "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;" +
+ "org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport"
);
} else {
poolConfig.setJdbcInterceptors(
"org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;" +
- "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer"
+ "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer"
);
}
@@ -170,6 +175,49 @@ public class KiWiPersistence {
}
+ /**
+ * Initialise in-memory sequences if the feature is enabled.
+ */
+ public void initSequences() {
+ if(configuration.isBatchCommit() && configuration.isMemorySequences()) {
+ memorySequences = new ConcurrentHashMap<String,AtomicLong>();
+
+ try {
+ Connection con = getJDBCConnection();
+ try {
+ for(String sequenceName : getDialect().listSequences()) {
+
+ // load sequence value from database
+ // if there is a preparation needed to update the transaction, run it first
+ if(getDialect().hasStatement(sequenceName+".prep")) {
+ PreparedStatement prepNodeId = con.prepareStatement(getDialect().getStatement(sequenceName+".prep"));
+ prepNodeId.executeUpdate();
+ prepNodeId.close();
+ }
+
+ PreparedStatement queryNodeId = con.prepareStatement(getDialect().getStatement(sequenceName));
+ ResultSet resultNodeId = queryNodeId.executeQuery();
+ try {
+ if(resultNodeId.next()) {
+ memorySequences.put(sequenceName,new AtomicLong(resultNodeId.getLong(1)-1));
+ } else {
+ throw new SQLException("the sequence did not return a new value");
+ }
+ } finally {
+ resultNodeId.close();
+ }
+
+ con.commit();
+ }
+ } finally {
+ con.close();
+ }
+ } catch(SQLException ex) {
+ log.warn("database error: could not initialise in-memory sequences",ex);
+ }
+ }
+ }
+
public void logPoolInfo() throws SQLException {
log.debug("num_busy_connections: {}", connectionPool.getNumActive());
log.debug("num_idle_connections: {}", connectionPool.getNumIdle());
@@ -236,6 +284,9 @@ public class KiWiPersistence {
} finally {
connection.close();
}
+
+ // init the in-memory sequences
+ initSequences();
}
/**
@@ -453,11 +504,13 @@ public class KiWiPersistence {
return configuration;
}
- public Map<String, Long> getMemorySequences() {
+ public Map<String, AtomicLong> getMemorySequences() {
return memorySequences;
}
- public void setMemorySequences(Map<String, Long> memorySequences) {
+ public void setMemorySequences(Map<String, AtomicLong> memorySequences) {
this.memorySequences = memorySequences;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
index 47ffccb..1438946 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
@@ -205,7 +205,7 @@ public class KiWiStore extends NotifyingSailBase {
* @return a ValueFactory object for this Sail object.
*/
@Override
- public ValueFactory getValueFactory() {
+ public synchronized ValueFactory getValueFactory() {
if(repositoryValueFactory == null) {
repositoryValueFactory = new KiWiValueFactory(this, defaultContext);
persistence.setValueFactory(repositoryValueFactory);
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
new file mode 100644
index 0000000..a51dbc0
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
@@ -0,0 +1,176 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ * <li>PostgreSQL:
+ * <ul>
+ * <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ * <li>postgresql.user (default: lmf)</li>
+ * <li>postgresql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>MySQL:
+ * <ul>
+ * <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ * <li>mysql.user (default: lmf)</li>
+ * <li>mysql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>H2:
+ * <ul>
+ * <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ * <li>h2.user (default: lmf)</li>
+ * <li>h2.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class H2ConcurrencyTest {
+
+
+ @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+ @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+ @Rule
+ public TestWatcher watchman = new TestWatcher() {
+ /**
+ * Invoked when a test is about to start
+ */
+ @Override
+ protected void starting(Description description) {
+ logger.info("{} being run...", description.getMethodName());
+ }
+ };
+
+
+ private static KiWiDialect dialect;
+
+ private static String jdbcUrl;
+
+ private static String jdbcUser;
+
+ private static String jdbcPass;
+
+ private static Repository repository;
+
+ private static KiWiStore store;
+
+ private static Random rnd;
+
+ private static long runs = 0;
+
+ @BeforeClass
+ public static void setup() throws RepositoryException {
+ jdbcPass = System.getProperty("h2.pass","lmf");
+ jdbcUrl = System.getProperty("h2.url");
+ jdbcUser = System.getProperty("h2.user","lmf");
+ rnd = new Random();
+
+ dialect = new H2Dialect();
+
+ DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+ store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+ repository = new SailRepository(store);
+ repository.initialize();
+ }
+
+ @AfterClass
+ public static void dropDatabase() throws RepositoryException, SQLException {
+ store.closeValueFactory(); // release all connections before dropping the database
+ store.getPersistence().dropDatabase();
+ repository.shutDown();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ }
+
+ final Logger logger =
+ LoggerFactory.getLogger(RepositoryTest.class);
+
+
+ long tripleCount = 0;
+
+ @Test
+ @Concurrent(count = 10)
+ @Repeating(repetition = 10)
+ public void testConcurrency() throws Exception {
+ runs++;
+
+ // generate random nodes and triples and add them
+ RepositoryConnection con = repository.getConnection();
+ try {
+ for(int i=0; i< rnd.nextInt(1000); i++) {
+ URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ Value object;
+ switch(rnd.nextInt(6)) {
+ case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+ case 1: object = repository.getValueFactory().createBNode();
+ break;
+ case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+ break;
+ case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+ break;
+ case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+ break;
+ case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+ break;
+ default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+
+ }
+ con.add(subject,predicate,object);
+ tripleCount++;
+ }
+ con.commit();
+ } finally {
+ con.close();
+ }
+
+
+ logger.info("triple count: {}", tripleCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
new file mode 100644
index 0000000..6b543c4
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
@@ -0,0 +1,174 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ * <li>PostgreSQL:
+ * <ul>
+ * <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ * <li>postgresql.user (default: lmf)</li>
+ * <li>postgresql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>MySQL:
+ * <ul>
+ * <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ * <li>mysql.user (default: lmf)</li>
+ * <li>mysql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>H2:
+ * <ul>
+ * <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ * <li>h2.user (default: lmf)</li>
+ * <li>h2.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class MySQLConcurrencyTest {
+
+
+ @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+ @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+ @Rule
+ public TestWatcher watchman = new TestWatcher() {
+ /**
+ * Invoked when a test is about to start
+ */
+ @Override
+ protected void starting(Description description) {
+ logger.info("{} being run...", description.getMethodName());
+ }
+ };
+
+
+ private static KiWiDialect dialect;
+
+ private static String jdbcUrl;
+
+ private static String jdbcUser;
+
+ private static String jdbcPass;
+
+ private static Repository repository;
+
+ private static KiWiStore store;
+
+ private static Random rnd;
+
+ private static long runs = 0;
+
+ @BeforeClass
+ public static void setup() throws RepositoryException {
+ jdbcPass = System.getProperty("mysql.pass","lmf");
+ jdbcUrl = System.getProperty("mysql.url");
+ jdbcUser = System.getProperty("mysql.user","lmf");
+ rnd = new Random();
+
+ dialect = new MySQLDialect();
+
+ DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+ store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+ repository = new SailRepository(store);
+ repository.initialize();
+ }
+
+ @AfterClass
+ public static void dropDatabase() throws RepositoryException, SQLException {
+ store.closeValueFactory(); // release all connections before dropping the database
+ store.getPersistence().dropDatabase();
+ repository.shutDown();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ }
+
+ final Logger logger =
+ LoggerFactory.getLogger(RepositoryTest.class);
+
+
+ long tripleCount = 0;
+
+ @Test
+ @Concurrent(count = 10)
+ @Repeating(repetition = 10)
+ public void testConcurrency() throws Exception {
+ runs++;
+
+ // generate random nodes and triples and add them
+ RepositoryConnection con = repository.getConnection();
+ try {
+ for(int i=0; i< rnd.nextInt(1000); i++) {
+ URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ Value object;
+ switch(rnd.nextInt(6)) {
+ case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+ case 1: object = repository.getValueFactory().createBNode();
+ break;
+ case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+ break;
+ case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+ break;
+ case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+ break;
+ case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+ break;
+ default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+
+ }
+ con.add(subject,predicate,object);
+ tripleCount++;
+ }
+ con.commit();
+ } finally {
+ con.close();
+ }
+
+
+ logger.info("triple count: {}", tripleCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
new file mode 100644
index 0000000..414d179
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
@@ -0,0 +1,174 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ * <li>PostgreSQL:
+ * <ul>
+ * <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ * <li>postgresql.user (default: lmf)</li>
+ * <li>postgresql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>MySQL:
+ * <ul>
+ * <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ * <li>mysql.user (default: lmf)</li>
+ * <li>mysql.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * <li>H2:
+ * <ul>
+ * <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ * <li>h2.user (default: lmf)</li>
+ * <li>h2.pass (default: lmf)</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class PostgreSQLConcurrencyTest {
+
+
+ @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+ @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+ @Rule
+ public TestWatcher watchman = new TestWatcher() {
+ /**
+ * Invoked when a test is about to start
+ */
+ @Override
+ protected void starting(Description description) {
+ logger.info("{} being run...", description.getMethodName());
+ }
+ };
+
+
+ private static KiWiDialect dialect;
+
+ private static String jdbcUrl;
+
+ private static String jdbcUser;
+
+ private static String jdbcPass;
+
+ private static Repository repository;
+
+ private static KiWiStore store;
+
+ private static Random rnd;
+
+ private static long runs = 0;
+
+ @BeforeClass
+ public static void setup() throws RepositoryException {
+ jdbcPass = System.getProperty("postgresql.pass","lmf");
+ jdbcUrl = System.getProperty("postgresql.url");
+ jdbcUser = System.getProperty("postgresql.user","lmf");
+ rnd = new Random();
+
+ dialect = new PostgreSQLDialect();
+
+ DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+ store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+ repository = new SailRepository(store);
+ repository.initialize();
+ }
+
+ @AfterClass
+ public static void dropDatabase() throws RepositoryException, SQLException {
+ store.closeValueFactory(); // release all connections before dropping the database
+ store.getPersistence().dropDatabase();
+ repository.shutDown();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ }
+
+ final Logger logger =
+ LoggerFactory.getLogger(RepositoryTest.class);
+
+
+ long tripleCount = 0;
+
+ @Test
+ @Concurrent(count = 10)
+ @Repeating(repetition = 10)
+ public void testConcurrency() throws Exception {
+ runs++;
+
+ // generate random nodes and triples and add them
+ RepositoryConnection con = repository.getConnection();
+ try {
+ for(int i=0; i< rnd.nextInt(1000); i++) {
+ URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ Value object;
+ switch(rnd.nextInt(6)) {
+ case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+ case 1: object = repository.getValueFactory().createBNode();
+ break;
+ case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+ break;
+ case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+ break;
+ case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+ break;
+ case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+ break;
+ default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+
+ }
+ con.add(subject,predicate,object);
+ tripleCount++;
+ }
+ con.commit();
+ } finally {
+ con.close();
+ }
+
+
+ logger.info("triple count: {}", tripleCount);
+ }
+}