You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/08/02 14:06:53 UTC
git commit: trying to fix the concurrency issues
Updated Branches:
refs/heads/develop ea1db9cec -> e505371ef
trying to fix the concurrency issues
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/e505371e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/e505371e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/e505371e
Branch: refs/heads/develop
Commit: e505371ef1fc273a6fed80bc87afe6432fd087b9
Parents: ea1db9c
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Aug 2 14:06:46 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Aug 2 14:06:46 2013 +0200
----------------------------------------------------------------------
.../kiwi/persistence/KiWiConnection.java | 124 +++++++++++--------
.../kiwi/persistence/KiWiGarbageCollector.java | 2 +-
.../marmotta/kiwi/sail/KiWiSailConnection.java | 19 +--
.../marmotta/kiwi/sail/KiWiValueFactory.java | 37 ++++--
.../kiwi/test/MySQLConcurrencyTest.java | 2 +-
5 files changed, 101 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 6001b4a..6484094 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -127,6 +127,13 @@ public class KiWiConnection {
private ReentrantLock uriLock;
private ReentrantLock bnodeLock;
+
+ // this set keeps track of all statements that have been deleted in the active transaction of this connection
+ // this is needed to be able to determine if adding the triple again will merely undo a deletion or is a
+ // completely new addition to the triple store
+ private HashSet<Long> deletedStatementsLog;
+
+
public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, KiWiCacheManager cacheManager) throws SQLException {
this.cacheManager = cacheManager;
this.dialect = dialect;
@@ -136,6 +143,7 @@ public class KiWiConnection {
this.uriLock = new ReentrantLock();
this.bnodeLock = new ReentrantLock();
this.batchCommit = dialect.isBatchSupported();
+ this.deletedStatementsLog = new HashSet<Long>();
initCachePool();
initStatementCache();
@@ -1060,45 +1068,62 @@ public class KiWiConnection {
boolean hasId = triple.getId() != null;
- // retrieve a new triple ID and set it in the object
- if(triple.getId() == null) {
- triple.setId(getNextSequence("seq.triples"));
- }
- if(batchCommit) {
- cacheTriple(triple);
- synchronized (tripleBatch) {
- tripleBatch.add(triple);
- if(tripleBatch.size() >= batchSize) {
- flushBatch();
- }
- }
- return !hasId;
- } else {
- Preconditions.checkNotNull(triple.getSubject().getId());
- Preconditions.checkNotNull(triple.getPredicate().getId());
- Preconditions.checkNotNull(triple.getObject().getId());
- Preconditions.checkNotNull(triple.getContext().getId());
+ if(hasId && deletedStatementsLog.contains(triple.getId())) {
+ // this is a hack for a concurrency problem that may occur in case the triple is removed in the
+ // transaction and then added again; in these cases the createStatement method might return
+ // an expired state of the triple because it uses its own database connection
+ //deletedStatementsLog.remove(triple.getId());
+ undeleteTriple(triple);
- try {
- PreparedStatement insertTriple = getPreparedStatement("store.triple");
- insertTriple.setLong(1,triple.getId());
- insertTriple.setLong(2,triple.getSubject().getId());
- insertTriple.setLong(3,triple.getPredicate().getId());
- insertTriple.setLong(4,triple.getObject().getId());
- insertTriple.setLong(5,triple.getContext().getId());
- insertTriple.setBoolean(6,triple.isInferred());
- insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
- int count = insertTriple.executeUpdate();
-
- cacheTriple(triple);
-
- return count > 0;
- } catch(SQLException ex) {
- // this is an ugly hack to catch duplicate key errors in some databases (H2)
- // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
- return false;
+ return true;
+ } else {
+ // retrieve a new triple ID and set it in the object
+ if(triple.getId() == null) {
+ triple.setId(getNextSequence("seq.triples"));
+ }
+
+ if(batchCommit) {
+ commitLock.lock();
+ try {
+ cacheTriple(triple);
+ synchronized (tripleBatch) {
+ tripleBatch.add(triple);
+ if(tripleBatch.size() >= batchSize) {
+ flushBatch();
+ }
+ }
+ } finally {
+ commitLock.unlock();
+ }
+ return !hasId;
+ } else {
+ Preconditions.checkNotNull(triple.getSubject().getId());
+ Preconditions.checkNotNull(triple.getPredicate().getId());
+ Preconditions.checkNotNull(triple.getObject().getId());
+ Preconditions.checkNotNull(triple.getContext().getId());
+
+
+ try {
+ PreparedStatement insertTriple = getPreparedStatement("store.triple");
+ insertTriple.setLong(1,triple.getId());
+ insertTriple.setLong(2,triple.getSubject().getId());
+ insertTriple.setLong(3,triple.getPredicate().getId());
+ insertTriple.setLong(4,triple.getObject().getId());
+ insertTriple.setLong(5,triple.getContext().getId());
+ insertTriple.setBoolean(6,triple.isInferred());
+ insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+ int count = insertTriple.executeUpdate();
+
+ cacheTriple(triple);
+
+ return count > 0;
+ } catch(SQLException ex) {
+ // this is an ugly hack to catch duplicate key errors in some databases (H2)
+ // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
+ return false;
+ }
}
}
}
@@ -1161,20 +1186,19 @@ public class KiWiConnection {
if(triple.getId() == null) {
log.warn("attempting to remove non-persistent triple: {}", triple);
- } else {
+ } else if(tripleBatch == null || !tripleBatch.contains(triple)) {
requireJDBCConnection();
PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
deleteTriple.setLong(1,triple.getId());
deleteTriple.executeUpdate();
+
+ deletedStatementsLog.add(triple.getId());
+ } else {
+ tripleBatch.remove(triple);
}
removeCachedTriple(triple);
- if(tripleBatch != null) {
- synchronized (tripleBatch) {
- tripleBatch.remove(triple);
- }
- }
}
/**
@@ -1214,15 +1238,6 @@ public class KiWiConnection {
/**
- * Remove from the database all triples that have been marked as deleted and are not referenced by any other
- * entity.
- */
- public void cleanupTriples() {
- throw new UnsupportedOperationException("garbage collection of triples is not yet supported!");
- }
-
-
- /**
* List all contexts used in this triple store. See query.contexts .
* @return
* @throws SQLException
@@ -1910,6 +1925,8 @@ public class KiWiConnection {
flushBatch();
}
+ deletedStatementsLog.clear();
+
if(connection != null) {
connection.commit();
}
@@ -1936,6 +1953,7 @@ public class KiWiConnection {
tripleBatch.clear();
}
}
+ deletedStatementsLog.clear();
if(connection != null && !connection.isClosed()) {
connection.rollback();
}
@@ -2007,7 +2025,7 @@ public class KiWiConnection {
commitLock.lock();
try {
if(persistence.getValueFactory() != null) {
- persistence.getValueFactory().flushBatch(this);
+ persistence.getValueFactory().flushBatch();
}
PreparedStatement insertTriple = getPreparedStatement("store.triple");
@@ -2032,6 +2050,8 @@ public class KiWiConnection {
insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
insertTriple.addBatch();
+ } else {
+ log.error("triple batch contained a triple marked as deleted, this should not happen (triple: {})", triple);
}
}
tripleBatch.clear();
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
index c839d8e..74ceb4f 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
@@ -150,7 +150,7 @@ public class KiWiGarbageCollector extends Thread {
if(result.getString("ntype").equals("uri")) {
log.warn("DATABASE INCONSISTENCY: attempting to fix references for resource {}", result.getString("svalue"));
} else {
- log.warn("DATABASE INCONSISTENCY: attempting to fix references for literal or anonymous node");
+ log.warn("DATABASE INCONSISTENCY: attempting to fix references for literal or anonymous node {}", result.getString("svalue"));
}
long latest_id = result.getLong(4);
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
index a9a1aec..fe7176e 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
@@ -86,7 +86,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
private boolean triplesAdded, triplesRemoved;
- private HashSet<Long> deletedStatementsLog;
public KiWiSailConnection(KiWiStore sailBase) throws SailException {
super(sailBase);
@@ -164,19 +163,9 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
KiWiTriple triple = (KiWiTriple)valueFactory.createStatement(ksubj,kpred,kobj,kcontext, databaseConnection);
triple.setInferred(inferred);
- if(triple.getId() != null && deletedStatementsLog.contains(triple.getId())) {
- // this is a hack for a concurrency problem that may occur in case the triple is removed in the
- // transaction and then added again; in these cases the createStatement method might return
- // an expired state of the triple because it uses its own database connection
-
- databaseConnection.undeleteTriple(triple);
+ if(databaseConnection.storeTriple(triple)) {
triplesAdded = true;
notifyStatementAdded(triple);
- } else {
- if(databaseConnection.storeTriple(triple)) {
- triplesAdded = true;
- notifyStatementAdded(triple);
- }
}
added.add(triple);
@@ -344,7 +333,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
// nothing to do, the database transaction is started automatically
triplesAdded = false;
triplesRemoved = false;
- deletedStatementsLog = new HashSet<Long>();
}
@Override
@@ -355,7 +343,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
throw new SailException("database error while committing transaction",e);
}
if(triplesAdded || triplesRemoved) {
- deletedStatementsLog.clear();
store.notifySailChanged(new SailChangedEvent() {
@Override
@@ -380,7 +367,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
protected void rollbackInternal() throws SailException {
try {
databaseConnection.rollback();
- deletedStatementsLog.clear();
} catch (SQLException e) {
throw new SailException("database error while rolling back transaction",e);
}
@@ -395,7 +381,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
if(triple.getId() != null) {
databaseConnection.deleteTriple(triple);
triplesRemoved = true;
- deletedStatementsLog.add(triple.getId());
notifyStatementRemoved(triple);
}
}
@@ -426,7 +411,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
if(triple.getId() != null && triple.isInferred()) {
databaseConnection.deleteTriple(triple);
triplesRemoved = true;
- deletedStatementsLog.add(triple.getId());
notifyStatementRemoved(triple);
}
}
@@ -448,7 +432,6 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
if(triple.getId() != null && triple.isInferred()) {
databaseConnection.deleteTriple(triple);
triplesRemoved = true;
- deletedStatementsLog.add(triple.getId());
notifyStatementRemoved(triple);
}
} catch(SQLException ex) {
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
index b3843b3..1c5927d 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
@@ -687,22 +687,12 @@ public class KiWiValueFactory implements ValueFactory {
IntArray cacheKey = IntArray.createSPOCKey(subject, predicate, object, context);
KiWiTriple result = (KiWiTriple)tripleRegistry.get(cacheKey);
try {
- if(result == null || ((KiWiTriple)result).isDeleted()) {
+ if(result == null || result.isDeleted()) {
KiWiResource ksubject = convert(subject);
KiWiUriResource kpredicate = convert(predicate);
KiWiNode kobject = convert(object);
KiWiResource kcontext = convert(context);
- // test if the triple already exists in the database; if yes, return it
- /*
- List<Statement> triples = Iterations.asList(connection.listTriples(ksubject,kpredicate,kobject,kcontext,true));
- if(triples.size() == 1) {
- result = triples.get(0);
- } else {
- result = new KiWiTriple(ksubject,kpredicate,kobject,kcontext);
- ((KiWiTriple)result).setMarkedForReasoning(true);
- }
- */
result = new KiWiTriple(ksubject,kpredicate,kobject,kcontext);
result.setId(connection.getTripleId(ksubject,kpredicate,kobject,kcontext,true));
if(result.getId() == null) {
@@ -718,6 +708,15 @@ public class KiWiValueFactory implements ValueFactory {
}
}
+ /**
+ * Remove a statement from the triple registry. Called when the statement is deleted and the transaction commits.
+ * @param triple
+ */
+ protected void removeStatement(KiWiTriple triple) {
+ IntArray cacheKey = IntArray.createSPOCKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext());
+ tripleRegistry.remove(cacheKey);
+ triple.setDeleted(true);
+ }
public KiWiResource convert(Resource r) {
@@ -762,6 +761,22 @@ public class KiWiValueFactory implements ValueFactory {
this.batchSize = batchSize;
}
+
+ /**
+ * Immediately flush the batch to the database using the value factory's connection. The method expects the
+ * underlying connection to start and commit the node batch.
+ */
+ public void flushBatch() throws SQLException {
+ KiWiConnection con = aqcuireConnection();
+ try {
+ flushBatch(con);
+ } finally {
+ releaseConnection(con);
+ }
+
+ }
+
+
/**
* Immediately flush the batch to the database. The method expects the underlying connection to start and commit
* the node batch.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/e505371e/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
index 6fbb740..e174864 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
@@ -153,7 +153,7 @@ public class MySQLConcurrencyTest {
break;
case 1: object = repository.getValueFactory().createBNode();
break;
- case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.random(40));
+ case 2: object = repository.getValueFactory().createLiteral(RandomStringUtils.randomAscii(40)); // MySQL often has UTF problems
break;
case 3: object = repository.getValueFactory().createLiteral(rnd.nextInt());
break;