You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/08/05 16:02:58 UTC
git commit: trying further bug fixes
Updated Branches:
refs/heads/develop d3757e494 -> 8f612b0e4
trying further bug fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/8f612b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/8f612b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/8f612b0e
Branch: refs/heads/develop
Commit: 8f612b0e438419c031f04b3fbf057e89825893ee
Parents: d3757e4
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Aug 5 16:02:52 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Aug 5 16:02:52 2013 +0200
----------------------------------------------------------------------
.../marmotta/commons/locking/ObjectLocks.java | 75 +++++++++
.../marmotta/commons/locking/StringLocks.java | 75 ---------
.../commons/sesame/model/LiteralKey.java | 43 +++++
.../marmotta/kiwi/config/KiWiConfiguration.java | 2 +-
.../kiwi/persistence/KiWiConnection.java | 24 ++-
.../kiwi/persistence/KiWiGarbageCollector.java | 1 -
.../kiwi/persistence/KiWiPersistence.java | 20 ++-
.../marmotta/kiwi/sail/KiWiValueFactory.java | 164 ++++++++++---------
.../persistence/mysql/create_base_tables.sql | 14 +-
.../marmotta/ldcache/services/LDCache.java | 6 +-
10 files changed, 255 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/ObjectLocks.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/ObjectLocks.java b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/ObjectLocks.java
new file mode 100644
index 0000000..ebe4a1c
--- /dev/null
+++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/ObjectLocks.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.commons.locking;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.Monitor;
+
+import java.util.HashSet;
+
+/**
+ * An implementation of dynamic key-based locks that allows fine-grained locking on arbitrary objects; one lock is kept per distinct key, where keys are compared using equals()/hashCode().
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class ObjectLocks {
+
+ private LoadingCache<Object,Monitor> stringLocks;
+
+ public ObjectLocks() {
+ stringLocks = CacheBuilder.newBuilder().build(new LockCacheLoader());
+ }
+
+
+ public void lock(Object name) {
+ Monitor lock;
+ synchronized (stringLocks) {
+ lock = stringLocks.getUnchecked(name);
+ }
+ lock.enter();
+ }
+
+ public void unlock(Object name) {
+ Monitor lock;
+ synchronized (stringLocks) {
+ lock = stringLocks.getUnchecked(name);
+ }
+ lock.leave();
+ }
+
+ public boolean tryLock(Object name) {
+ Monitor lock;
+ synchronized (stringLocks) {
+ lock = stringLocks.getUnchecked(name);
+ }
+ return lock.tryEnter();
+ }
+
+ /**
+ * A simple Guava cache loader implementation for generating object-based locks
+ */
+ private static class LockCacheLoader extends CacheLoader<Object,Monitor> {
+ @Override
+ public Monitor load(Object key) throws Exception {
+ return new Monitor();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/StringLocks.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/StringLocks.java b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/StringLocks.java
deleted file mode 100644
index 9a43f83..0000000
--- a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/locking/StringLocks.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.marmotta.commons.locking;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.Monitor;
-
-import java.util.HashSet;
-
-/**
- * An implementation of dynamic name-based locks that allows more fine-grained locking methods based on a string name.
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class StringLocks {
-
- private LoadingCache<String,Monitor> stringLocks;
-
- public StringLocks() {
- stringLocks = CacheBuilder.newBuilder().weakKeys().build(new LockCacheLoader());
- }
-
-
- public void lock(String name) {
- Monitor lock;
- synchronized (stringLocks) {
- lock = stringLocks.getUnchecked(name);
- }
- lock.enter();
- }
-
- public void unlock(String name) {
- Monitor lock;
- synchronized (stringLocks) {
- lock = stringLocks.getUnchecked(name);
- }
- lock.leave();
- }
-
- public boolean tryLock(String name) {
- Monitor lock;
- synchronized (stringLocks) {
- lock = stringLocks.getUnchecked(name);
- }
- return lock.tryEnter();
- }
-
- /**
- * A simple Guava cache loader implementation for generating object-based locks
- */
- private static class LockCacheLoader extends CacheLoader<Object,Monitor> {
- @Override
- public Monitor load(Object key) throws Exception {
- return new Monitor();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/sesame/model/LiteralKey.java
----------------------------------------------------------------------
diff --git a/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/sesame/model/LiteralKey.java b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/sesame/model/LiteralKey.java
new file mode 100644
index 0000000..d921073
--- /dev/null
+++ b/commons/marmotta-commons/src/main/java/org/apache/marmotta/commons/sesame/model/LiteralKey.java
@@ -0,0 +1,43 @@
+package org.apache.marmotta.commons.sesame.model;
+
+/**
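+ * Composite key for a literal, consisting of its value, datatype and language, with value-based equals() and hashCode() so that it can be used as a map or lock key.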
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class LiteralKey {
+
+ private Object value;
+
+ private String type;
+
+ private String lang;
+
+ public LiteralKey(Object value, String type, String lang) {
+ this.value = value;
+ this.type = type;
+ this.lang = lang;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LiteralKey that = (LiteralKey) o;
+
+ if (lang != null ? !lang.equals(that.lang) : that.lang != null) return false;
+ if (type != null ? !type.equals(that.type) : that.type != null) return false;
+ if (!value.equals(that.value)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = value.hashCode();
+ result = 31 * result + (type != null ? type.hashCode() : 0);
+ result = 31 * result + (lang != null ? lang.hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
index ba53de9..6767100 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/config/KiWiConfiguration.java
@@ -89,7 +89,7 @@ public class KiWiConfiguration {
private boolean memorySequences = true;
- private boolean commitSequencesOnCommit = false;
+ private boolean commitSequencesOnCommit = true;
public KiWiConfiguration(String name, String jdbcUrl, String dbUser, String dbPassword, KiWiDialect dialect) {
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 52f46a2..1872edc 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -133,6 +133,7 @@ public class KiWiConnection {
// completely new addition to the triple store
private HashSet<Long> deletedStatementsLog;
+ private static long numberOfCommits = 0;
public KiWiConnection(KiWiPersistence persistence, KiWiDialect dialect, KiWiCacheManager cacheManager) throws SQLException {
this.cacheManager = cacheManager;
@@ -861,7 +862,7 @@ public class KiWiConnection {
- public synchronized long getNodeId() throws SQLException {
+ public long getNodeId() throws SQLException {
long result = getNextSequence("seq.nodes");
return result;
@@ -1900,10 +1901,18 @@ public class KiWiConnection {
* @see #setAutoCommit
*/
public void commit() throws SQLException {
- if(persistence.getConfiguration().isCommitSequencesOnCommit()) {
+ numberOfCommits++;
+
+ if(persistence.getConfiguration().isCommitSequencesOnCommit() || numberOfCommits % 100 == 0) {
commitMemorySequences();
}
+
+ if(tripleBatch != null && tripleBatch.size() > 0) {
+ flushBatch();
+ }
+
+
deletedStatementsLog.clear();
if(connection != null) {
@@ -1919,9 +1928,12 @@ public class KiWiConnection {
if(persistence.getMemorySequences() != null) {
requireJDBCConnection();
+ Set<String> updated = persistence.getSequencesUpdated();
+ persistence.setSequencesUpdated(new HashSet<String>());
+
try {
for(Map.Entry<String,Long> entry : persistence.getMemorySequences().asMap().entrySet()) {
- if( entry.getValue() > 0) {
+ if( updated.contains(entry.getKey()) && entry.getValue() > 0) {
PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
updateSequence.setLong(1, entry.getValue());
if(updateSequence.execute()) {
@@ -1937,10 +1949,6 @@ public class KiWiConnection {
}
}
- if(tripleBatch != null && tripleBatch.size() > 0) {
- flushBatch();
- }
-
}
/**
@@ -2029,7 +2037,7 @@ public class KiWiConnection {
}
- public void flushBatch() throws SQLException {
+ public synchronized void flushBatch() throws SQLException {
if(batchCommit && tripleBatch != null) {
requireJDBCConnection();
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
index 5c69a39..d29104b 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiGarbageCollector.java
@@ -179,7 +179,6 @@ public class KiWiGarbageCollector extends Thread {
fixNodeIdsStatement.addBatch();
}
fixNodeIdsStatement.executeBatch();
- fixNodeIdsStatement.close();
}
// finally we clean up all now unused node ids
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
index f487554..162de30 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
@@ -39,6 +39,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +96,10 @@ public class KiWiPersistence {
*/
private boolean maintenance;
+ private boolean droppedDatabase = false;
+
+ // keep track which memory sequences have been updated and need to be written back
+ private Set<String> sequencesUpdated;
@Deprecated
public KiWiPersistence(String name, String jdbcUrl, String db_user, String db_password, KiWiDialect dialect) {
@@ -105,6 +110,7 @@ public class KiWiPersistence {
this.configuration = configuration;
this.maintenance = false;
this.sequencesLock = new ReentrantLock();
+ this.sequencesUpdated = new HashSet<>();
// init JDBC connection pool
initConnectionPool();
@@ -373,6 +379,8 @@ public class KiWiPersistence {
} catch(SQLException ex) {
log.error("SQL exception while acquiring database connection");
}
+
+ droppedDatabase = true;
}
/**
@@ -536,7 +544,7 @@ public class KiWiPersistence {
}
public void shutdown() {
- if(!configuration.isCommitSequencesOnCommit()) {
+ if(!droppedDatabase && !configuration.isCommitSequencesOnCommit()) {
log.info("storing in-memory sequences in database ...");
try {
KiWiConnection connection = getConnection();
@@ -585,6 +593,8 @@ public class KiWiPersistence {
}
public long incrementAndGetMemorySequence(String name) {
+ sequencesUpdated.add(name);
+
if(memorySequences != null) {
return memorySequences.incrementAndGet(name);
} else {
@@ -600,4 +610,12 @@ public class KiWiPersistence {
public boolean checkConsistency() throws SQLException {
return garbageCollector.checkConsistency();
}
+
+ public Set<String> getSequencesUpdated() {
+ return sequencesUpdated;
+ }
+
+ public void setSequencesUpdated(Set<String> sequencesUpdated) {
+ this.sequencesUpdated = sequencesUpdated;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
index 2d65db6..f3766d3 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiValueFactory.java
@@ -17,10 +17,11 @@
*/
package org.apache.marmotta.kiwi.sail;
-import com.google.common.util.concurrent.Monitor;
+import com.google.common.collect.Queues;
import org.apache.commons.lang3.LocaleUtils;
-import org.apache.marmotta.commons.locking.StringLocks;
+import org.apache.marmotta.commons.locking.ObjectLocks;
import org.apache.marmotta.commons.sesame.model.LiteralCommons;
+import org.apache.marmotta.commons.sesame.model.LiteralKey;
import org.apache.marmotta.commons.sesame.model.Namespaces;
import org.apache.marmotta.commons.util.DateUtils;
import org.apache.marmotta.kiwi.model.caching.IntArray;
@@ -34,8 +35,10 @@ import org.slf4j.LoggerFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import java.sql.SQLException;
import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Add file description here!
@@ -67,8 +70,8 @@ public class KiWiValueFactory implements ValueFactory {
private KiWiStore store;
- private StringLocks resourceLocks;
- private StringLocks literalLocks;
+ private ObjectLocks resourceLocks;
+ private ObjectLocks literalLocks;
private String defaultContext;
@@ -84,21 +87,18 @@ public class KiWiValueFactory implements ValueFactory {
private Map<String,KiWiAnonResource> batchBNodeLookup;
private Map<String,KiWiLiteral> batchLiteralLookup;
- // this connection is kept open and never closed when releaseConnection is called; it will only be
- // used for generating sequence numbers for RDF nodes
- private KiWiConnection batchConnection;
+ private int poolSize = 4;
+ private int poolPosition = 0;
+
+ private ArrayList<KiWiConnection> pooledConnections;
+
+
+ private ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();
- private Monitor commitLock = new Monitor();
- private Monitor.Guard commitGuard = new Monitor.Guard(commitLock) {
- @Override
- public boolean isSatisfied() {
- return batchCommit && nodeBatch.size() > 0;
- }
- };
public KiWiValueFactory(KiWiStore store, String defaultContext) {
- resourceLocks = new StringLocks();
- literalLocks = new StringLocks();
+ resourceLocks = new ObjectLocks();
+ literalLocks = new ObjectLocks();
anonIdGenerator = new Random();
tripleRegistry = store.tripleRegistry;
@@ -115,21 +115,26 @@ public class KiWiValueFactory implements ValueFactory {
this.batchUriLookup = new ConcurrentHashMap<String,KiWiUriResource>();
this.batchBNodeLookup = new ConcurrentHashMap<String, KiWiAnonResource>();
this.batchLiteralLookup = new ConcurrentHashMap<String,KiWiLiteral>();
+
+ this.pooledConnections = new ArrayList<>(poolSize);
+ try {
+ for(int i = 0; i<poolSize ; i++) {
+ pooledConnections.add(store.getPersistence().getConnection());
+ }
+ } catch (SQLException e) {
+ log.error("error initialising value factory connection pool",e);
+ }
}
protected KiWiConnection aqcuireConnection() {
try {
if(batchCommit) {
- if(batchConnection == null) {
- batchConnection = store.getPersistence().getConnection();
- }
- return batchConnection;
+ return pooledConnections.get(poolPosition++ % poolSize);
} else {
- KiWiConnection connection = store.getPersistence().getConnection();
- return connection;
+ return store.getPersistence().getConnection();
}
} catch(SQLException ex) {
- log.error("could not acquire database connection",ex);
+ log.error("could not acquire database connection", ex);
throw new RuntimeException(ex);
}
}
@@ -164,6 +169,7 @@ public class KiWiValueFactory implements ValueFactory {
*/
@Override
public URI createURI(String uri) {
+ commitLock.readLock().lock();
resourceLocks.lock(uri);
@@ -185,18 +191,10 @@ public class KiWiValueFactory implements ValueFactory {
if(batchCommit) {
result.setId(connection.getNodeId());
- batchUriLookup.put(uri,result);
-
- commitLock.enter();
- try {
- nodeBatch.add(result);
-
- if(nodeBatch.size() >= batchSize) {
- flushBatch(connection);
- }
- } finally {
- commitLock.leave();
- }
+ batchUriLookup.put(uri, result);
+
+ nodeBatch.add(result);
+
} else {
connection.storeNode(result, false);
}
@@ -216,7 +214,18 @@ public class KiWiValueFactory implements ValueFactory {
}
} finally {
resourceLocks.unlock(uri);
+ commitLock.readLock().unlock();
+
+ try {
+ if(nodeBatch.size() >= batchSize) {
+ flushBatch();
+ }
+ } catch (SQLException e) {
+ log.error("database error, could not load URI resource",e);
+ throw new IllegalStateException("database error, could not load URI resource",e);
+ }
}
+
}
/**
@@ -246,6 +255,7 @@ public class KiWiValueFactory implements ValueFactory {
*/
@Override
public BNode createBNode(String nodeID) {
+ commitLock.readLock().lock();
resourceLocks.lock(nodeID);
try {
@@ -264,17 +274,9 @@ public class KiWiValueFactory implements ValueFactory {
result = new KiWiAnonResource(nodeID);
if(batchCommit) {
- commitLock.enter();
- try {
- result.setId(connection.getNodeId());
- nodeBatch.add(result);
- batchBNodeLookup.put(nodeID,result);
- if(nodeBatch.size() >= batchSize) {
- flushBatch(connection);
- }
- } finally {
- commitLock.leave();
- }
+ result.setId(connection.getNodeId());
+ nodeBatch.add(result);
+ batchBNodeLookup.put(nodeID,result);
} else {
connection.storeNode(result, false);
}
@@ -293,6 +295,16 @@ public class KiWiValueFactory implements ValueFactory {
}
} finally {
resourceLocks.unlock(nodeID);
+ commitLock.readLock().unlock();
+
+ try {
+ if(nodeBatch.size() >= batchSize) {
+ flushBatch();
+ }
+ } catch (SQLException e) {
+ log.error("database error, could not load URI resource",e);
+ throw new IllegalStateException("database error, could not load URI resource",e);
+ }
}
}
@@ -375,6 +387,7 @@ public class KiWiValueFactory implements ValueFactory {
* @return
*/
private <T> KiWiLiteral createLiteral(T value, String lang, String type) {
+ commitLock.readLock().lock();
final Locale locale;
if(lang != null) {
locale = LocaleUtils.toLocale(lang.replace("-","_"));
@@ -387,8 +400,9 @@ public class KiWiValueFactory implements ValueFactory {
type = LiteralCommons.getXSDType(value.getClass());
}
String key = LiteralCommons.createCacheKey(value.toString(),locale,type);
+ LiteralKey lkey = new LiteralKey(value,type,lang);
- literalLocks.lock(key);
+ literalLocks.lock(lkey);
try {
KiWiLiteral result = batchLiteralLookup.get(key);
@@ -480,19 +494,9 @@ public class KiWiValueFactory implements ValueFactory {
if(result.getId() == null) {
if(batchCommit) {
result.setId(connection.getNodeId());
- batchLiteralLookup.put(LiteralCommons.createCacheKey(value.toString(),locale,type), result);
-
- commitLock.enter();
- try {
- nodeBatch.add(result);
-
- if(nodeBatch.size() >= batchSize) {
- flushBatch(connection);
- }
+ batchLiteralLookup.put(key, result);
- } finally {
- commitLock.leave();
- }
+ nodeBatch.add(result);
} else {
connection.storeNode(result, false);
}
@@ -508,7 +512,17 @@ public class KiWiValueFactory implements ValueFactory {
}
}
} finally {
- literalLocks.unlock(key);
+ literalLocks.unlock(lkey);
+ commitLock.readLock().unlock();
+
+ try {
+ if(nodeBatch.size() >= batchSize) {
+ flushBatch();
+ }
+ } catch (SQLException e) {
+ log.error("database error, could not load URI resource",e);
+ throw new IllegalStateException("database error, could not load URI resource",e);
+ }
}
}
@@ -748,34 +762,38 @@ public class KiWiValueFactory implements ValueFactory {
* the node batch.
*/
public void flushBatch(KiWiConnection con) throws SQLException {
- if(commitLock.enterIf(commitGuard)) {
- try {
+ commitLock.writeLock().lock();
+ try {
+ if(batchCommit && nodeBatch.size() > 0) {
+ List<KiWiNode> processed = this.nodeBatch;
+ this.nodeBatch = Collections.synchronizedList(new ArrayList<KiWiNode>(batchSize));
+
con.startNodeBatch();
- for(KiWiNode n : nodeBatch) {
+ for(KiWiNode n : processed) {
con.storeNode(n,true);
}
- nodeBatch.clear();
-
batchLiteralLookup.clear();
batchUriLookup.clear();
batchBNodeLookup.clear();
con.commitNodeBatch();
- } finally {
- commitLock.leave();
}
+ } finally {
+ commitLock.writeLock().unlock();
}
}
public void close() {
- try {
- if(batchConnection != null && !batchConnection.isClosed()) {
- batchConnection.commit();
- batchConnection.close();
+ for(KiWiConnection con : pooledConnections) {
+ try {
+ if(!con.isClosed()) {
+ con.commit();
+ con.close();
+ }
+ } catch (SQLException e) {
+ log.warn("could not close value factory connection: {}",e.getMessage());
}
- } catch (SQLException e) {
- log.warn("could not close value factory connection: {}",e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
index d2db972..548750a 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
@@ -12,13 +12,13 @@
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-CREATE TABLE seq_nodes (id BIGINT NOT NULL);
+CREATE TABLE seq_nodes (id BIGINT NOT NULL) ENGINE=InnoDB;
INSERT INTO seq_nodes(id) VALUES (0);
-CREATE TABLE seq_triples (id BIGINT NOT NULL);
+CREATE TABLE seq_triples (id BIGINT NOT NULL) ENGINE=InnoDB;
INSERT INTO seq_triples VALUES (0);
-CREATE TABLE seq_namespaces (id BIGINT NOT NULL);
+CREATE TABLE seq_namespaces (id BIGINT NOT NULL) ENGINE=InnoDB;
INSERT INTO seq_namespaces(id) VALUES (0);
-- Sequences in MySQL:
@@ -38,7 +38,7 @@ CREATE TABLE nodes (
lang varchar(5),
createdAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(id)
-) CHARACTER SET utf8 COLLATE utf8_bin;
+) CHARACTER SET utf8 COLLATE utf8_bin ENGINE=InnoDB;
CREATE TABLE triples (
id bigint NOT NULL,
@@ -52,7 +52,7 @@ CREATE TABLE triples (
createdAt timestamp NOT NULL DEFAULT now(),
deletedAt timestamp,
PRIMARY KEY(id)
-) CHARACTER SET utf8 COLLATE utf8_bin;
+) CHARACTER SET utf8 COLLATE utf8_bin ENGINE=InnoDB;
CREATE TABLE namespaces (
id bigint NOT NULL,
@@ -60,7 +60,7 @@ CREATE TABLE namespaces (
uri varchar(2048) NOT NULL,
createdAt timestamp NOT NULL DEFAULT now(),
PRIMARY KEY(id)
-) CHARACTER SET utf8 COLLATE utf8_bin;
+) CHARACTER SET utf8 COLLATE utf8_bin ENGINE=InnoDB;
-- A table for storing metadata about the current database, e.g. version numbers for each table
@@ -69,7 +69,7 @@ CREATE TABLE metadata (
mkey varchar(16) NOT NULL,
mvalue varchar(256) NOT NULL,
PRIMARY KEY(id)
-) CHARACTER SET utf8 COLLATE utf8_bin;
+) CHARACTER SET utf8 COLLATE utf8_bin ENGINE=InnoDB;
-- Indexes for accessing nodes and triples efficiently
CREATE INDEX idx_node_content ON nodes(svalue(256));
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/8f612b0e/libraries/ldcache/ldcache-core/src/main/java/org/apache/marmotta/ldcache/services/LDCache.java
----------------------------------------------------------------------
diff --git a/libraries/ldcache/ldcache-core/src/main/java/org/apache/marmotta/ldcache/services/LDCache.java b/libraries/ldcache/ldcache-core/src/main/java/org/apache/marmotta/ldcache/services/LDCache.java
index 9845579..d28a571 100644
--- a/libraries/ldcache/ldcache-core/src/main/java/org/apache/marmotta/ldcache/services/LDCache.java
+++ b/libraries/ldcache/ldcache-core/src/main/java/org/apache/marmotta/ldcache/services/LDCache.java
@@ -18,7 +18,7 @@
package org.apache.marmotta.ldcache.services;
import info.aduna.iteration.CloseableIteration;
-import org.apache.marmotta.commons.locking.StringLocks;
+import org.apache.marmotta.commons.locking.ObjectLocks;
import org.apache.marmotta.ldcache.api.LDCachingBackend;
import org.apache.marmotta.ldcache.api.LDCachingConnection;
import org.apache.marmotta.ldcache.api.LDCachingService;
@@ -52,7 +52,7 @@ public class LDCache implements LDCachingService {
// lock a resource while refreshing it so that not several threads trigger a refresh at the same time
- private StringLocks resourceLocks;
+ private ObjectLocks resourceLocks;
private LDClientService ldclient;
@@ -71,7 +71,7 @@ public class LDCache implements LDCachingService {
public LDCache(CacheConfiguration config, LDCachingBackend backend) {
log.info("Linked Data Caching Service initialising ...");
- this.resourceLocks = new StringLocks();
+ this.resourceLocks = new ObjectLocks();
this.backend = backend;
this.ldclient = new LDClient(config.getClientConfiguration());
this.config = config;