You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/06/21 10:42:43 UTC

git commit: improved concurrency of new triplestore implementation

Updated Branches:
  refs/heads/develop a29e8a6f2 -> 85915f99d


improved concurrency of new triplestore implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/85915f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/85915f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/85915f99

Branch: refs/heads/develop
Commit: 85915f99d8c15deaf5c8b924ae23f9ab3579cca7
Parents: a29e8a6
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Jun 21 10:42:34 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Jun 21 10:42:34 2013 +0200

----------------------------------------------------------------------
 libraries/kiwi/kiwi-triplestore/pom.xml         |   6 +
 .../kiwi/persistence/KiWiConnection.java        |  60 ++-----
 .../marmotta/kiwi/persistence/KiWiDialect.java  |  19 +-
 .../kiwi/persistence/KiWiPersistence.java       |  65 ++++++-
 .../apache/marmotta/kiwi/sail/KiWiStore.java    |   2 +-
 .../marmotta/kiwi/test/H2ConcurrencyTest.java   | 176 +++++++++++++++++++
 .../kiwi/test/MySQLConcurrencyTest.java         | 174 ++++++++++++++++++
 .../kiwi/test/PostgreSQLConcurrencyTest.java    | 174 ++++++++++++++++++
 8 files changed, 617 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/pom.xml
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/pom.xml b/libraries/kiwi/kiwi-triplestore/pom.xml
index 28defe4..bbb69a9 100644
--- a/libraries/kiwi/kiwi-triplestore/pom.xml
+++ b/libraries/kiwi/kiwi-triplestore/pom.xml
@@ -176,6 +176,12 @@
             <artifactId>sesame-repository-sail</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.tempus-fugit</groupId>
