You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2014/06/13 10:57:38 UTC
[018/100] [abbrv] [partial] Reverting the erroneous merge by
Sebastian according to the instructions in INFRA-6876
http://git-wip-us.apache.org/repos/asf/marmotta/blob/582abb5b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index e582631..9c7de67 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -17,38 +17,35 @@
*/
package org.apache.marmotta.kiwi.persistence;
+import info.aduna.iteration.*;
+
import org.apache.marmotta.commons.sesame.model.LiteralCommons;
import org.apache.marmotta.commons.sesame.model.Namespaces;
import org.apache.marmotta.commons.util.DateUtils;
+
import com.google.common.base.Preconditions;
-import info.aduna.iteration.CloseableIteration;
-import info.aduna.iteration.EmptyIteration;
-import info.aduna.iteration.ExceptionConvertingIteration;
+
import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
-import org.apache.commons.lang.LocaleUtils;
+
import org.apache.marmotta.kiwi.caching.KiWiCacheManager;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.model.caching.TripleTable;
import org.apache.marmotta.kiwi.model.rdf.*;
import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration;
import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction;
import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.repository.RepositoryException;
import org.openrdf.repository.RepositoryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.sql.*;
+import java.util.*;
import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
/**
* A KiWiConnection offers methods for storing and retrieving KiWiTriples, KiWiNodes, and KiWiNamespaces in the
@@ -70,6 +67,7 @@ public class KiWiConnection {
protected KiWiCacheManager cacheManager;
+ protected TripleTable<KiWiTriple> tripleBatch;
/**
* Cache nodes by database ID
@@ -119,11 +117,34 @@ public class KiWiConnection {
private boolean autoCommit = false;
+ private boolean batchCommit = true;
+
+ private int batchSize = 1000;
+
+ private ReentrantLock commitLock;
+
+ private ReentrantLock literalLock;
+ private ReentrantLock uriLock;
+ private ReentrantLock bnodeLock;
+
+
+ // this set keeps track of all statements that have been deleted in the active transaction of this connection
+ // this is needed to be able to determine if adding the triple again will merely undo a deletion or is a
+ // completely new addition to the triple store
+ private HashSet<Long> deletedStatementsLog;
+
+ private static long numberOfCommits = 0;
public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, KiWiCacheManager cacheManager) throws SQLException {
this.cacheManager = cacheManager;
this.dialect = dialect;
this.persistence = persistence;
+ this.commitLock = new ReentrantLock();
+ this.literalLock = new ReentrantLock();
+ this.uriLock = new ReentrantLock();
+ this.bnodeLock = new ReentrantLock();
+ this.batchCommit = dialect.isBatchSupported();
+ this.deletedStatementsLog = new HashSet<Long>();
initCachePool();
initStatementCache();
@@ -163,6 +184,9 @@ public class KiWiConnection {
connection = persistence.getJDBCConnection();
connection.setAutoCommit(autoCommit);
}
+ if(tripleBatch == null) {
+ tripleBatch = new TripleTable<KiWiTriple>();
+ }
}
/**
@@ -170,7 +194,9 @@ public class KiWiConnection {
*
* @return
*/
- public Connection getJDBCConnection() {
+ public Connection getJDBCConnection() throws SQLException {
+ requireJDBCConnection();
+
return connection;
}
@@ -182,6 +208,14 @@ public class KiWiConnection {
return cacheManager;
}
+ public KiWiDialect getDialect() {
+ return dialect;
+ }
+
+ public KiWiConfiguration getConfiguration() {
+ return persistence.getConfiguration();
+ }
+
/**
* Load a KiWiNamespace with the given prefix, or null if the namespace does not exist. The method will first
* look in the node cache for cached nodes. If no cache entry is found, it will run a database query
@@ -317,9 +351,9 @@ public class KiWiConnection {
ResultSet result = querySize.executeQuery();
try {
if(result.next()) {
- return result.getLong(1);
+ return result.getLong(1) + (tripleBatch != null ? tripleBatch.size() : 0);
} else {
- return 0;
+ return 0 + (tripleBatch != null ? tripleBatch.size() : 0);
}
} finally {
result.close();
@@ -344,9 +378,9 @@ public class KiWiConnection {
ResultSet result = querySize.executeQuery();
try {
if(result.next()) {
- return result.getLong(1);
+ return result.getLong(1) + (tripleBatch != null ? tripleBatch.listTriples(null,null,null,context, false).size() : 0);
} else {
- return 0;
+ return 0 + (tripleBatch != null ? tripleBatch.listTriples(null,null,null,context, false).size() : 0);
}
} finally {
result.close();
@@ -391,20 +425,22 @@ public class KiWiConnection {
// prepare a query; we will only iterate once, read only, and need only one result row since the id is unique
PreparedStatement query = getPreparedStatement("load.node_by_id");
- query.setLong(1,id);
- query.setMaxRows(1);
+ synchronized (query) {
+ query.setLong(1,id);
+ query.setMaxRows(1);
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
- try {
- if(result.next()) {
- return constructNodeFromDatabase(result);
- } else {
- return null;
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
- } finally {
- result.close();
}
}
@@ -451,6 +487,8 @@ public class KiWiConnection {
* @return the KiWiUriResource identified by the given URI or null if it does not exist
*/
public KiWiUriResource loadUriResource(String uri) throws SQLException {
+ Preconditions.checkNotNull(uri);
+
// look in cache
Element element = uriCache.get(uri);
if(element != null) {
@@ -459,22 +497,27 @@ public class KiWiConnection {
requireJDBCConnection();
- // prepare a query; we will only iterate once, read only, and need only one result row since the id is unique
- PreparedStatement query = getPreparedStatement("load.uri_by_uri");
- query.setString(1, uri);
- query.setMaxRows(1);
-
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
+ uriLock.lock();
try {
- if(result.next()) {
- return (KiWiUriResource)constructNodeFromDatabase(result);
- } else {
- return null;
+ // prepare a query; we will only iterate once, read only, and need only one result row since the id is unique
+ PreparedStatement query = getPreparedStatement("load.uri_by_uri");
+ query.setString(1, uri);
+ query.setMaxRows(1);
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiUriResource)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ uriLock.unlock();
}
}
@@ -500,22 +543,29 @@ public class KiWiConnection {
requireJDBCConnection();
- // prepare a query; we will only iterate once, read only, and need only one result row since the id is unique
- PreparedStatement query = getPreparedStatement("load.bnode_by_anonid");
- query.setString(1,id);
- query.setMaxRows(1);
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
+ bnodeLock.lock();
+
try {
- if(result.next()) {
- return (KiWiAnonResource)constructNodeFromDatabase(result);
- } else {
- return null;
+ // prepare a query; we will only iterate once, read only, and need only one result row since the id is unique
+ PreparedStatement query = getPreparedStatement("load.bnode_by_anonid");
+ query.setString(1,id);
+ query.setMaxRows(1);
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiAnonResource)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ bnodeLock.unlock();
}
}
@@ -541,42 +591,48 @@ public class KiWiConnection {
return (KiWiLiteral)element.getObjectValue();
}
+ requireJDBCConnection();
+
// ltype not persisted
if(ltype != null && ltype.getId() == null) {
return null;
}
- requireJDBCConnection();
-
- // otherwise prepare a query, depending on the parameters given
- final PreparedStatement query;
- if(lang == null && ltype == null) {
- query = getPreparedStatement("load.literal_by_v");
- query.setString(1,value);
- } else if(lang != null) {
- query = getPreparedStatement("load.literal_by_vl");
- query.setString(1,value);
- query.setString(2, lang);
- } else if(ltype != null) {
- query = getPreparedStatement("load.literal_by_vt");
- query.setString(1,value);
- query.setLong(2,ltype.getId());
- } else {
- // This cannot happen...
- throw new IllegalArgumentException("Impossible combination of lang/type in loadLiteral!");
- }
+ literalLock.lock();
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
try {
- if(result.next()) {
- return (KiWiLiteral)constructNodeFromDatabase(result);
+ // otherwise prepare a query, depending on the parameters given
+ final PreparedStatement query;
+ if(lang == null && ltype == null) {
+ query = getPreparedStatement("load.literal_by_v");
+ query.setString(1,value);
+ } else if(lang != null) {
+ query = getPreparedStatement("load.literal_by_vl");
+ query.setString(1,value);
+ query.setString(2, lang);
+ } else if(ltype != null) {
+ query = getPreparedStatement("load.literal_by_vt");
+ query.setString(1,value);
+ query.setLong(2,ltype.getId());
} else {
- return null;
+ // This cannot happen...
+ throw new IllegalArgumentException("Impossible combination of lang/type in loadLiteral!");
+ }
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiLiteral)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ literalLock.unlock();
}
}
@@ -594,36 +650,42 @@ public class KiWiConnection {
* @throws SQLException
*/
public KiWiDateLiteral loadLiteral(Date date) throws SQLException {
- KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "dateTime");
-
- if(ltype == null || ltype.getId() == null) {
- return null;
- }
-
// look in cache
- Element element = literalCache.get(LiteralCommons.createCacheKey(DateUtils.getDateWithoutFraction(date),ltype.stringValue()));
+ Element element = literalCache.get(LiteralCommons.createCacheKey(DateUtils.getDateWithoutFraction(date),Namespaces.NS_XSD + "dateTime"));
if(element != null) {
return (KiWiDateLiteral)element.getObjectValue();
}
requireJDBCConnection();
- // otherwise prepare a query, depending on the parameters given
- PreparedStatement query = getPreparedStatement("load.literal_by_tv");
- query.setTimestamp(1, new Timestamp(DateUtils.getDateWithoutFraction(date).getTime()));
- query.setLong(2,ltype.getId());
+ KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "dateTime");
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
+ if(ltype == null || ltype.getId() == null) {
+ return null;
+ }
+
+ literalLock.lock();
try {
- if(result.next()) {
- return (KiWiDateLiteral)constructNodeFromDatabase(result);
- } else {
- return null;
+
+ // otherwise prepare a query, depending on the parameters given
+ PreparedStatement query = getPreparedStatement("load.literal_by_tv");
+ query.setTimestamp(1, new Timestamp(DateUtils.getDateWithoutFraction(date).getTime()));
+ query.setLong(2,ltype.getId());
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiDateLiteral)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ literalLock.unlock();
}
}
@@ -642,6 +704,14 @@ public class KiWiConnection {
* @throws SQLException
*/
public KiWiIntLiteral loadLiteral(long value) throws SQLException {
+ // look in cache
+ Element element = literalCache.get(LiteralCommons.createCacheKey(Long.toString(value),null,Namespaces.NS_XSD + "integer"));
+ if(element != null) {
+ return (KiWiIntLiteral)element.getObjectValue();
+ }
+
+ requireJDBCConnection();
+
KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "integer");
// ltype not persisted
@@ -649,30 +719,29 @@ public class KiWiConnection {
return null;
}
- // look in cache
- Element element = literalCache.get(LiteralCommons.createCacheKey(Long.toString(value),null,ltype.stringValue()));
- if(element != null) {
- return (KiWiIntLiteral)element.getObjectValue();
- }
+ literalLock.lock();
- requireJDBCConnection();
+ try {
- // otherwise prepare a query, depending on the parameters given
- PreparedStatement query = getPreparedStatement("load.literal_by_iv");
- query.setLong(1,value);
- query.setLong(2,ltype.getId());
+ // otherwise prepare a query, depending on the parameters given
+ PreparedStatement query = getPreparedStatement("load.literal_by_iv");
+ query.setLong(1,value);
+ query.setLong(2,ltype.getId());
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
- try {
- if(result.next()) {
- return (KiWiIntLiteral)constructNodeFromDatabase(result);
- } else {
- return null;
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiIntLiteral)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ literalLock.unlock();
}
}
@@ -689,38 +758,44 @@ public class KiWiConnection {
* @return a KiWiDoubleLiteral with the correct value, or null if it does not exist
* @throws SQLException
*/
- public KiWiDoubleLiteral loadLiteral(double value) throws SQLException {
- KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "double");
-
- // ltype not persisted
- if(ltype == null || ltype.getId() == null) {
- return null;
- }
-
+ public synchronized KiWiDoubleLiteral loadLiteral(double value) throws SQLException {
// look in cache
- Element element = literalCache.get(LiteralCommons.createCacheKey(Double.toString(value),null,ltype.stringValue()));
+ Element element = literalCache.get(LiteralCommons.createCacheKey(Double.toString(value),null,Namespaces.NS_XSD + "double"));
if(element != null) {
return (KiWiDoubleLiteral)element.getObjectValue();
}
requireJDBCConnection();
- // otherwise prepare a query, depending on the parameters given
- PreparedStatement query = getPreparedStatement("load.literal_by_dv");
- query.setDouble(1, value);
- query.setLong(2,ltype.getId());
+ KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "double");
+
+ // ltype not persisted
+ if(ltype == null || ltype.getId() == null) {
+ return null;
+ }
+
+ literalLock.lock();
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
try {
- if(result.next()) {
- return (KiWiDoubleLiteral)constructNodeFromDatabase(result);
- } else {
- return null;
+ // otherwise prepare a query, depending on the parameters given
+ PreparedStatement query = getPreparedStatement("load.literal_by_dv");
+ query.setDouble(1, value);
+ query.setLong(2,ltype.getId());
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiDoubleLiteral)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ literalLock.unlock();
}
}
@@ -738,15 +813,8 @@ public class KiWiConnection {
* @throws SQLException
*/
public KiWiBooleanLiteral loadLiteral(boolean value) throws SQLException {
- KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "boolean");
-
- // ltype not persisted
- if(ltype == null || ltype.getId() == null) {
- return null;
- }
-
// look in cache
- Element element = literalCache.get(LiteralCommons.createCacheKey(Boolean.toString(value),null,ltype.stringValue()));
+ Element element = literalCache.get(LiteralCommons.createCacheKey(Boolean.toString(value),null,Namespaces.NS_XSD + "boolean"));
if(element != null) {
return (KiWiBooleanLiteral)element.getObjectValue();
}
@@ -754,22 +822,36 @@ public class KiWiConnection {
requireJDBCConnection();
- // otherwise prepare a query, depending on the parameters given
- PreparedStatement query = getPreparedStatement("load.literal_by_bv");
- query.setBoolean(1, value);
- query.setLong(2,ltype.getId());
+ KiWiUriResource ltype = loadUriResource(Namespaces.NS_XSD + "boolean");
+
+ // ltype not persisted
+ if(ltype == null || ltype.getId() == null) {
+ return null;
+ }
+
+ literalLock.lock();
- // run the database query and if it yields a result, construct a new node; the method call will take care of
- // caching the constructed node for future calls
- ResultSet result = query.executeQuery();
try {
- if(result.next()) {
- return (KiWiBooleanLiteral)constructNodeFromDatabase(result);
- } else {
- return null;
+
+ // otherwise prepare a query, depending on the parameters given
+ PreparedStatement query = getPreparedStatement("load.literal_by_bv");
+ query.setBoolean(1, value);
+ query.setLong(2,ltype.getId());
+
+ // run the database query and if it yields a result, construct a new node; the method call will take care of
+ // caching the constructed node for future calls
+ ResultSet result = query.executeQuery();
+ try {
+ if(result.next()) {
+ return (KiWiBooleanLiteral)constructNodeFromDatabase(result);
+ } else {
+ return null;
+ }
+ } finally {
+ result.close();
}
} finally {
- result.close();
+ literalLock.unlock();
}
}
@@ -779,6 +861,11 @@ public class KiWiConnection {
// }
+
+ public long getNodeId() throws SQLException {
+ return getNextSequence("seq.nodes");
+ }
+
/**
* Store a new node in the database. The method will retrieve a new database id for the node and update the
* passed object. Afterwards, the node data will be inserted into the database using appropriate INSERT
@@ -786,28 +873,27 @@ public class KiWiConnection {
* <p/>
* If the node already has an ID, the method will do nothing (assuming that it is already persistent)
*
+ *
* @param node
+ * @param batch
* @throws SQLException
*/
- public synchronized void storeNode(KiWiNode node) throws SQLException {
- // if the node already has an ID, storeNode should not be called, since it is already persisted
- if(node.getId() != null) {
- log.warn("node {} already had a node ID, not persisting", node);
- return;
- }
+ public synchronized void storeNode(KiWiNode node, boolean batch) throws SQLException {
// ensure the data type of a literal is persisted first
if(node instanceof KiWiLiteral) {
KiWiLiteral literal = (KiWiLiteral)node;
if(literal.getType() != null && literal.getType().getId() == null) {
- storeNode(literal.getType());
+ storeNode(literal.getType(), batch);
}
}
requireJDBCConnection();
// retrieve a new node id and set it in the node object
- node.setId(getNextSequence("seq.nodes"));
+ if(node.getId() == null) {
+ node.setId(getNextSequence("seq.nodes"));
+ }
// distinguish the different node types and run the appropriate updates
if(node instanceof KiWiUriResource) {
@@ -817,7 +903,12 @@ public class KiWiConnection {
insertNode.setLong(1,node.getId());
insertNode.setString(2,uriResource.stringValue());
insertNode.setTimestamp(3, new Timestamp(uriResource.getCreated().getTime()));
- insertNode.executeUpdate();
+
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiAnonResource) {
KiWiAnonResource anonResource = (KiWiAnonResource)node;
@@ -826,7 +917,12 @@ public class KiWiConnection {
insertNode.setLong(1,node.getId());
insertNode.setString(2,anonResource.stringValue());
insertNode.setTimestamp(3, new Timestamp(anonResource.getCreated().getTime()));
- insertNode.executeUpdate();
+
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiDateLiteral) {
KiWiDateLiteral dateLiteral = (KiWiDateLiteral)node;
@@ -840,7 +936,11 @@ public class KiWiConnection {
throw new IllegalStateException("a date literal must have a datatype");
insertNode.setTimestamp(5, new Timestamp(dateLiteral.getCreated().getTime()));
- insertNode.executeUpdate();
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiIntLiteral) {
KiWiIntLiteral intLiteral = (KiWiIntLiteral)node;
@@ -855,7 +955,11 @@ public class KiWiConnection {
throw new IllegalStateException("an integer literal must have a datatype");
insertNode.setTimestamp(6, new Timestamp(intLiteral.getCreated().getTime()));
- insertNode.executeUpdate();
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiDoubleLiteral) {
KiWiDoubleLiteral doubleLiteral = (KiWiDoubleLiteral)node;
@@ -869,7 +973,11 @@ public class KiWiConnection {
throw new IllegalStateException("a double literal must have a datatype");
insertNode.setTimestamp(5, new Timestamp(doubleLiteral.getCreated().getTime()));
- insertNode.executeUpdate();
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiBooleanLiteral) {
KiWiBooleanLiteral booleanLiteral = (KiWiBooleanLiteral)node;
@@ -883,7 +991,11 @@ public class KiWiConnection {
throw new IllegalStateException("a boolean literal must have a datatype");
insertNode.setTimestamp(5, new Timestamp(booleanLiteral.getCreated().getTime()));
- insertNode.executeUpdate();
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else if(node instanceof KiWiStringLiteral) {
KiWiStringLiteral stringLiteral = (KiWiStringLiteral)node;
@@ -902,7 +1014,11 @@ public class KiWiConnection {
}
insertNode.setTimestamp(5, new Timestamp(stringLiteral.getCreated().getTime()));
- insertNode.executeUpdate();
+ if(batch) {
+ insertNode.addBatch();
+ } else {
+ insertNode.executeUpdate();
+ }
} else {
log.warn("unrecognized node type: {}", node.getClass().getCanonicalName());
}
@@ -911,45 +1027,165 @@ public class KiWiConnection {
}
/**
+ * Start a batch operation for inserting nodes. Afterwards, storeNode needs to be called with the batch argument
+ * set to "true".
+ *
+ * @throws SQLException
+ */
+ public void startNodeBatch() throws SQLException {
+ for(String stmt : new String[] { "store.uri", "store.sliteral", "store.bliteral", "store.dliteral", "store.iliteral", "store.tliteral", "store.bnode"}) {
+ PreparedStatement insertNode = getPreparedStatement(stmt);
+ insertNode.clearParameters();
+ insertNode.clearBatch();
+ }
+ }
+
+ /**
+ * Execute the batch operation for inserting nodes into the database.
+ * @throws SQLException
+ */
+ public void commitNodeBatch() throws SQLException {
+ for(String stmt : new String[] { "store.uri", "store.sliteral", "store.bliteral", "store.dliteral", "store.iliteral", "store.tliteral", "store.bnode"}) {
+ PreparedStatement insertNode = getPreparedStatement(stmt);
+ insertNode.executeBatch();
+ }
+ connection.commit();
+ }
+
+ /**
* Store a triple in the database. This method assumes that all nodes used by the triple are already persisted.
*
* @param triple the triple to store
* @throws SQLException
* @throws NullPointerException in case the subject, predicate, object or context have not been persisted
+ * @return true in case the update added a new triple to the database, false in case the triple already existed
*/
- public synchronized void storeTriple(KiWiTriple triple) throws SQLException {
- // if the node already has an ID, storeNode should not be called, since it is already persisted
- if(triple.getId() != null) {
- log.warn("triple {} already had a triple ID, not persisting", triple);
- return;
- }
+ public synchronized boolean storeTriple(final KiWiTriple triple) throws SQLException {
+ // mutual exclusion: prevent parallel adding and removing of the same triple
+ synchronized (triple) {
- Preconditions.checkNotNull(triple.getSubject().getId());
- Preconditions.checkNotNull(triple.getPredicate().getId());
- Preconditions.checkNotNull(triple.getObject().getId());
- Preconditions.checkNotNull(triple.getContext().getId());
+ requireJDBCConnection();
+ boolean hasId = triple.getId() != null;
- requireJDBCConnection();
+ if(hasId && deletedStatementsLog.contains(triple.getId())) {
+ // this is a hack for a concurrency problem that may occur in case the triple is removed in the
+ // transaction and then added again; in these cases the createStatement method might return
+ // an expired state of the triple because it uses its own database connection
- // retrieve a new triple ID and set it in the object
- triple.setId(getNextSequence("seq.triples"));
+ //deletedStatementsLog.remove(triple.getId());
+ undeleteTriple(triple);
- PreparedStatement insertTriple = getPreparedStatement("store.triple");
- insertTriple.setLong(1,triple.getId());
- insertTriple.setLong(2,triple.getSubject().getId());
- insertTriple.setLong(3,triple.getPredicate().getId());
- insertTriple.setLong(4,triple.getObject().getId());
- insertTriple.setLong(5,triple.getContext().getId());
- insertTriple.setBoolean(6,triple.isInferred());
- insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
- insertTriple.executeUpdate();
+ return true;
+ } else {
+ // retrieve a new triple ID and set it in the object
+ if(triple.getId() == null) {
+ triple.setId(getNextSequence("seq.triples"));
+ }
- cacheTriple(triple);
+ if(batchCommit) {
+ commitLock.lock();
+ try {
+ cacheTriple(triple);
+ tripleBatch.add(triple);
+ if(tripleBatch.size() >= batchSize) {
+ flushBatch();
+ }
+ } finally {
+ commitLock.unlock();
+ }
+ return !hasId;
+ } else {
+ Preconditions.checkNotNull(triple.getSubject().getId());
+ Preconditions.checkNotNull(triple.getPredicate().getId());
+ Preconditions.checkNotNull(triple.getObject().getId());
+
+
+ try {
+ RetryExecution<Boolean> execution = new RetryExecution<>("STORE");
+ execution.setUseSavepoint(true);
+ execution.execute(connection, new RetryCommand<Boolean>() {
+ @Override
+ public Boolean run() throws SQLException {
+ PreparedStatement insertTriple = getPreparedStatement("store.triple");
+ insertTriple.setLong(1,triple.getId());
+ insertTriple.setLong(2,triple.getSubject().getId());
+ insertTriple.setLong(3,triple.getPredicate().getId());
+ insertTriple.setLong(4,triple.getObject().getId());
+ if(triple.getContext() != null) {
+ insertTriple.setLong(5,triple.getContext().getId());
+ } else {
+ insertTriple.setNull(5, Types.BIGINT);
+ }
+ insertTriple.setBoolean(6,triple.isInferred());
+ insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+ int count = insertTriple.executeUpdate();
+
+ cacheTriple(triple);
+
+ return count > 0;
+ }
+ });
+
+ return !hasId;
+
+ } catch(SQLException ex) {
+ if("HYT00".equals(ex.getSQLState())) { // H2 table locking timeout
+ throw new ConcurrentModificationException("the same triple was modified in concurrent transactions (triple="+triple+")");
+ } else {
+ throw ex;
+ }
+ }
+ }
+ }
+ }
}
/**
+ * Return the identifier of the triple with the given subject, predicate, object and context, or null if this
+ * triple does not exist. Used for quick existence checks of triples.
+ *
+ * @param subject
+ * @param predicate
+ * @param object
+ * @param context
+ * @param inferred
+ * @return
+ */
+ public synchronized Long getTripleId(final KiWiResource subject, final KiWiUriResource predicate, final KiWiNode object, final KiWiResource context, final boolean inferred) throws SQLException {
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ Collection<KiWiTriple> batched = tripleBatch.listTriples(subject,predicate,object,context, false);
+ if(batched.size() > 0) {
+ return batched.iterator().next().getId();
+ }
+ }
+
+ requireJDBCConnection();
+ PreparedStatement loadTripleId = getPreparedStatement("load.triple");
+ loadTripleId.setLong(1, subject.getId());
+ loadTripleId.setLong(2, predicate.getId());
+ loadTripleId.setLong(3, object.getId());
+ if(context != null) {
+ loadTripleId.setLong(4, context.getId());
+ } else {
+ loadTripleId.setNull(4, Types.BIGINT);
+ }
+
+ ResultSet result = loadTripleId.executeQuery();
+ try {
+ if(result.next()) {
+ return result.getLong(1);
+ } else {
+ return null;
+ }
+
+ } finally {
+ result.close();
+ }
+ }
+
+ /**
* Mark the triple passed as argument as deleted, setting the "deleted" flag to true and
* updating the timestamp value of "deletedAt".
* <p/>
@@ -958,23 +1194,62 @@ public class KiWiConnection {
*
* @param triple
*/
- public void deleteTriple(KiWiTriple triple) throws SQLException {
- if(triple.getId() == null) {
- log.warn("attempting to remove non-persistent triple: {}",triple);
- return;
- }
-
+ public void deleteTriple(final KiWiTriple triple) throws SQLException {
requireJDBCConnection();
- PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
- deleteTriple.setLong(1,triple.getId());
- deleteTriple.executeUpdate();
+ RetryExecution execution = new RetryExecution("DELETE");
+ execution.setUseSavepoint(true);
+ execution.execute(connection, new RetryCommand<Void>() {
+ @Override
+ public Void run() throws SQLException {
+ // mutual exclusion: prevent parallel adding and removing of the same triple
+ synchronized (triple) {
+
+ // make sure the triple is marked as deleted in case some service still holds a reference
+ triple.setDeleted(true);
+ triple.setDeletedAt(new Date());
+
+ if (triple.getId() == null) {
+ log.warn("attempting to remove non-persistent triple: {}", triple);
+ removeCachedTriple(triple);
+ } else {
+ if (batchCommit) {
+ // need to remove from triple batch and from database
+ commitLock.lock();
+ try {
+ if (tripleBatch == null || !tripleBatch.remove(triple)) {
+
+ PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
+ synchronized (deleteTriple) {
+ deleteTriple.setLong(1, triple.getId());
+ deleteTriple.executeUpdate();
+ }
+ deletedStatementsLog.add(triple.getId());
+ }
+ } finally {
+ commitLock.unlock();
+ }
+ } else {
+ requireJDBCConnection();
+
+ PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
+ synchronized (deleteTriple) {
+ deleteTriple.setLong(1, triple.getId());
+ deleteTriple.executeUpdate();
+ }
+ deletedStatementsLog.add(triple.getId());
+
+
+ }
+ removeCachedTriple(triple);
+ }
+ }
+
+ return null;
+ }
+ });
- removeCachedTriple(triple);
- // make sure the triple is marked as deleted in case some service still holds a reference
- triple.setDeleted(true);
- triple.setDeletedAt(new Date());
}
/**
@@ -994,20 +1269,20 @@ public class KiWiConnection {
requireJDBCConnection();
+ // make sure the triple is not marked as deleted in case some service still holds a reference
+ triple.setDeleted(false);
+ triple.setDeletedAt(null);
+
+
synchronized (triple) {
if(!triple.isDeleted()) {
log.warn("attemting to undelete triple that was not deleted: {}",triple);
- return;
}
PreparedStatement undeleteTriple = getPreparedStatement("undelete.triple");
undeleteTriple.setLong(1, triple.getId());
undeleteTriple.executeUpdate();
- // make sure the triple is marked as deleted in case some service still holds a reference
- triple.setDeleted(false);
- triple.setDeletedAt(null);
-
cacheTriple(triple);
}
@@ -1015,30 +1290,85 @@ public class KiWiConnection {
/**
- * Remove from the database all triples that have been marked as deleted and are not referenced by any other
- * entity.
+ * List all contexts used in this triple store. See query.contexts .
+ * @return
+ * @throws SQLException
*/
- public void cleanupTriples() {
- throw new UnsupportedOperationException("garbage collection of triples is not yet supported!");
- }
+ public CloseableIteration<KiWiResource, SQLException> listContexts() throws SQLException {
+ requireJDBCConnection();
+
+ PreparedStatement queryContexts = getPreparedStatement("query.contexts");
+
+ final ResultSet result = queryContexts.executeQuery();
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ return new DistinctIteration<KiWiResource, SQLException>(
+ new UnionIteration<KiWiResource, SQLException>(
+ new ConvertingIteration<Resource,KiWiResource,SQLException>(new IteratorIteration<Resource, SQLException>(tripleBatch.listContextIDs().iterator())) {
+ @Override
+ protected KiWiResource convert(Resource sourceObject) throws SQLException {
+ return (KiWiResource)sourceObject;
+ }
+ },
+ new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
+ @Override
+ public KiWiResource apply(ResultSet row) throws SQLException {
+ return (KiWiResource)loadNodeById(result.getLong("context"));
+ }
+ })
+ )
+ );
+
+
+ } else {
+ return new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
+ @Override
+ public KiWiResource apply(ResultSet row) throws SQLException {
+ return (KiWiResource)loadNodeById(result.getLong("context"));
+ }
+ });
+ }
+
+ }
/**
* List all contexts used in this triple store. See query.contexts .
* @return
* @throws SQLException
*/
- public CloseableIteration<KiWiResource, SQLException> listContexts() throws SQLException {
+ public CloseableIteration<KiWiResource, SQLException> listResources() throws SQLException {
requireJDBCConnection();
- PreparedStatement queryContexts = getPreparedStatement("query.contexts");
+ PreparedStatement queryContexts = getPreparedStatement("query.resources");
final ResultSet result = queryContexts.executeQuery();
return new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
@Override
public KiWiResource apply(ResultSet row) throws SQLException {
- return (KiWiResource)loadNodeById(result.getLong("context"));
+ return (KiWiResource)constructNodeFromDatabase(row);
+ }
+ });
+
+ }
+
+ /**
+ * List all resources whose URI starts with the given prefix. See query.resources_prefix .
+ * @return
+ * @throws SQLException
+ */
+ public CloseableIteration<KiWiUriResource, SQLException> listResources(String prefix) throws SQLException {
+ requireJDBCConnection();
+
+ PreparedStatement queryContexts = getPreparedStatement("query.resources_prefix");
+ queryContexts.setString(1, prefix+"%");
+
+ final ResultSet result = queryContexts.executeQuery();
+
+ return new ResultSetIteration<KiWiUriResource>(result, new ResultTransformerFunction<KiWiUriResource>() {
+ @Override
+ public KiWiUriResource apply(ResultSet row) throws SQLException {
+ return (KiWiUriResource)constructNodeFromDatabase(row);
}
});
@@ -1078,20 +1408,47 @@ public class KiWiConnection {
* @param object the object to query for, or null for a wildcard query
* @param context the context to query for, or null for a wildcard query
* @param inferred if true, the result will also contain triples inferred by the reasoner, if false not
+ * @param wildcardContext if true, a null context will be interpreted as a wildcard, if false, a null context will be interpreted as "no context"
* @return a new RepositoryResult with a direct connection to the database; the result should be properly closed
* by the caller
*/
- public RepositoryResult<Statement> listTriples(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred) throws SQLException {
-
- return new RepositoryResult<Statement>(
- new ExceptionConvertingIteration<Statement, RepositoryException>(listTriplesInternal(subject,predicate,object,context,inferred)) {
- @Override
- protected RepositoryException convert(Exception e) {
- return new RepositoryException("database error while iterating over result set",e);
+ public RepositoryResult<Statement> listTriples(final KiWiResource subject, final KiWiUriResource predicate, final KiWiNode object, final KiWiResource context, final boolean inferred, final boolean wildcardContext) throws SQLException {
+
+
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ synchronized (tripleBatch) {
+ return new RepositoryResult<Statement>(
+ new ExceptionConvertingIteration<Statement, RepositoryException>(
+ new UnionIteration<Statement, SQLException>(
+ new IteratorIteration<Statement, SQLException>(tripleBatch.listTriples(subject,predicate,object,context, wildcardContext).iterator()),
+ new DelayedIteration<Statement, SQLException>() {
+ @Override
+ protected Iteration<? extends Statement, ? extends SQLException> createIteration() throws SQLException {
+ return listTriplesInternal(subject,predicate,object,context,inferred, wildcardContext);
+ }
+ }
+
+ )
+ ) {
+ @Override
+ protected RepositoryException convert(Exception e) {
+ return new RepositoryException("database error while iterating over result set",e);
+ }
+ }
+
+ );
+ }
+ } else {
+ return new RepositoryResult<Statement>(
+ new ExceptionConvertingIteration<Statement, RepositoryException>(listTriplesInternal(subject,predicate,object,context,inferred, wildcardContext)) {
+ @Override
+ protected RepositoryException convert(Exception e) {
+ return new RepositoryException("database error while iterating over result set",e);
+ }
}
- }
- );
+ );
+ }
}
/**
@@ -1103,10 +1460,11 @@ public class KiWiConnection {
* @param object the object to query for, or null for a wildcard query
* @param context the context to query for, or null for a wildcard query
* @param inferred if true, the result will also contain triples inferred by the reasoner, if false not
+ * @param wildcardContext if true, a null context will be interpreted as a wildcard, if false, a null context will be interpreted as "no context"
* @return a ClosableIteration that wraps the database ResultSet; needs to be closed explicitly by the caller
* @throws SQLException
*/
- private CloseableIteration<Statement, SQLException> listTriplesInternal(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred) throws SQLException {
+ private CloseableIteration<Statement, SQLException> listTriplesInternal(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred, final boolean wildcardContext) throws SQLException {
// if one of the database ids is null, there will not be any database results, so we can return an empty result
if(subject != null && subject.getId() == null) {
return new EmptyIteration<Statement, SQLException>();
@@ -1126,12 +1484,16 @@ public class KiWiConnection {
// otherwise we need to create an appropriate SQL query and execute it, the repository result will be read-only
// and only allow forward iteration, so we can limit the query using the respective flags
PreparedStatement query = connection.prepareStatement(
- constructTripleQuery(subject,predicate,object,context,inferred),
+ constructTripleQuery(subject,predicate,object,context,inferred, wildcardContext),
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY
);
query.clearParameters();
+ if(persistence.getDialect().isCursorSupported()) {
+ query.setFetchSize(persistence.getConfiguration().getCursorSize());
+ }
+
// set query parameters
int position = 1;
if(subject != null) {
@@ -1149,7 +1511,6 @@ public class KiWiConnection {
final ResultSet result = query.executeQuery();
-
return new ResultSetIteration<Statement>(result, true, new ResultTransformerFunction<Statement>() {
@Override
public Statement apply(ResultSet row) throws SQLException {
@@ -1168,7 +1529,7 @@ public class KiWiConnection {
* @param inferred if true, the result will also contain triples inferred by the reasoner, if false not
* @return an SQL query string representing the triple pattern
*/
- protected String constructTripleQuery(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred) {
+ protected String constructTripleQuery(KiWiResource subject, KiWiUriResource predicate, KiWiNode object, KiWiResource context, boolean inferred, boolean wildcardContext) {
StringBuilder builder = new StringBuilder();
builder.append("SELECT id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt FROM triples WHERE deleted = false");
if(subject != null) {
@@ -1182,6 +1543,8 @@ public class KiWiConnection {
}
if(context != null) {
builder.append(" AND context = ?");
+ } else if(!wildcardContext) {
+ builder.append(" AND context IS NULL");
}
if(!inferred) {
builder.append(" AND inferred = false");
@@ -1244,7 +1607,7 @@ public class KiWiConnection {
if(row.getLong("ltype") != 0) {
result.setType((KiWiUriResource) loadNodeById(row.getLong("ltype")));
} else {
- log.warn("Loaded literal without type: '{}' (id:{}).", result.getContent(), result.getId());
+ log.debug("Loaded literal without type: '{}' (id:{}).", result.getContent(), result.getId());
}
cacheNode(result);
@@ -1340,7 +1703,7 @@ public class KiWiConnection {
result.setCreator((KiWiResource)loadNodeById(row.getLong("creator")));
}
result.setDeleted(row.getBoolean("deleted"));
- result.setInferred(row.getBoolean("deleted"));
+ result.setInferred(row.getBoolean("inferred"));
result.setCreated(new Date(row.getTimestamp("createdAt").getTime()));
try {
if(row.getDate("deletedAt") != null) {
@@ -1359,10 +1722,15 @@ public class KiWiConnection {
protected static Locale getLocale(String language) {
Locale locale = localeMap.get(language);
- if(locale == null) {
- locale = LocaleUtils.toLocale(language);
- localeMap.put(language,locale);
-
+ if(locale == null && language != null) {
+ try {
+ Locale.Builder builder = new Locale.Builder();
+ builder.setLanguageTag(language);
+ locale = builder.build();
+ localeMap.put(language, locale);
+ } catch (IllformedLocaleException ex) {
+ throw new IllegalArgumentException("Language was not a valid BCP47 language: " + language, ex);
+ }
}
return locale;
}
@@ -1379,11 +1747,14 @@ public class KiWiConnection {
requireJDBCConnection();
PreparedStatement statement = statementCache.get(key);
- if(statement == null) {
+ if(statement == null || statement.isClosed()) {
statement = connection.prepareStatement(dialect.getStatement(key));
statementCache.put(key,statement);
}
statement.clearParameters();
+ if(persistence.getDialect().isCursorSupported()) {
+ statement.setFetchSize(persistence.getConfiguration().getCursorSize());
+ }
return statement;
}
@@ -1396,26 +1767,30 @@ public class KiWiConnection {
* @throws SQLException
*/
public long getNextSequence(String sequenceName) throws SQLException {
- requireJDBCConnection();
+ if(batchCommit && persistence.getConfiguration().isMemorySequences()) {
+ return persistence.incrementAndGetMemorySequence(sequenceName);
+ } else {
+ requireJDBCConnection();
- // retrieve a new node id and set it in the node object
+ // retrieve a new node id and set it in the node object
- // if there is a preparation needed to update the transaction, run it first
- if(dialect.hasStatement(sequenceName+".prep")) {
- PreparedStatement prepNodeId = getPreparedStatement(sequenceName+".prep");
- prepNodeId.executeUpdate();
- }
+ // if there is a preparation needed to update the transaction, run it first
+ if(dialect.hasStatement(sequenceName+".prep")) {
+ PreparedStatement prepNodeId = getPreparedStatement(sequenceName+".prep");
+ prepNodeId.executeUpdate();
+ }
- PreparedStatement queryNodeId = getPreparedStatement(sequenceName);
- ResultSet resultNodeId = queryNodeId.executeQuery();
- try {
- if(resultNodeId.next()) {
- return resultNodeId.getLong(1);
- } else {
- throw new SQLException("the sequence did not return a new value");
+ PreparedStatement queryNodeId = getPreparedStatement(sequenceName);
+ ResultSet resultNodeId = queryNodeId.executeQuery();
+ try {
+ if(resultNodeId.next()) {
+ return resultNodeId.getLong(1);
+ } else {
+ throw new SQLException("the sequence did not return a new value");
+ }
+ } finally {
+ resultNodeId.close();
}
- } finally {
- resultNodeId.close();
}
}
@@ -1423,7 +1798,7 @@ public class KiWiConnection {
private void cacheNode(KiWiNode node) {
if(node.getId() != null) {
- nodeCache.put(new Element(node.getId(),node));
+ nodeCache.put(new Element(node.getId(), node));
}
if(node instanceof KiWiUriResource) {
uriCache.put(new Element(node.stringValue(), node));
@@ -1553,6 +1928,47 @@ public class KiWiConnection {
}
/**
+ * Return true if batched commits are enabled. Batched commits will try to group database operations and
+ * keep a memory log while storing triples. This can considerably improve the database performance.
+ * @return
+ */
+ public boolean isBatchCommit() {
+ return batchCommit;
+ }
+
+ /**
+ * Enable batched commits. Batched commits will try to group database operations and
+ * keep a memory log while storing triples. This can considerably improve the database performance.
+ * @param batchCommit true to enable batched commits (only effective if the dialect supports them)
+ */
+ public void setBatchCommit(boolean batchCommit) {
+ if(dialect.isBatchSupported()) {
+ this.batchCommit = batchCommit;
+ } else {
+ log.warn("batch commits are not supported by this database dialect");
+ }
+ }
+
+
+ /**
+ * Return the size of a batch for batched commits. Batched commits will try to group database operations and
+ * keep a memory log while storing triples. This can considerably improve the database performance.
+ * @return
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Set the size of a batch for batched commits. Batched commits will try to group database operations and
+ * keep a memory log while storing triples. This can considerably improve the database performance.
+ * @param batchSize
+ */
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ /**
* Makes all changes made since the previous
* commit/rollback permanent and releases any database locks
* currently held by this <code>Connection</code> object.
@@ -1565,10 +1981,68 @@ public class KiWiConnection {
* <code>Connection</code> object is in auto-commit mode
* @see #setAutoCommit
*/
- public void commit() throws SQLException {
- if(connection != null) {
- connection.commit();
+ public synchronized void commit() throws SQLException {
+ numberOfCommits++;
+
+ RetryExecution execution = new RetryExecution("COMMIT");
+ execution.execute(connection, new RetryCommand<Void>() {
+ @Override
+ public Void run() throws SQLException {
+ if(persistence.getConfiguration().isCommitSequencesOnCommit() || numberOfCommits % 100 == 0) {
+ commitMemorySequences();
+ }
+
+
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ flushBatch();
+ }
+
+
+ deletedStatementsLog.clear();
+
+ if(connection != null) {
+ connection.commit();
+ }
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Store the values of all memory sequences back into the database. Should be called at least on repository shutdown
+ * but possibly even when a transaction commits.
+ */
+ public void commitMemorySequences() throws SQLException {
+ if(persistence.getMemorySequences() != null) {
+ synchronized (persistence.getMemorySequences()) {
+ requireJDBCConnection();
+
+ Set<String> updated = persistence.getSequencesUpdated();
+ persistence.setSequencesUpdated(new HashSet<String>());
+
+ try {
+ for(Map.Entry<String,Long> entry : persistence.getMemorySequences().asMap().entrySet()) {
+ if( updated.contains(entry.getKey()) && entry.getValue() > 0) {
+ PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
+ updateSequence.setLong(1, entry.getValue());
+ if(updateSequence.execute()) {
+ updateSequence.getResultSet().close();
+ } else {
+ updateSequence.getUpdateCount();
+ }
+ }
+ }
+ } catch(SQLException ex) {
+ // MySQL deadlock state, in this case we retry anyways
+ if(!"40001".equals(ex.getSQLState())) {
+ log.error("SQL exception:",ex);
+ }
+ throw ex;
+ }
+ }
}
+
}
/**
@@ -1584,6 +2058,15 @@ public class KiWiConnection {
* @see #setAutoCommit
*/
public void rollback() throws SQLException {
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ synchronized (tripleBatch) {
+ for(KiWiTriple triple : tripleBatch) {
+ triple.setId(null);
+ }
+ tripleBatch.clear();
+ }
+ }
+ deletedStatementsLog.clear();
if(connection != null && !connection.isClosed()) {
connection.rollback();
}
@@ -1643,7 +2126,192 @@ public class KiWiConnection {
log.debug("database system does not allow closing statements");
}
- connection.close();
+ persistence.releaseJDBCConnection(connection);
}
}
+
+
+ int retry = 0;
+
+ public synchronized void flushBatch() throws SQLException {
+ if(batchCommit && tripleBatch != null) {
+ requireJDBCConnection();
+
+ commitLock.lock();
+ try {
+ if(persistence.getValueFactory() != null) {
+ persistence.getValueFactory().flushBatch();
+ }
+
+
+ RetryExecution execution = new RetryExecution("FLUSH BATCH");
+ execution.setUseSavepoint(true);
+ execution.execute(connection, new RetryCommand<Void>() {
+ @Override
+ public Void run() throws SQLException {
+ PreparedStatement insertTriple = getPreparedStatement("store.triple");
+ insertTriple.clearParameters();
+ insertTriple.clearBatch();
+
+ synchronized (tripleBatch) {
+ for(KiWiTriple triple : tripleBatch) {
+ // if the triple has been marked as deleted, this can only have been done by another connection
+ // in this case the triple id is no longer usable (might result in key conflicts), so we set it to
+ // a new id
+ if(triple.isDeleted()) {
+ triple.setId(getNextSequence("seq.triples"));
+ triple.setDeleted(false);
+ triple.setDeletedAt(null);
+ }
+
+ // retrieve a new triple ID and set it in the object
+ if(triple.getId() == null) {
+ triple.setId(getNextSequence("seq.triples"));
+ }
+
+ insertTriple.setLong(1,triple.getId());
+ insertTriple.setLong(2,triple.getSubject().getId());
+ insertTriple.setLong(3,triple.getPredicate().getId());
+ insertTriple.setLong(4,triple.getObject().getId());
+ if(triple.getContext() != null) {
+ insertTriple.setLong(5,triple.getContext().getId());
+ } else {
+ insertTriple.setNull(5, Types.BIGINT);
+ }
+ insertTriple.setBoolean(6,triple.isInferred());
+ insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+
+ insertTriple.addBatch();
+ }
+ }
+ insertTriple.executeBatch();
+
+ tripleBatch.clear();
+
+ return null;
+ }
+ });
+
+ } finally {
+ commitLock.unlock();
+ }
+
+ }
+
+ }
+
+
+ protected static interface RetryCommand<T> {
+
+ public T run() throws SQLException;
+ }
+
+ /**
+ * A generic implementation of an SQL command that might fail (e.g. because of a timeout or concurrency situation)
+ * and should be retried several times before giving up completely.
+ *
+ */
+ protected static class RetryExecution<T> {
+
+ // counter for current number of retries
+ private int retries = 0;
+
+ // how often to reattempt the operation
+ private int maxRetries = 10;
+
+ // how long to wait before retrying
+ private long retryInterval = 1000;
+
+ // use an SQL savepoint and roll back in case a retry is needed?
+ private boolean useSavepoint = false;
+
+ private String name;
+
+ // if non-empty: only retry on the SQL states contained in this set
+ private Set<String> sqlStates;
+
+ public RetryExecution(String name) {
+ this.name = name;
+ this.sqlStates = new HashSet<>();
+ }
+
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ public long getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(long retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
+ public boolean isUseSavepoint() {
+ return useSavepoint;
+ }
+
+ public void setUseSavepoint(boolean useSavepoint) {
+ this.useSavepoint = useSavepoint;
+ }
+
+ public Set<String> getSqlStates() {
+ return sqlStates;
+ }
+
+ public T execute(Connection connection, RetryCommand<T> command) throws SQLException {
+ Savepoint savepoint = null;
+ if(useSavepoint) {
+ savepoint = connection.setSavepoint();
+ }
+ try {
+ T result = command.run();
+
+ if(useSavepoint && savepoint != null) {
+ connection.releaseSavepoint(savepoint);
+ }
+
+ return result;
+ } catch (SQLException ex) {
+ if(retries < maxRetries && (sqlStates.size() == 0 || sqlStates.contains(ex.getSQLState()))) {
+ if(useSavepoint && savepoint != null) {
+ connection.rollback(savepoint);
+ }
+ Random rnd = new Random();
+ long sleep = retryInterval - 250 + rnd.nextInt(500);
+ log.warn("{}: temporary conflict, retrying in {} ms ... (thread={}, retry={})", name, sleep, Thread.currentThread().getName(), retries);
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {}
+ retries++;
+ T result = execute(connection, command);
+ retries--;
+
+ return result;
+ } else {
+ log.error("{}: temporary conflict could not be solved! (error: {})", name, ex.getMessage());
+
+ log.debug("main exception:",ex);
+ log.debug("next exception:",ex.getNextException());
+ throw ex;
+ }
+ }
+
+
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/582abb5b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
index 9cd130e..cbf6fbf 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiDialect.java
@@ -17,14 +17,13 @@
*/
package org.apache.marmotta.kiwi.persistence;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
/**
* A dialect provides the SQL statements necessary to access the different types of database systems. Each
@@ -36,7 +35,7 @@ public abstract class KiWiDialect {
private static Logger log = LoggerFactory.getLogger(KiWiDialect.class);
- private final static int VERSION = 1;
+ private final static int VERSION = 2;
private Properties statements;
@@ -69,6 +68,12 @@ public abstract class KiWiDialect {
/**
+ * Return true if batched commits are supported by this dialect.
+ * @return
+ */
+ public abstract boolean isBatchSupported();
+
+ /**
* Return the contents of the SQL create script used for initialising an empty database
* @return
*/
@@ -112,7 +117,7 @@ public abstract class KiWiDialect {
StringBuilder builder = new StringBuilder();
for(int i = oldVersion+1; i <= VERSION; i++ ) {
try {
- String script = String.format("upgrade_"+name+"_%02d_%02d.sql",i-1,i);
+ String script = String.format("upgrade_"+name+"_%03d_%03d.sql",i-1,i);
builder.append(IOUtils.toString(this.getClass().getResourceAsStream(script)));
} catch (Exception e) {
@@ -152,4 +157,81 @@ public abstract class KiWiDialect {
public Set<String> getStatementIdentifiers() {
return statements.stringPropertyNames();
}
+
+ /**
+ * Return the names of all sequences that have been configured in the system, i.e. all statements starting with "seq."
+ * @return
+ */
+ public Set<String> listSequences(String scriptName) {
+ // quick hack for current modules, fix later!
+ if("base".equals(scriptName)) {
+ return ImmutableSet.of("seq.nodes", "seq.triples", "seq.namespaces");
+ } else if("reasoner".equals(scriptName)) {
+ return ImmutableSet.of("seq.rules", "seq.justifications", "seq.programs");
+ } else if("versioning".equals(scriptName)) {
+ return ImmutableSet.of("seq.versions");
+ } else if("ldcache".equals(scriptName)) {
+ return ImmutableSet.of("seq.ldcache");
+ } else {
+ return Collections.EMPTY_SET;
+ }
+
+
+ /*
+ Set<String> names = new HashSet<String>();
+ Enumeration e = statements.propertyNames();
+ while(e.hasMoreElements()) {
+ String[] keys = e.nextElement().toString().split("\\.");
+ if(keys[0].equals("seq")) {
+ names.add(keys[0] + "." + keys[1]);
+ }
+ }
+ return names;
+ */
+ }
+
+ /**
+ * Return the database specific operator for matching a text against a regular expression.
+ *
+ * @param text
+ * @param pattern
+ * @return
+ */
+ public abstract String getRegexp(String text, String pattern);
+
+
+ /**
+ * Return the database specific case insensitive like comparison, e.g. ILIKE in Postgres.
+ *
+ * @param text
+ * @param pattern
+ * @return
+ */
+ public abstract String getILike(String text, String pattern);
+
+
+ /**
+ * Get the database specific string concatenation function for the (variable number of) arguments.
+ *
+ * @param args
+ * @return
+ */
+ public abstract String getConcat(String... args);
+
+
+ /**
+ * Get the query string that can be used for validating that a JDBC connection to this database is still valid.
+ * Typically, this should be an inexpensive operation like "SELECT 1".
+ * @return
+ */
+ public abstract String getValidationQuery();
+
+
+ /**
+ * Return true in case the database system supports using cursors for queries over large data tables.
+ * @return
+ */
+ public boolean isCursorSupported() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/582abb5b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
index f832dd8..d29104b 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
@@ -22,10 +22,13 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* This class implements a garbage collector for the database that cleans up deleted triples and nodes when they
@@ -42,7 +45,7 @@ public class KiWiGarbageCollector extends Thread {
private Set<TableDependency> tripleTableDependencies;
private Set<TableDependency> nodeTableDependencies;
- private long interval = 60 * 60 * 1000;
+ private long interval = TimeUnit.MILLISECONDS.convert(24L, TimeUnit.HOURS);
private long round = 0;
@@ -98,11 +101,107 @@ public class KiWiGarbageCollector extends Thread {
nodeTableDependencies.add(new TableDependency(tableName,columnName));
}
+ protected boolean checkConsistency() throws SQLException {
+ boolean consistent = true;
- private int garbageCollect() throws SQLException {
+ String checkNodeDuplicatesQuery = "SELECT svalue, ntype, count(id) FROM nodes group by svalue, ntype having count(id) > 1";
+
+ try(Connection con = persistence.getJDBCConnection()) {
+ PreparedStatement checkNodeDuplicatesStatement = con.prepareStatement(checkNodeDuplicatesQuery);
+
+ try(ResultSet result = checkNodeDuplicatesStatement.executeQuery()) {
+ if(result.next()) {
+ log.warn("DATABASE INCONSISTENCY: duplicate node entries found, please try to fix the consistency with fixConsistency()!");
+ do {
+ if(result.getString("ntype").equals("uri")) {
+ log.warn(" - inconsistent resource: {}", result.getString("svalue"));
+ }
+ } while(result.next());
+
+ consistent = false;
+ }
+ }
+
+ }
+
+ if(!consistent) {
+ log.warn("DATABASE INCONSISTENCY: attempting to auto-fix inconsistencies where possible");
+ try {
+ fixConsistency();
+ } catch (SQLException ex) {
+ log.error("DATABASE INCONSISTENCY: auto-fixing inconsistencies failed ({})", ex.getMessage());
+ }
+ }
+
+ return consistent;
+ }
+
+
+ protected void fixConsistency() throws SQLException {
+ String checkNodeDuplicatesQuery = "SELECT svalue, ntype, count(id), max(id) FROM nodes group by svalue, ntype having count(id) > 1";
+ String getNodeIdsQuery = "SELECT id FROM nodes WHERE svalue = ? AND ntype = ? AND id != ?";
+
+ try(Connection con = persistence.getJDBCConnection(true)) {
+ PreparedStatement checkNodeDuplicatesStatement = con.prepareStatement(persistence.getDialect().getStatement("gc.check_consistency"));
+ PreparedStatement getNodeIdsStatement = con.prepareStatement(persistence.getDialect().getStatement("gc.list_node_ids"));
+
+ ResultSet result = checkNodeDuplicatesStatement.executeQuery();
+ while(result.next()) {
+ if(result.getString("ntype").equals("uri")) {
+ log.warn("DATABASE INCONSISTENCY: attempting to fix references for resource {}", result.getString("svalue"));
+ } else {
+ log.warn("DATABASE INCONSISTENCY: attempting to fix references for literal or anonymous node {}", result.getString("svalue"));
+ }
+
+ long latest_id = result.getLong(4);
+
+ // first we collect all ids of nodes that have the same svalue and ntype
+ getNodeIdsStatement.clearParameters();
+ getNodeIdsStatement.setString(1, result.getString("svalue"));
+ getNodeIdsStatement.setString(2,result.getString("ntype"));
+ getNodeIdsStatement.setLong(3, latest_id);
+
+ ArrayList<Long> ids = new ArrayList<>();
+ try(ResultSet idResult = getNodeIdsStatement.executeQuery()) {
+ while(idResult.next()) {
+ ids.add(idResult.getLong(1));
+ }
+ }
+ getNodeIdsStatement.close();
+
+ // then we "fix" the triples table by making sure that all subjects, predicates, objects and contexts point to
+ // the latest version only; we use the nodes dependency table for this purpose
+ for(TableDependency dep : nodeTableDependencies) {
+ String fixNodeIdsQuery = "UPDATE " + dep.table + " SET " + dep.column + " = " + latest_id + " WHERE " + dep.column + " = ?";
+ PreparedStatement fixNodeIdsStatement = con.prepareStatement(fixNodeIdsQuery);
+ for(Long id : ids) {
+ fixNodeIdsStatement.setLong(1, id);
+ fixNodeIdsStatement.addBatch();
+ }
+ fixNodeIdsStatement.executeBatch();
+ }
+
+ // finally we clean up all now unused node ids
+ String deleteDuplicatesQuery = "DELETE FROM nodes WHERE id = ?";
+ PreparedStatement deleteDuplicatesStatement = con.prepareStatement(deleteDuplicatesQuery);
+ for(Long id : ids) {
+ deleteDuplicatesStatement.setLong(1, id);
+ deleteDuplicatesStatement.addBatch();
+ }
+ deleteDuplicatesStatement.executeBatch();
+ deleteDuplicatesStatement.close();
+ }
+ checkNodeDuplicatesStatement.close();
+ }
+ }
+
+
+ protected int garbageCollect() throws SQLException {
round++;
- Connection con = persistence.getJDBCConnection();
+ long start = System.currentTimeMillis();
+
+ Connection con = persistence.getJDBCConnection(false);
try {
int count = 0;
@@ -120,9 +219,14 @@ public class KiWiGarbageCollector extends Thread {
}
// garbage collect nodes (only every 10th garbage collection, only makes sense when we previously deleted triples ...)
- // TODO: this is currently not working, because the nodes remain in the cache; we need to find a different solution ...
- //if(count > 0 && round % 10 == 1) {
- if(false) {
+ if(count > 0 && round % 10 == 1) {
+ // flush all nodes from the value factory first
+ if(persistence.getValueFactory() != null) {
+ persistence.getValueFactory().flushBatch();
+ }
+
+
+ // then delete all unconnected nodes
try {
String gcNodesQuery = buildGCNodesQuery();
PreparedStatement stmtGcNodes = con.prepareStatement(gcNodesQuery);
@@ -135,10 +239,11 @@ public class KiWiGarbageCollector extends Thread {
log.warn("SQL error while executing garbage collection on nodes table: {}", ex.getMessage());
}
}
+ log.info("... cleaned up {} entries (duration: {} ms)", count, (System.currentTimeMillis()-start));
return count;
} finally {
- con.close();
+ persistence.releaseJDBCConnection(con);
}
}
@@ -149,7 +254,6 @@ public class KiWiGarbageCollector extends Thread {
*
* @see #start()
* @see #stop()
- * @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
@@ -160,11 +264,16 @@ public class KiWiGarbageCollector extends Thread {
while(!shutdown) {
// don't run immediately on startup
if(started) {
- long start = System.currentTimeMillis();
+ log.info("running database consistency checks ...");
+ try {
+ checkConsistency();
+ } catch (SQLException e) {
+ log.error("error while executing consistency checks: {}",e.getMessage());
+ }
+
log.info("running garbage collection ...");
try {
int count = garbageCollect();
- log.info("... cleaned up {} entries (duration: {} ms)", count, (System.currentTimeMillis()-start));
} catch (SQLException e) {
log.error("error while executing garbage collection: {}",e.getMessage());
}