You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/08/02 14:06:53 UTC

git commit: trying to fix the concurrency issues

Updated Branches:
  refs/heads/develop ea1db9cec -> e505371ef


trying to fix the concurrency issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/e505371e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/e505371e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/e505371e

Branch: refs/heads/develop
Commit: e505371ef1fc273a6fed80bc87afe6432fd087b9
Parents: ea1db9c
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Aug 2 14:06:46 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Aug 2 14:06:46 2013 +0200

----------------------------------------------------------------------
 .../kiwi/persistence/KiWiConnection.java        | 124 +++++++++++--------
 .../kiwi/persistence/KiWiGarbageCollector.java  |   2 +-
 .../marmotta/kiwi/sail/KiWiSailConnection.java  |  19 +--
 .../marmotta/kiwi/sail/KiWiValueFactory.java    |  37 ++++--
 .../kiwi/test/MySQLConcurrencyTest.java         |   2 +-
 5 files changed, 101 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 6001b4a..6484094 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -127,6 +127,13 @@ public class KiWiConnection {
     private ReentrantLock uriLock;
     private ReentrantLock bnodeLock;
 
+
+    // this set keeps track of all statements that have been deleted in the active transaction of this connection
+    // this is needed to be able to determine if adding the triple again will merely undo a deletion or is a
+    // completely new addition to the triple store
+    private HashSet<Long> deletedStatementsLog;
+
+
     public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, KiWiCacheManager cacheManager) throws SQLException {
         this.cacheManager = cacheManager;
         this.dialect      = dialect;
@@ -136,6 +143,7 @@ public class KiWiConnection {
         this.uriLock   = new ReentrantLock();
         this.bnodeLock   = new ReentrantLock();
         this.batchCommit  = dialect.isBatchSupported();
+        this.deletedStatementsLog = new HashSet<Long>();
 
         initCachePool();
         initStatementCache();
@@ -1060,45 +1068,62 @@ public class KiWiConnection {
 
         boolean hasId = triple.getId() != null;
 
-        // retrieve a new triple ID and set it in the object
-        if(triple.getId() == null) {
-            triple.setId(getNextSequence("seq.triples"));
-        }
 
-        if(batchCommit) {
-            cacheTriple(triple);
-            synchronized (tripleBatch) {
-                tripleBatch.add(triple);
-                if(tripleBatch.size() >= batchSize) {
-                    flushBatch();
-                }
-            }
-            return !hasId;
-        }  else {
-            Preconditions.checkNotNull(triple.getSubject().getId());
-            Preconditions.checkNotNull(triple.getPredicate().getId());
-            Preconditions.checkNotNull(triple.getObject().getId());
-            Preconditions.checkNotNull(triple.getContext().getId());
+        if(hasId && deletedStatementsLog.contains(triple.getId())) {
+            // this is a hack for a concurrency problem that may occur in case the triple is removed in the
+            // transaction and then added again; in these cases the createStatement method might return
+            // an expired state of the triple because it uses its own database connection
 
+            //deletedStatementsLog.remove(triple.getId());
+            undeleteTriple(triple);
 
-            try {
-                PreparedStatement insertTriple = getPreparedStatement("store.triple");
-                insertTriple.setLong(1,triple.getId());
-                insertTriple.setLong(2,triple.getSubject().getId());
-                insertTriple.setLong(3,triple.getPredicate().getId());
-                insertTriple.setLong(4,triple.getObject().getId());
-                insertTriple.setLong(5,triple.getContext().getId());
-                insertTriple.setBoolean(6,triple.isInferred());
-                insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
-                int count = insertTriple.executeUpdate();
-
-                cacheTriple(triple);
-
-                return count > 0;
-            } catch(SQLException ex) {
-                // this is an ugly hack to catch duplicate key errors in some databases (H2)
-                // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
-                return false;
+            return true;
+        } else {
+            // retrieve a new triple ID and set it in the object
+            if(triple.getId() == null) {
+                triple.setId(getNextSequence("seq.triples"));
+            }
+
+            if(batchCommit) {
+                commitLock.lock();
+                try {
+                    cacheTriple(triple);
+                    synchronized (tripleBatch) {
+                        tripleBatch.add(triple);
+                        if(tripleBatch.size() >= batchSize) {
+                            flushBatch();
+                        }
+                    }
+                } finally {
+                    commitLock.unlock();
+                }
+                return !hasId;
+            }  else {
+                Preconditions.checkNotNull(triple.getSubject().getId());
+                Preconditions.checkNotNull(triple.getPredicate().getId());
+                Preconditions.checkNotNull(triple.getObject().getId());
+                Preconditions.checkNotNull(triple.getContext().getId());
+
+
+                try {
+                    PreparedStatement insertTriple = getPreparedStatement("store.triple");
+                    insertTriple.setLong(1,triple.getId());
+                    insertTriple.setLong(2,triple.getSubject().getId());
+                    insertTriple.setLong(3,triple.getPredicate().getId());
+                    insertTriple.setLong(4,triple.getObject().getId());
+                    insertTriple.setLong(5,triple.getContext().getId());
+                    insertTriple.setBoolean(6,triple.isInferred());
+                    insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+                    int count = insertTriple.executeUpdate();
+
+                    cacheTriple(triple);
+
+                    return count > 0;
+                } catch(SQLException ex) {
+                    // this is an ugly hack to catch duplicate key errors in some databases (H2)
+                    // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
+                    return false;
+                }
             }
         }
     }
@@ -1161,20 +1186,19 @@ public class KiWiConnection {
 
         if(triple.getId() == null) {
             log.warn("attempting to remove non-persistent triple: {}", triple);
-        } else {
+        } else if(tripleBatch == null || !tripleBatch.contains(triple)) {
             requireJDBCConnection();
 
             PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
             deleteTriple.setLong(1,triple.getId());
             deleteTriple.executeUpdate();
+
+            deletedStatementsLog.add(triple.getId());
+        } else {
+            tripleBatch.remove(triple);
         }
         removeCachedTriple(triple);
 
-        if(tripleBatch != null) {
-            synchronized (tripleBatch) {
-                tripleBatch.remove(triple);
-            }
-        }
     }
 
     /**
@@ -1214,15 +1238,6 @@ public class KiWiConnection {
 
 
     /**
-     * Remove from the database all triples that have been marked as deleted and are not referenced by any other
-     * entity.
-     */
-    public void cleanupTriples() {
-        throw new UnsupportedOperationException("garbage collection of triples is not yet supported!");
-    }
-
-
-    /**
      * List all contexts used in this triple store. See query.contexts .
      * @return
      * @throws SQLException
@@ -1910,6 +1925,8 @@ public class KiWiConnection {
             flushBatch();
         }
 
+        deletedStatementsLog.clear();
+
         if(connection != null) {
             connection.commit();
         }
@@ -1936,6 +1953,7 @@ public class KiWiConnection {
                 tripleBatch.clear();
             }
         }
+        deletedStatementsLog.clear();
         if(connection != null && !connection.isClosed()) {
             connection.rollback();
         }
@@ -2007,7 +2025,7 @@ public class KiWiConnection {
             commitLock.lock();
             try {
                 if(persistence.getValueFactory() != null) {
-                    persistence.getValueFactory().flushBatch(this);
+                    persistence.getValueFactory().flushBatch();
                 }
 
                 PreparedStatement insertTriple = getPreparedStatement("store.triple");
@@ -2032,6 +2050,8 @@ public class KiWiConnection {
                             insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
 
                             insertTriple.addBatch();
+                        } else {
+                            log.error("triple batch contained a triple marked as deleted, this should not happen (triple: {})", triple);
                         }
                     }
                     tripleBatch.clear();

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
index c839d8e..74ceb4f 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
@@ -150,7 +150,7 @@ public class KiWiGarbageCollector extends Thread {
                 if(result.getString("ntype").equals("uri")) {
                     log.warn("DATABASE INCONSISTENCY: attempting to fix references for resource {}", result.getString("svalue"));
                 } else {
-                    log.warn("DATABASE INCONSISTENCY: attempting to fix references for literal or anonymous node");
+                    log.warn("DATABASE INCONSISTENCY: attempting to fix references for literal or anonymous node {}", result.getString("svalue"));
                 }
 
                 long latest_id = result.getLong(4);

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
index a9a1aec..fe7176e 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
@@ -86,7 +86,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
 
     private boolean triplesAdded, triplesRemoved;
 
-    private HashSet<Long> deletedStatementsLog;
 
     public KiWiSailConnection(KiWiStore sailBase) throws SailException {
         super(sailBase);
@@ -164,19 +163,9 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
                 KiWiTriple triple = (KiWiTriple)valueFactory.createStatement(ksubj,kpred,kobj,kcontext, databaseConnection);
                 triple.setInferred(inferred);
 
-                if(triple.getId() != null && deletedStatementsLog.contains(triple.getId())) {
-                    // this is a hack for a concurrency problem that may occur in case the triple is removed in the
-                    // transaction and then added again; in these cases the createStatement method might return
-                    // an expired state of the triple because it uses its own database connection
-
-                    databaseConnection.undeleteTriple(triple);
+                if(databaseConnection.storeTriple(triple)) {
                     triplesAdded = true;
                     notifyStatementAdded(triple);
-                } else {
-                    if(databaseConnection.storeTriple(triple)) {
-                        triplesAdded = true;
-                        notifyStatementAdded(triple);
-                    }
                 }
 
                 added.add(triple);
@@ -344,7 +333,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
         // nothing to do, the database transaction is started automatically
         triplesAdded = false;
         triplesRemoved = false;
-        deletedStatementsLog = new HashSet<Long>();
     }
 
     @Override
@@ -355,7 +343,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
             throw new SailException("database error while committing transaction",e);
         }
         if(triplesAdded || triplesRemoved) {
-            deletedStatementsLog.clear();
 
             store.notifySailChanged(new SailChangedEvent() {
                 @Override
@@ -380,7 +367,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
     protected void rollbackInternal() throws SailException {
         try {
             databaseConnection.rollback();
-            deletedStatementsLog.clear();
         } catch (SQLException e) {
             throw new SailException("database error while rolling back transaction",e);
         }
@@ -395,7 +381,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
                 if(triple.getId() != null) {
                     databaseConnection.deleteTriple(triple);
                     triplesRemoved = true;
-                    deletedStatementsLog.add(triple.getId());
                     notifyStatementRemoved(triple);
                 }
             }
@@ -426,7 +411,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
                 if(triple.getId() != null && triple.isInferred()) {
                     databaseConnection.deleteTriple(triple);
                     triplesRemoved = true;
-                    deletedStatementsLog.add(triple.getId());
                     notifyStatementRemoved(triple);
                 }
             }
@@ -448,7 +432,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
             if(triple.getId() != null && triple.isInferred()) {
                 databaseConnection.deleteTriple(triple);
                 triplesRemoved = true;
-                deletedStatementsLog.add(triple.getId());
                 notifyStatementRemoved(triple);
             }
         } catch(SQLException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
index b3843b3..1c5927d 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
@@ -687,22 +687,12 @@ public class KiWiValueFactory implements ValueFactory {
         IntArray cacheKey = IntArray.createSPOCKey(subject, predicate, object, context);
         KiWiTriple result = (KiWiTriple)tripleRegistry.get(cacheKey);
         try {
-            if(result == null || ((KiWiTriple)result).isDeleted()) {
+            if(result == null || result.isDeleted()) {
                 KiWiResource ksubject   = convert(subject);
                 KiWiUriResource kpredicate = convert(predicate);
                 KiWiNode kobject    = convert(object);
                 KiWiResource    kcontext   = convert(context);
 
-                // test if the triple already exists in the database; if yes, return it
-                /*
-                List<Statement> triples = Iterations.asList(connection.listTriples(ksubject,kpredicate,kobject,kcontext,true));
-                if(triples.size() == 1) {
-                    result = triples.get(0);
-                } else {
-                    result = new KiWiTriple(ksubject,kpredicate,kobject,kcontext);
-                    ((KiWiTriple)result).setMarkedForReasoning(true);
-                }
-                */
                 result = new KiWiTriple(ksubject,kpredicate,kobject,kcontext);
                 result.setId(connection.getTripleId(ksubject,kpredicate,kobject,kcontext,true));
                 if(result.getId() == null) {
@@ -718,6 +708,15 @@ public class KiWiValueFactory implements ValueFactory {
         }
     }
 
+    /**
+     * Remove a statement from the triple registry. Called when the statement is deleted and the transaction commits.
+     * @param triple
+     */
+    protected void removeStatement(KiWiTriple triple) {
+        IntArray cacheKey = IntArray.createSPOCKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext());
+        tripleRegistry.remove(cacheKey);
+        triple.setDeleted(true);
+    }
 
 
     public KiWiResource convert(Resource r) {
@@ -762,6 +761,22 @@ public class KiWiValueFactory implements ValueFactory {
         this.batchSize = batchSize;
     }
 
+
+    /**
+     * Immediately flush the batch to the database using the value factory's connection. The method expects the
+     * underlying connection to start and commit the node batch.
+     */
+    public void flushBatch() throws SQLException {
+        KiWiConnection con = aqcuireConnection();
+        try {
+            flushBatch(con);
+        } finally {
+            releaseConnection(con);
+        }
+
+    }
+
+
     /**
      * Immediately flush the batch to the database. The method expects the underlying connection to start and commit
      * the node batch.

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
index 6fbb740..e174864 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
@@ -153,7 +153,7 @@ public class MySQLConcurrencyTest {
                         break;
                     case 1: object = repository.getValueFactory().createBNode();
                         break;
-                    case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+                    case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.randomAscii(40)); // MySQL often has UTF problems
                         break;
                     case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
                         break;