+            <artifactId>tempus-fugit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
 
     </dependencies>
     

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 46b44ed..e0f7522 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -45,6 +45,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -1617,51 +1618,9 @@ public class KiWiConnection {
      * @throws SQLException
      */
     public long getNextSequence(String sequenceName) throws SQLException {
-        if(batchCommit && persistence.getConfiguration().isMemorySequences()) {
-            // use in-memory sequences
-            if(persistence.getMemorySequences() == null) {
-                persistence.setMemorySequences(new HashMap<String,Long>());
-            }
-
-            synchronized (persistence.getMemorySequences()) {
-                Long sequence = persistence.getMemorySequences().get(sequenceName);
-                if(sequence == null) {
-                    requireJDBCConnection();
-
-                    // load sequence value from database
-                    // if there is a preparation needed to update the transaction, run it first
-                    if(dialect.hasStatement(sequenceName+".prep")) {
-                        PreparedStatement prepNodeId = getPreparedStatement(sequenceName+".prep");
-                        prepNodeId.executeUpdate();
-                    }
-
-                    PreparedStatement queryNodeId = getPreparedStatement(sequenceName);
-                    ResultSet resultNodeId = queryNodeId.executeQuery();
-                    try {
-                        if(resultNodeId.next()) {
-                            sequence = resultNodeId.getLong(1);
-                        } else {
-                            throw new SQLException("the sequence did not return a new value");
-                        }
-                    } finally {
-                        resultNodeId.close();
-                    }
-
-                    // this is a very ugly hack needed by MySQL because the sequences are not atomic
-                    // it is only necessary for the nodes sequence, because the value factory always keeps
-                    // a connection open to it; we could solve it maybe differently by remembering the
-                    // connection responsible for a in-memory sequence...
-                    if(sequenceName.equals("seq.nodes") && dialect instanceof MySQLDialect) {
-                        connection.commit();
-                    }
-
-                } else {
-                    sequence += 1;
-                }
-                persistence.getMemorySequences().put(sequenceName,sequence);
-                return sequence;
-            }
-
+        if(batchCommit && persistence.getConfiguration().isMemorySequences() && persistence.getMemorySequences().containsKey(sequenceName)) {
+            AtomicLong sequence = persistence.getMemorySequences().get(sequenceName);
+            return sequence.incrementAndGet();
         } else {
             requireJDBCConnection();
 
@@ -1879,10 +1838,10 @@ public class KiWiConnection {
             requireJDBCConnection();
 
             try {
-                synchronized (persistence.getMemorySequences()) {
-                    for(Map.Entry<String,Long> entry : persistence.getMemorySequences().entrySet()) {
+                for(Map.Entry<String,AtomicLong> entry : persistence.getMemorySequences().entrySet()) {
+                    if( entry.getValue().get() > 0) {
                         PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
-                        updateSequence.setLong(1, entry.getValue());
+                        updateSequence.setLong(1, entry.getValue().get());
                         if(updateSequence.execute()) {
                             updateSequence.getResultSet().close();
                         } else {
@@ -2028,8 +1987,11 @@ public class KiWiConnection {
                 }
                 insertTriple.executeBatch();
 
-            } catch (Throwable ex) {
+            } catch (SQLException ex) {
+                System.err.println("main exception:");
                 ex.printStackTrace();
+                System.err.println("next exception:");
+                ex.getNextException().printStackTrace();
                 throw ex;
             }  finally {
                 commitLock.unlock();

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
index bd4879b..dfde5cb 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
@@ -22,9 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 
 /**
  * A dialect provides the SQL statements necessary to access the different types of database systems. Each
@@ -159,6 +157,21 @@ public abstract class KiWiDialect {
         return statements.stringPropertyNames();
     }
 
+    /**
+     * Return the names of all sequences that have been configured in the system, i.e. all statements starting with "seq."
+     * @return
+     */
+    public Set<String> listSequences() {
+        Set<String> names = new HashSet<String>();
+        Enumeration e = statements.propertyNames();
+        while(e.hasMoreElements()) {
+            String[] keys = e.nextElement().toString().split("\\.");
+            if(keys[0].equals("seq")) {
+                names.add(keys[0] + "." + keys[1]);
+            }
+        }
+        return names;
+    }
 
     /**
      * Return the database specific operator for matching a text against a regular expression.

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
index b1f4a4d..eab9729 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
@@ -22,6 +22,7 @@ import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
 import org.apache.marmotta.kiwi.model.rdf.KiWiResource;
 import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
 import org.apache.marmotta.kiwi.persistence.util.ScriptRunner;
 import org.apache.marmotta.kiwi.sail.KiWiValueFactory;
 import org.apache.tomcat.jdbc.pool.DataSource;
@@ -35,11 +36,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.StringReader;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Add file description here!
@@ -73,7 +78,7 @@ public class KiWiPersistence {
      * A map holding in-memory sequences to be used for sequence caching in case the appropriate configuration option
      * is configued and batched commits are enabled.
      */
-    private Map<String,Long> memorySequences;
+    private Map<String,AtomicLong> memorySequences;
 
 
     /**
@@ -138,13 +143,13 @@ public class KiWiPersistence {
         if(configuration.isQueryLoggingEnabled()) {
             poolConfig.setJdbcInterceptors(
                     "org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"   +
-                    "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;" +
-                    "org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport"
+                            "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer;" +
+                            "org.apache.tomcat.jdbc.pool.interceptor.SlowQueryReport"
             );
         } else {
             poolConfig.setJdbcInterceptors(
                     "org.apache.tomcat.jdbc.pool.interceptor.ConnectionState;"   +
-                    "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer"
+                            "org.apache.tomcat.jdbc.pool.interceptor.StatementFinalizer"
             );
         }
 
@@ -170,6 +175,49 @@ public class KiWiPersistence {
 
     }
 
+    /**
+     * Initialise in-memory sequences if the feature is enabled.
+     */
+    public void initSequences() {
+        if(configuration.isBatchCommit() && configuration.isMemorySequences()) {
+            memorySequences = new ConcurrentHashMap<String,AtomicLong>();
+
+            try {
+                Connection con = getJDBCConnection();
+                try {
+                    for(String sequenceName : getDialect().listSequences()) {
+
+                        // load sequence value from database
+                        // if there is a preparation needed to update the transaction, run it first
+                        if(getDialect().hasStatement(sequenceName+".prep")) {
+                            PreparedStatement prepNodeId = con.prepareStatement(getDialect().getStatement(sequenceName+".prep"));
+                            prepNodeId.executeUpdate();
+                            prepNodeId.close();
+                        }
+
+                        PreparedStatement queryNodeId = con.prepareStatement(getDialect().getStatement(sequenceName));
+                        ResultSet resultNodeId = queryNodeId.executeQuery();
+                        try {
+                            if(resultNodeId.next()) {
+                                memorySequences.put(sequenceName,new AtomicLong(resultNodeId.getLong(1)-1));
+                            } else {
+                                throw new SQLException("the sequence did not return a new value");
+                            }
+                        } finally {
+                            resultNodeId.close();
+                        }
+
+                        con.commit();
+                    }
+                } finally {
+                    con.close();
+                }
+            } catch(SQLException ex) {
+                log.warn("database error: could not initialise in-memory sequences",ex);
+            }
+        }
+    }
+
     public void logPoolInfo() throws SQLException {
         log.debug("num_busy_connections:    {}", connectionPool.getNumActive());
         log.debug("num_idle_connections:    {}", connectionPool.getNumIdle());
@@ -236,6 +284,9 @@ public class KiWiPersistence {
         } finally {
             connection.close();
         }
+
+        // init the in-memory sequences
+        initSequences();
     }
 
     /**
@@ -453,11 +504,13 @@ public class KiWiPersistence {
         return configuration;
     }
 
-    public Map<String, Long> getMemorySequences() {
+    public Map<String, AtomicLong> getMemorySequences() {
         return memorySequences;
     }
 
-    public void setMemorySequences(Map<String, Long> memorySequences) {
+    public void setMemorySequences(Map<String, AtomicLong> memorySequences) {
         this.memorySequences = memorySequences;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
index 47ffccb..1438946 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
@@ -205,7 +205,7 @@ public class KiWiStore extends NotifyingSailBase {
      * @return a ValueFactory object for this Sail object.
      */
     @Override
-    public ValueFactory getValueFactory() {
+    public synchronized ValueFactory getValueFactory() {
         if(repositoryValueFactory == null) {
             repositoryValueFactory = new KiWiValueFactory(this,  defaultContext);
             persistence.setValueFactory(repositoryValueFactory);

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
new file mode 100644
index 0000000..a51dbc0
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/H2ConcurrencyTest.java
@@ -0,0 +1,176 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ *     <li>PostgreSQL:
+ *     <ul>
+ *         <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ *         <li>postgresql.user (default: lmf)</li>
+ *         <li>postgresql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>MySQL:
+ *     <ul>
+ *         <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ *         <li>mysql.user (default: lmf)</li>
+ *         <li>mysql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>H2:
+ *     <ul>
+ *         <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ *         <li>h2.user (default: lmf)</li>
+ *         <li>h2.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class H2ConcurrencyTest {
+
+
+    @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+    @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+    @Rule
+    public TestWatcher watchman = new TestWatcher() {
+        /**
+         * Invoked when a test is about to start
+         */
+        @Override
+        protected void starting(Description description) {
+            logger.info("{} being run...", description.getMethodName());
+        }
+    };
+
+
+    private static KiWiDialect dialect;
+
+    private static String jdbcUrl;
+
+    private static String jdbcUser;
+
+    private static String jdbcPass;
+
+    private static Repository repository;
+
+    private static KiWiStore store;
+
+    private static Random rnd;
+
+    private static long runs = 0;
+
+    @BeforeClass
+    public static void setup() throws RepositoryException {
+        jdbcPass = System.getProperty("h2.pass","lmf");
+        jdbcUrl = System.getProperty("h2.url");
+        jdbcUser = System.getProperty("h2.user","lmf");
+        rnd = new Random();
+
+        dialect = new H2Dialect();
+
+        DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+        store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+        repository = new SailRepository(store);
+        repository.initialize();
+    }
+
+    @AfterClass
+    public static void dropDatabase() throws RepositoryException, SQLException {
+        store.closeValueFactory(); // release all connections before dropping the database
+        store.getPersistence().dropDatabase();
+        repository.shutDown();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+    }
+
+    final Logger logger =
+            LoggerFactory.getLogger(RepositoryTest.class);
+
+
+    long tripleCount = 0;
+
+    @Test
+    @Concurrent(count = 10)
+    @Repeating(repetition = 10)
+    public void testConcurrency() throws Exception {
+        runs++;
+
+        // generate random nodes and triples and add them
+        RepositoryConnection con = repository.getConnection();
+        try {
+            for(int i=0; i< rnd.nextInt(1000); i++) {
+                URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                Value object;
+                switch(rnd.nextInt(6)) {
+                    case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+                    case 1: object = repository.getValueFactory().createBNode();
+                        break;
+                    case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+                        break;
+                    case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+                        break;
+                    case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+                        break;
+                    case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+                        break;
+                    default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+
+                }
+                con.add(subject,predicate,object);
+                tripleCount++;
+            }
+            con.commit();
+        } finally {
+            con.close();
+        }
+
+
+        logger.info("triple count: {}", tripleCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
new file mode 100644
index 0000000..6b543c4
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
@@ -0,0 +1,174 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ *     <li>PostgreSQL:
+ *     <ul>
+ *         <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ *         <li>postgresql.user (default: lmf)</li>
+ *         <li>postgresql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>MySQL:
+ *     <ul>
+ *         <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ *         <li>mysql.user (default: lmf)</li>
+ *         <li>mysql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>H2:
+ *     <ul>
+ *         <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ *         <li>h2.user (default: lmf)</li>
+ *         <li>h2.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class MySQLConcurrencyTest {
+
+
+    @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+    @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+    @Rule
+    public TestWatcher watchman = new TestWatcher() {
+        /**
+         * Invoked when a test is about to start
+         */
+        @Override
+        protected void starting(Description description) {
+            logger.info("{} being run...", description.getMethodName());
+        }
+    };
+
+
+    private static KiWiDialect dialect;
+
+    private static String jdbcUrl;
+
+    private static String jdbcUser;
+
+    private static String jdbcPass;
+
+    private static Repository repository;
+
+    private static KiWiStore store;
+
+    private static Random rnd;
+
+    private static long runs = 0;
+
+    @BeforeClass
+    public static void setup() throws RepositoryException {
+        jdbcPass = System.getProperty("mysql.pass","lmf");
+        jdbcUrl = System.getProperty("mysql.url");
+        jdbcUser = System.getProperty("mysql.user","lmf");
+        rnd = new Random();
+
+        dialect = new MySQLDialect();
+
+        DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+        store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+        repository = new SailRepository(store);
+        repository.initialize();
+    }
+
+    @AfterClass
+    public static void dropDatabase() throws RepositoryException, SQLException {
+        store.closeValueFactory(); // release all connections before dropping the database
+        store.getPersistence().dropDatabase();
+        repository.shutDown();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+    }
+
+    final Logger logger =
+            LoggerFactory.getLogger(RepositoryTest.class);
+
+
+    long tripleCount = 0;
+
+    @Test
+    @Concurrent(count = 10)
+    @Repeating(repetition = 10)
+    public void testConcurrency() throws Exception {
+        runs++;
+
+        // generate random nodes and triples and add them
+        RepositoryConnection con = repository.getConnection();
+        try {
+            for(int i=0; i< rnd.nextInt(1000); i++) {
+                URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                Value object;
+                switch(rnd.nextInt(6)) {
+                    case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+                    case 1: object = repository.getValueFactory().createBNode();
+                        break;
+                    case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+                        break;
+                    case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+                        break;
+                    case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+                        break;
+                    case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+                        break;
+                    default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+
+                }
+                con.add(subject,predicate,object);
+                tripleCount++;
+            }
+            con.commit();
+        } finally {
+            con.close();
+        }
+
+
+        logger.info("triple count: {}", tripleCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/85915f99/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
new file mode 100644
index 0000000..414d179
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/PostgreSQLConcurrencyTest.java
@@ -0,0 +1,174 @@
+package org.apache.marmotta.kiwi.test;
+
+import com.google.code.tempusfugit.concurrency.ConcurrentRule;
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Concurrent;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
+import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Random;
+
+/**
+ * This test starts many triplestore operations in parallel to check if concurrent operations will break things,
+ *
+ * It will try running over all available databases. Except for in-memory databases like H2 or Derby, database URLs must be passed as
+ * system property, or otherwise the test is skipped for this database. Available system properties:
+ * <ul>
+ *     <li>PostgreSQL:
+ *     <ul>
+ *         <li>postgresql.url, e.g. jdbc:postgresql://localhost:5433/kiwitest?prepareThreshold=3</li>
+ *         <li>postgresql.user (default: lmf)</li>
+ *         <li>postgresql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>MySQL:
+ *     <ul>
+ *         <li>mysql.url, e.g. jdbc:mysql://localhost:3306/kiwitest?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull</li>
+ *         <li>mysql.user (default: lmf)</li>
+ *         <li>mysql.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ *     <li>H2:
+ *     <ul>
+ *         <li>h2.url, e.g. jdbc:h2:mem;MVCC=true;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=10</li>
+ *         <li>h2.user (default: lmf)</li>
+ *         <li>h2.pass (default: lmf)</li>
+ *     </ul>
+ *     </li>
+ * </ul>
+ * <p/>
+ * Author: Sebastian Schaffert
+
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class PostgreSQLConcurrencyTest {
+
+
+    @Rule public ConcurrentRule concurrently = new ConcurrentRule();
+    @Rule public RepeatingRule repeatedly = new RepeatingRule();
+
+    @Rule
+    public TestWatcher watchman = new TestWatcher() {
+        /**
+         * Invoked when a test is about to start
+         */
+        @Override
+        protected void starting(Description description) {
+            logger.info("{} being run...", description.getMethodName());
+        }
+    };
+
+
+    private static KiWiDialect dialect;
+
+    private static String jdbcUrl;
+
+    private static String jdbcUser;
+
+    private static String jdbcPass;
+
+    private static Repository repository;
+
+    private static KiWiStore store;
+
+    private static Random rnd;
+
+    private static long runs = 0;
+
+    @BeforeClass
+    public static void setup() throws RepositoryException {
+        jdbcPass = System.getProperty("postgresql.pass","lmf");
+        jdbcUrl = System.getProperty("postgresql.url");
+        jdbcUser = System.getProperty("postgresql.user","lmf");
+        rnd = new Random();
+
+        dialect = new PostgreSQLDialect();
+
+        DBConnectionChecker.checkDatabaseAvailability(jdbcUrl, jdbcUser, jdbcPass, dialect);
+
+        store = new KiWiStore("test",jdbcUrl,jdbcUser,jdbcPass,dialect, "http://localhost/context/default", "http://localhost/context/inferred" );
+        repository = new SailRepository(store);
+        repository.initialize();
+    }
+
+    @AfterClass
+    public static void dropDatabase() throws RepositoryException, SQLException {
+        store.closeValueFactory(); // release all connections before dropping the database
+        store.getPersistence().dropDatabase();
+        repository.shutDown();
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+    }
+
+    final Logger logger =
+            LoggerFactory.getLogger(RepositoryTest.class);
+
+
+    long tripleCount = 0;
+
+    @Test
+    @Concurrent(count = 10)
+    @Repeating(repetition = 10)
+    public void testConcurrency() throws Exception {
+        runs++;
+
+        // generate random nodes and triples and add them
+        RepositoryConnection con = repository.getConnection();
+        try {
+            for(int i=0; i< rnd.nextInt(1000); i++) {
+                URI subject = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                URI predicate = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                Value object;
+                switch(rnd.nextInt(6)) {
+                    case 0: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+                    case 1: object = repository.getValueFactory().createBNode();
+                        break;
+                    case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+                        break;
+                    case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
+                        break;
+                    case 4: object = repository.getValueFactory().createLiteral(rnd.nextDouble());
+                        break;
+                    case 5: object = repository.getValueFactory().createLiteral(rnd.nextBoolean());
+                        break;
+                    default: object = repository.getValueFactory().createURI("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+                        break;
+
+                }
+                con.add(subject,predicate,object);
+                tripleCount++;
+            }
+            con.commit();
+        } finally {
+            con.close();
+        }
+
+
+        logger.info("triple count: {}", tripleCount);
+    }
+}