You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/09/02 17:58:10 UTC

[1/2] git commit: towards Sesame Test Suite (fixes MARMOTTA-297, addresses MARMOTTA-287, maybe solves MARMOTTA-283)

Updated Branches:
  refs/heads/develop ed115fcff -> b82e9ecef


towards Sesame Test Suite (fixes MARMOTTA-297, addresses MARMOTTA-287, maybe solves MARMOTTA-283)


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/11aae1b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/11aae1b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/11aae1b5

Branch: refs/heads/develop
Commit: 11aae1b5ae23aca6dfab392ed2d94fa7f8957e4b
Parents: bbd900e
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Sep 2 17:57:03 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Sep 2 17:57:03 2013 +0200

----------------------------------------------------------------------
 .../sparql/sail/KiWiSparqlSailConnection.java   |   3 +-
 .../marmotta/kiwi/model/rdf/KiWiTriple.java     |   2 +-
 .../kiwi/persistence/KiWiConnection.java        | 319 +++++++++++--------
 .../kiwi/persistence/KiWiPersistence.java       |   2 +-
 .../marmotta/kiwi/sail/KiWiSailConnection.java  |  26 +-
 .../kiwi/persistence/h2/create_base_tables.sql  |   2 +-
 .../persistence/mysql/create_base_tables.sql    |   2 +-
 .../kiwi/test/MySQLConcurrencyTest.java         |   2 +-
 .../test/sesame/KiWiSailConcurrencyTest.java    |  18 +-
 .../kiwi/test/sesame/KiWiStoreTest.java         |  21 +-
 .../KiWiRepositoryConnectionTest.java           |  21 +-
 .../sesame/repository/KiWiRepositoryTest.java   |  22 +-
 .../kiwi/model/caching/TripleTable.java         |  37 ++-
 13 files changed, 315 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java
index bad56e3..b26ee36 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/sail/KiWiSparqlSailConnection.java
@@ -32,6 +32,7 @@ import org.openrdf.query.algebra.QueryRoot;
 import org.openrdf.query.algebra.TupleExpr;
 import org.openrdf.query.algebra.evaluation.EvaluationStrategy;
 import org.openrdf.query.algebra.evaluation.impl.*;
+import org.openrdf.query.impl.EmptyBindingSet;
 import org.openrdf.sail.NotifyingSailConnection;
 import org.openrdf.sail.SailConnection;
 import org.openrdf.sail.SailException;
@@ -86,7 +87,7 @@ public class KiWiSparqlSailConnection extends NotifyingSailConnectionWrapper {
 
             log.debug("evaluating SPARQL query:\n {}", tupleExpr);
 
-            return strategy.evaluate(tupleExpr, bindings);
+            return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance());
 
         } catch (QueryEvaluationException e) {
             throw new SailException(e);

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
index 58ff9f4..369c408 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
@@ -221,7 +221,7 @@ public class KiWiTriple  implements Statement, Serializable {
      * @param deletedAt
      */
     public void setDeletedAt(Date deletedAt) {
-        this.deletedAt = new Date(deletedAt.getTime());
+        this.deletedAt = deletedAt != null ? new Date(deletedAt.getTime()) : null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 1872edc..9aa43e4 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -32,18 +32,16 @@ import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration;
 import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction;
 import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
 import org.openrdf.repository.RepositoryException;
 import org.openrdf.repository.RepositoryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.sql.*;
 import java.util.*;
+import java.util.Date;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -863,9 +861,7 @@ public class KiWiConnection {
 
 
     public long getNodeId() throws SQLException {
-        long result = getNextSequence("seq.nodes");
-
-        return result;
+        return getNextSequence("seq.nodes");
     }
 
     /**
@@ -1063,67 +1059,66 @@ public class KiWiConnection {
      * @return true in case the update added a new triple to the database, false in case the triple already existed
      */
     public synchronized boolean storeTriple(KiWiTriple triple) throws SQLException {
+        // mutual exclusion: prevent parallel adding and removing of the same triple
+        synchronized (triple) {
 
+            requireJDBCConnection();
 
-        requireJDBCConnection();
-
-        boolean hasId = triple.getId() != null;
-
+            boolean hasId = triple.getId() != null;
 
-        if(hasId && deletedStatementsLog.contains(triple.getId())) {
-            // this is a hack for a concurrency problem that may occur in case the triple is removed in the
-            // transaction and then added again; in these cases the createStatement method might return
-            // an expired state of the triple because it uses its own database connection
+            if(hasId && deletedStatementsLog.contains(triple.getId())) {
+                // this is a hack for a concurrency problem that may occur in case the triple is removed in the
+                // transaction and then added again; in these cases the createStatement method might return
+                // an expired state of the triple because it uses its own database connection
 
-            //deletedStatementsLog.remove(triple.getId());
-            undeleteTriple(triple);
+                //deletedStatementsLog.remove(triple.getId());
+                undeleteTriple(triple);
 
-            return true;
-        } else {
-            // retrieve a new triple ID and set it in the object
-            if(triple.getId() == null) {
-                triple.setId(getNextSequence("seq.triples"));
-            }
+                return true;
+            } else {
+                // retrieve a new triple ID and set it in the object
+                if(triple.getId() == null) {
+                    triple.setId(getNextSequence("seq.triples"));
+                }
 
-            if(batchCommit) {
-                commitLock.lock();
-                try {
-                    cacheTriple(triple);
-                    synchronized (tripleBatch) {
+                if(batchCommit) {
+                    commitLock.lock();
+                    try {
+                        cacheTriple(triple);
                         tripleBatch.add(triple);
                         if(tripleBatch.size() >= batchSize) {
                             flushBatch();
                         }
+                    } finally {
+                        commitLock.unlock();
                     }
-                } finally {
-                    commitLock.unlock();
-                }
-                return !hasId;
-            }  else {
-                Preconditions.checkNotNull(triple.getSubject().getId());
-                Preconditions.checkNotNull(triple.getPredicate().getId());
-                Preconditions.checkNotNull(triple.getObject().getId());
-                Preconditions.checkNotNull(triple.getContext().getId());
+                    return !hasId;
+                }  else {
+                    Preconditions.checkNotNull(triple.getSubject().getId());
+                    Preconditions.checkNotNull(triple.getPredicate().getId());
+                    Preconditions.checkNotNull(triple.getObject().getId());
+                    Preconditions.checkNotNull(triple.getContext().getId());
 
 
-                try {
-                    PreparedStatement insertTriple = getPreparedStatement("store.triple");
-                    insertTriple.setLong(1,triple.getId());
-                    insertTriple.setLong(2,triple.getSubject().getId());
-                    insertTriple.setLong(3,triple.getPredicate().getId());
-                    insertTriple.setLong(4,triple.getObject().getId());
-                    insertTriple.setLong(5,triple.getContext().getId());
-                    insertTriple.setBoolean(6,triple.isInferred());
-                    insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
-                    int count = insertTriple.executeUpdate();
-
-                    cacheTriple(triple);
-
-                    return count > 0;
-                } catch(SQLException ex) {
-                    // this is an ugly hack to catch duplicate key errors in some databases (H2)
-                    // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
-                    return false;
+                    try {
+                        PreparedStatement insertTriple = getPreparedStatement("store.triple");
+                        insertTriple.setLong(1,triple.getId());
+                        insertTriple.setLong(2,triple.getSubject().getId());
+                        insertTriple.setLong(3,triple.getPredicate().getId());
+                        insertTriple.setLong(4,triple.getObject().getId());
+                        insertTriple.setLong(5,triple.getContext().getId());
+                        insertTriple.setBoolean(6,triple.isInferred());
+                        insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+                        int count = insertTriple.executeUpdate();
+
+                        cacheTriple(triple);
+
+                        return count > 0;
+                    } catch(SQLException ex) {
+                        // this is an ugly hack to catch duplicate key errors in some databases (H2)
+                        // better option could be http://stackoverflow.com/questions/6736518/h2-java-insert-ignore-allow-exception
+                        return false;
+                    }
                 }
             }
         }
@@ -1143,11 +1138,9 @@ public class KiWiConnection {
      */
     public synchronized Long getTripleId(final KiWiResource subject, final KiWiUriResource predicate, final KiWiNode object, final KiWiResource context, final boolean inferred) throws SQLException {
         if(tripleBatch != null && tripleBatch.size() > 0) {
-            synchronized (tripleBatch) {
-                Collection<KiWiTriple> batched = tripleBatch.listTriples(subject,predicate,object,context);
-                if(batched.size() > 0) {
-                    return batched.iterator().next().getId();
-                }
+            Collection<KiWiTriple> batched = tripleBatch.listTriples(subject,predicate,object,context);
+            if(batched.size() > 0) {
+                return batched.iterator().next().getId();
             }
         }
 
@@ -1181,25 +1174,49 @@ public class KiWiConnection {
      * @param triple
      */
     public void deleteTriple(KiWiTriple triple) throws SQLException {
-        // make sure the triple is marked as deleted in case some service still holds a reference
-        triple.setDeleted(true);
-        triple.setDeletedAt(new Date());
+        // mutual exclusion: prevent parallel adding and removing of the same triple
+        synchronized (triple) {
 
-        if(triple.getId() == null) {
-            log.warn("attempting to remove non-persistent triple: {}", triple);
-        } else if(tripleBatch == null || !tripleBatch.contains(triple)) {
-            requireJDBCConnection();
+            // make sure the triple is marked as deleted in case some service still holds a reference
+            triple.setDeleted(true);
+            triple.setDeletedAt(new Date());
 
-            PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
-            deleteTriple.setLong(1,triple.getId());
-            deleteTriple.executeUpdate();
+            if(triple.getId() == null) {
+                log.warn("attempting to remove non-persistent triple: {}", triple);
+                removeCachedTriple(triple);
+            } else {
+                if(batchCommit) {
+                    // need to remove from triple batch and from database
+                    commitLock.lock();
+                    try {
+                        if(!tripleBatch.remove(triple)) {
+                            requireJDBCConnection();
 
-            deletedStatementsLog.add(triple.getId());
-        } else {
-            tripleBatch.remove(triple);
-        }
-        removeCachedTriple(triple);
+                            PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
+                            synchronized (deleteTriple) {
+                                deleteTriple.setLong(1,triple.getId());
+                                deleteTriple.executeUpdate();
+                            }
+                            deletedStatementsLog.add(triple.getId());
+                        }
+                    } finally {
+                        commitLock.unlock();
+                    }
+                } else {
+                    requireJDBCConnection();
 
+                    PreparedStatement deleteTriple = getPreparedStatement("delete.triple");
+                    synchronized (deleteTriple) {
+                        deleteTriple.setLong(1,triple.getId());
+                        deleteTriple.executeUpdate();
+                    }
+                    deletedStatementsLog.add(triple.getId());
+
+
+                }
+                removeCachedTriple(triple);
+            }
+        }
     }
 
     /**
@@ -1219,6 +1236,11 @@ public class KiWiConnection {
 
         requireJDBCConnection();
 
+        // make sure the triple is not marked as deleted in case some service still holds a reference
+        triple.setDeleted(false);
+        triple.setDeletedAt(null);
+
+
         synchronized (triple) {
             if(!triple.isDeleted()) {
                 log.warn("attemting to undelete triple that was not deleted: {}",triple);
@@ -1228,10 +1250,6 @@ public class KiWiConnection {
             undeleteTriple.setLong(1, triple.getId());
             undeleteTriple.executeUpdate();
 
-            // make sure the triple is marked as deleted in case some service still holds a reference
-            triple.setDeleted(false);
-            triple.setDeletedAt(null);
-
             cacheTriple(triple);
         }
 
@@ -1250,12 +1268,33 @@ public class KiWiConnection {
 
         final ResultSet result = queryContexts.executeQuery();
 
-        return new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
-            @Override
-            public KiWiResource apply(ResultSet row) throws SQLException {
-                return (KiWiResource)loadNodeById(result.getLong("context"));
-            }
-        });
+        if(tripleBatch != null && tripleBatch.size() > 0) {
+            return new DistinctIteration<KiWiResource, SQLException>(
+                    new UnionIteration<KiWiResource, SQLException>(
+                            new ConvertingIteration<Resource,KiWiResource,SQLException>(new IteratorIteration<Resource, SQLException>(tripleBatch.listContextIDs().iterator())) {
+                                @Override
+                                protected KiWiResource convert(Resource sourceObject) throws SQLException {
+                                    return (KiWiResource)sourceObject;
+                                }
+                            },
+                            new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
+                                @Override
+                                public KiWiResource apply(ResultSet row) throws SQLException {
+                                    return (KiWiResource)loadNodeById(result.getLong("context"));
+                                }
+                            })
+                    )
+            );
+
+
+        } else {
+            return new ResultSetIteration<KiWiResource>(result, new ResultTransformerFunction<KiWiResource>() {
+                @Override
+                public KiWiResource apply(ResultSet row) throws SQLException {
+                    return (KiWiResource)loadNodeById(result.getLong("context"));
+                }
+            });
+        }
 
     }
 
@@ -1926,26 +1965,28 @@ public class KiWiConnection {
      */
     public void commitMemorySequences() throws SQLException {
         if(persistence.getMemorySequences() != null) {
-            requireJDBCConnection();
+            synchronized (persistence.getMemorySequences()) {
+                requireJDBCConnection();
 
-            Set<String> updated = persistence.getSequencesUpdated();
-            persistence.setSequencesUpdated(new HashSet<String>());
+                Set<String> updated = persistence.getSequencesUpdated();
+                persistence.setSequencesUpdated(new HashSet<String>());
 
-            try {
-                for(Map.Entry<String,Long> entry : persistence.getMemorySequences().asMap().entrySet()) {
-                    if( updated.contains(entry.getKey()) && entry.getValue() > 0) {
-                        PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
-                        updateSequence.setLong(1, entry.getValue());
-                        if(updateSequence.execute()) {
-                            updateSequence.getResultSet().close();
-                        } else {
-                            updateSequence.getUpdateCount();
+                try {
+                    for(Map.Entry<String,Long> entry : persistence.getMemorySequences().asMap().entrySet()) {
+                        if( updated.contains(entry.getKey()) && entry.getValue() > 0) {
+                            PreparedStatement updateSequence = getPreparedStatement(entry.getKey()+".set");
+                            updateSequence.setLong(1, entry.getValue());
+                            if(updateSequence.execute()) {
+                                updateSequence.getResultSet().close();
+                            } else {
+                                updateSequence.getUpdateCount();
+                            }
                         }
                     }
+                } catch(SQLException ex) {
+                    log.error("SQL exception:",ex);
+                    throw ex;
                 }
-            } catch(SQLException ex) {
-                log.error("SQL exception:",ex);
-                throw ex;
             }
         }
 
@@ -2037,15 +2078,20 @@ public class KiWiConnection {
     }
 
 
+    int retry = 0;
+
     public synchronized void flushBatch() throws SQLException {
         if(batchCommit && tripleBatch != null) {
             requireJDBCConnection();
 
             commitLock.lock();
+
+            if(persistence.getValueFactory() != null) {
+                persistence.getValueFactory().flushBatch();
+            }
+
+            Savepoint savepoint = connection.setSavepoint();
             try {
-                if(persistence.getValueFactory() != null) {
-                    persistence.getValueFactory().flushBatch();
-                }
 
                 PreparedStatement insertTriple = getPreparedStatement("store.triple");
                 insertTriple.clearParameters();
@@ -2053,36 +2099,55 @@ public class KiWiConnection {
 
                 synchronized (tripleBatch) {
                     for(KiWiTriple triple : tripleBatch) {
-                        // retrieve a new triple ID and set it in the object
-                        if(!triple.isDeleted()) {
-                            if(triple.getId() == null) {
-                                triple.setId(getNextSequence("seq.triples"));
-                                log.warn("the batched triple did not have an ID");
-                            }
+                        // if the triple has been marked as deleted, this can only have been done by another connection
+                        // in this case the triple id is no longer usable (might result in key conflicts), so we set it to
+                        // a new id
+                        if(triple.isDeleted()) {
+                            triple.setId(getNextSequence("seq.triples"));
+                            triple.setDeleted(false);
+                            triple.setDeletedAt(null);
+                        }
 
-                            insertTriple.setLong(1,triple.getId());
-                            insertTriple.setLong(2,triple.getSubject().getId());
-                            insertTriple.setLong(3,triple.getPredicate().getId());
-                            insertTriple.setLong(4,triple.getObject().getId());
-                            insertTriple.setLong(5,triple.getContext().getId());
-                            insertTriple.setBoolean(6,triple.isInferred());
-                            insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
-
-                            insertTriple.addBatch();
-                        } else {
-                            log.error("triple batch contained a triple marked as deleted, this should not happen (triple: {})", triple);
+                        // retrieve a new triple ID and set it in the object
+                        if(triple.getId() == null) {
+                            triple.setId(getNextSequence("seq.triples"));
                         }
+
+                        insertTriple.setLong(1,triple.getId());
+                        insertTriple.setLong(2,triple.getSubject().getId());
+                        insertTriple.setLong(3,triple.getPredicate().getId());
+                        insertTriple.setLong(4,triple.getObject().getId());
+                        insertTriple.setLong(5,triple.getContext().getId());
+                        insertTriple.setBoolean(6,triple.isInferred());
+                        insertTriple.setTimestamp(7, new Timestamp(triple.getCreated().getTime()));
+
+                        insertTriple.addBatch();
                     }
-                    tripleBatch.clear();
                 }
                 insertTriple.executeBatch();
 
+                tripleBatch.clear();
+
+                connection.releaseSavepoint(savepoint);
             } catch (SQLException ex) {
-                System.err.println("main exception:");
-                ex.printStackTrace();
-                System.err.println("next exception:");
-                ex.getNextException().printStackTrace();
-                throw ex;
+                if(retry < 10) {
+                    connection.rollback(savepoint);
+                    log.warn("temporary concurrency conflict, retrying in 1000 ms ... (thread={}, retry={})", Thread.currentThread().getName(), retry);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {}
+                    retry++;
+                    flushBatch();
+                    retry--;
+                } else {
+                    log.error("concurrency conflict could not be solved!");
+
+                    System.err.println("main exception:");
+                    ex.printStackTrace();
+                    System.err.println("next exception:");
+                    ex.getNextException().printStackTrace();
+                    throw ex;
+                }
             }  finally {
                 commitLock.unlock();
             }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
index 162de30..6a73579 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiPersistence.java
@@ -540,7 +540,7 @@ public class KiWiPersistence {
 
 
     public void initialise() {
-        garbageCollector.start();
+        //garbageCollector.start();
     }
 
     public void shutdown() {

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
index fe7176e..bbb1bfd 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiSailConnection.java
@@ -20,12 +20,7 @@ package org.apache.marmotta.kiwi.sail;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import info.aduna.iteration.CloseableIteration;
-import info.aduna.iteration.DelayedIteration;
-import info.aduna.iteration.ExceptionConvertingIteration;
-import info.aduna.iteration.Iteration;
-import info.aduna.iteration.Iterations;
-import info.aduna.iteration.UnionIteration;
+import info.aduna.iteration.*;
 import org.apache.marmotta.commons.sesame.repository.ResourceConnection;
 import org.apache.marmotta.kiwi.model.rdf.KiWiNamespace;
 import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
@@ -45,6 +40,7 @@ import org.openrdf.query.algebra.Var;
 import org.openrdf.query.algebra.evaluation.EvaluationStrategy;
 import org.openrdf.query.algebra.evaluation.TripleSource;
 import org.openrdf.query.algebra.evaluation.impl.*;
+import org.openrdf.query.impl.EmptyBindingSet;
 import org.openrdf.repository.RepositoryException;
 import org.openrdf.repository.RepositoryResult;
 import org.openrdf.sail.Sail;
@@ -228,7 +224,7 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
             new FilterOptimizer().optimize(tupleExpr, dataset, bindings);
             new OrderLimitOptimizer().optimize(tupleExpr, dataset, bindings);
 
-            return strategy.evaluate(tupleExpr, bindings);
+            return strategy.evaluate(tupleExpr, EmptyBindingSet.getInstance());
 
         } catch (QueryEvaluationException e) {
             throw new SailException(e);
@@ -239,11 +235,16 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
     @Override
     protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException {
         try {
-            return new ExceptionConvertingIteration<Resource, SailException>(databaseConnection.listContexts()) {
+            return  new FilterIteration<Resource, SailException>(new ExceptionConvertingIteration<Resource, SailException>(databaseConnection.listContexts()) {
                 @Override
                 protected SailException convert(Exception e) {
                     return new SailException("database error while iterating over result set",e);
                 }
+            }) {
+                @Override
+                protected boolean accept(Resource object) throws SailException {
+                    return !object.stringValue().equals(defaultContext);
+                }
             };
         } catch (SQLException e) {
             throw new SailException("database error while listing contexts",e);
@@ -260,7 +261,12 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
         contextSet.addAll(Lists.transform(Arrays.asList(contexts), new Function<Resource, KiWiResource>() {
             @Override
             public KiWiResource apply(Resource input) {
-                return valueFactory.convert(input);
+                if(input == null) {
+                    // null value for context means statements without context; in KiWi, this means "default context"
+                    return (KiWiUriResource)valueFactory.createURI(defaultContext);
+                } else {
+                    return valueFactory.convert(input);
+                }
             }
         }));
 
@@ -383,6 +389,7 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
                     triplesRemoved = true;
                     notifyStatementRemoved(triple);
                 }
+                valueFactory.removeStatement(triple);
             }
             triples.close();
         } catch(SQLException ex) {
@@ -413,6 +420,7 @@ public class KiWiSailConnection extends NotifyingSailConnectionBase implements I
                     triplesRemoved = true;
                     notifyStatementRemoved(triple);
                 }
+                valueFactory.removeStatement(triple);
             }
             triples.close();
         } catch(SQLException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/create_base_tables.sql
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/create_base_tables.sql b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/create_base_tables.sql
index f6e1722..5427d8c 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/create_base_tables.sql
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/create_base_tables.sql
@@ -20,7 +20,7 @@ CREATE SEQUENCE seq_namespaces;
 CREATE TABLE nodes (
   id        bigint     NOT NULL,
   ntype     char(8)    NOT NULL,
-  svalue    varchar(65536) NOT NULL,
+  svalue    varchar(2147483647) NOT NULL,
   dvalue    double precision,
   ivalue    bigint,
   tvalue    timestamp,

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
index 548750a..e6db9ea 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/create_base_tables.sql
@@ -29,7 +29,7 @@ INSERT INTO seq_namespaces(id) VALUES (0);
 CREATE TABLE nodes (
   id        bigint     NOT NULL,
   ntype     char(8)    NOT NULL,
-  svalue    text       NOT NULL,
+  svalue    longtext   NOT NULL,
   dvalue    double precision,
   ivalue    bigint,
   tvalue    datetime   DEFAULT NULL,

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
index ddcdf46..aeddeaa 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/MySQLConcurrencyTest.java
@@ -28,7 +28,7 @@ public class MySQLConcurrencyTest extends ConcurrencyTestBase {
 
     @BeforeClass
     public static void setup() throws RepositoryException {
-        logger = LoggerFactory.getLogger(H2ConcurrencyTest.class);
+        logger = LoggerFactory.getLogger(MySQLConcurrencyTest.class);
 
         KiWiConfiguration mysqlConfig = KiWiDatabaseRunner.createKiWiConfig("MySQL", new MySQLDialect());
         DBConnectionChecker.checkDatabaseAvailability(mysqlConfig);

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiSailConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiSailConcurrencyTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiSailConcurrencyTest.java
index f4d1fec..20b4dd4 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiSailConcurrencyTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiSailConcurrencyTest.java
@@ -27,6 +27,11 @@ import org.junit.runner.RunWith;
 import org.openrdf.sail.Sail;
 import org.openrdf.sail.SailConcurrencyTest;
 import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailWrapper;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.fail;
 
 /**
  * Run the Sesame {@link SailConcurrencyTest} suite.
@@ -44,7 +49,18 @@ public class KiWiSailConcurrencyTest extends SailConcurrencyTest {
     
     @Override
     protected Sail createSail() throws SailException {
-        KiWiStore store = new KiWiStore(kiwiConfig);
+        Sail store = new SailWrapper(new KiWiStore(kiwiConfig)) {
+            @Override
+            public void shutDown() throws SailException {
+                try {
+                    ((KiWiStore)getBaseSail()).getPersistence().dropDatabase();
+                } catch (SQLException e) {
+                    fail("SQL exception while deleting database");
+                }
+
+                super.shutDown();
+            }
+        };
         return store;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiStoreTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiStoreTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiStoreTest.java
index 93ebf66..d73e7f2 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiStoreTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/KiWiStoreTest.java
@@ -24,6 +24,11 @@ import org.junit.runner.RunWith;
 import org.openrdf.sail.RDFStoreTest;
 import org.openrdf.sail.Sail;
 import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailWrapper;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.fail;
 
 /**
  * Run the Sesame {@link RDFStoreTest} suite.
@@ -40,9 +45,23 @@ public class KiWiStoreTest extends RDFStoreTest {
     
     @Override
     protected Sail createSail() throws SailException {
-        KiWiStore store = new KiWiStore(kiwiConfig);
+        Sail store = new SailWrapper(new KiWiStore(kiwiConfig)) {
+            @Override
+            public void shutDown() throws SailException {
+                try {
+                    ((KiWiStore)getBaseSail()).getPersistence().dropDatabase();
+                } catch (SQLException e) {
+                    fail("SQL exception while deleting database");
+                }
+
+                super.shutDown();
+            }
+        };
         store.initialize();
         return store;
     }
 
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryConnectionTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryConnectionTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryConnectionTest.java
index 588d8ec..905fc8f 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryConnectionTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryConnectionTest.java
@@ -24,6 +24,13 @@ import org.junit.runner.RunWith;
 import org.openrdf.repository.Repository;
 import org.openrdf.repository.RepositoryConnectionTest;
 import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailWrapper;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.fail;
 
 /**
  * Run the {@link RepositoryConnectionTest}s.
@@ -44,7 +51,19 @@ public class KiWiRepositoryConnectionTest extends RepositoryConnectionTest {
      */
     @Override
     protected Repository createRepository() throws Exception {
-        return new SailRepository(new KiWiStore(config));
+        Sail store = new SailWrapper(new KiWiStore(config)) {
+            @Override
+            public void shutDown() throws SailException {
+                try {
+                    ((KiWiStore)getBaseSail()).getPersistence().dropDatabase();
+                } catch (SQLException e) {
+                    fail("SQL exception while deleting database");
+                }
+
+                super.shutDown();
+            }
+        };
+        return new SailRepository(store);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryTest.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryTest.java
index 3f02a77..26e3be8 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryTest.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/sesame/repository/KiWiRepositoryTest.java
@@ -24,6 +24,13 @@ import org.junit.runner.RunWith;
 import org.openrdf.repository.Repository;
 import org.openrdf.repository.RepositoryTest;
 import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailException;
+import org.openrdf.sail.helpers.SailWrapper;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.fail;
 
 /**
  * Run the {@link RepositoryTest}s.
@@ -38,13 +45,24 @@ public class KiWiRepositoryTest extends RepositoryTest {
     public KiWiRepositoryTest(KiWiConfiguration config) {
         this.config = config;
     }
-    
+
     /* (non-Javadoc)
      * @see org.openrdf.repository.RepositoryTest#createRepository()
      */
     @Override
     protected Repository createRepository() throws Exception {
-        KiWiStore store = new KiWiStore(config);
+        Sail store = new SailWrapper(new KiWiStore(config)) {
+            @Override
+            public void shutDown() throws SailException {
+                try {
+                    ((KiWiStore)getBaseSail()).getPersistence().dropDatabase();
+                } catch (SQLException e) {
+                    fail("SQL exception while deleting database");
+                }
+
+                super.shutDown();
+            }
+        };
         return new SailRepository(store);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/11aae1b5/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
index 6b0ec57..f4b986e 100644
--- a/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
+++ b/libraries/kiwi/kiwi-tripletable/src/main/java/org/apache/marmotta/kiwi/model/caching/TripleTable.java
@@ -83,7 +83,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @return the number of elements in this set (its cardinality)
      */
     @Override
-    public int size() {
+    public synchronized int size() {
         return data.size();
     }
 
@@ -93,7 +93,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @return <tt>true</tt> if this set contains no elements
      */
     @Override
-    public boolean isEmpty() {
+    public synchronized boolean isEmpty() {
         return data.isEmpty();
     }
 
@@ -111,7 +111,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      *         set does not permit null elements (optional)
      */
     @Override
-    public boolean contains(Object o) {
+    public synchronized boolean contains(Object o) {
         return data.contains(o);
     }
 
@@ -144,7 +144,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @return an array containing all the elements in this set
      */
     @Override
-    public Object[] toArray() {
+    public synchronized Object[] toArray() {
         return data.toArray();
     }
 
@@ -191,7 +191,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @throws NullPointerException if the specified array is null
      */
     @Override
-    public <T> T[] toArray(T[] a) {
+    public synchronized <T> T[] toArray(T[] a) {
         return data.toArray(a);
     }
 
@@ -226,7 +226,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      *         prevents it from being added to this set
      */
     @Override
-    public boolean add(Triple triple) {
+    public synchronized boolean add(Triple triple) {
         indexSPOC.put(IntArray.createSPOCKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext()),triple);
         indexCSPO.put(IntArray.createCSPOKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext()),triple);
         return data.add(triple);
@@ -252,7 +252,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      *         is not supported by this set
      */
     @Override
-    public boolean remove(Object o) {
+    public synchronized boolean remove(Object o) {
         if(o instanceof Statement) {
             Statement triple = (Statement)o;
             indexSPOC.remove(IntArray.createSPOCKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext()));
@@ -278,7 +278,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @see    #contains(Object)
      */
     @Override
-    public boolean containsAll(Collection<?> c) {
+    public synchronized boolean containsAll(Collection<?> c) {
         return data.containsAll(c);
     }
 
@@ -305,7 +305,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @see #add(Object)
      */
     @Override
-    public boolean addAll(Collection<? extends Triple> c) {
+    public synchronized boolean addAll(Collection<? extends Triple> c) {
         boolean modified = false;
         for(Triple t : c) {
             modified = add(t) || modified;
@@ -333,7 +333,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @see #remove(Object)
      */
     @Override
-    public boolean retainAll(Collection<?> c) {
+    public synchronized boolean retainAll(Collection<?> c) {
         Iterator<Map.Entry<IntArray,Triple>> it = indexSPOC.entrySet().iterator();
         while(it.hasNext()) {
             if(!c.contains(it.next().getValue())) {
@@ -369,7 +369,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @see #contains(Object)
      */
     @Override
-    public boolean removeAll(Collection<?> c) {
+    public synchronized boolean removeAll(Collection<?> c) {
         boolean modified = false;
         for(Object o : c) {
             modified = remove(o) || modified;
@@ -385,7 +385,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      *         is not supported by this set
      */
     @Override
-    public void clear() {
+    public synchronized void clear() {
         data.clear();
         indexSPOC.clear();
         indexCSPO.clear();
@@ -400,7 +400,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
      * @param context
      * @return
      */
-    public Collection<Triple> listTriples(final Resource subject, final URI property, final Value object, final Resource context) {
+    public synchronized Collection<Triple> listTriples(final Resource subject, final URI property, final Value object, final Resource context) {
         // in special cases we can make use of the index
         if(subject != null && property != null && object != null && context != null) {
             IntArray key = IntArray.createSPOCKey(subject, property, object, context);
@@ -450,9 +450,16 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
         }
     }
 
+    public synchronized Collection<Resource> listContextIDs() {
+        Set<Resource> result = new HashSet<>();
+        for(Statement stmt : data) {
+            result.add(stmt.getContext());
+        }
+        return result;
+    }
 
     @Override
-    public boolean equals(Object o) {
+    public synchronized boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
 
@@ -465,7 +472,7 @@ public class TripleTable<Triple extends Statement> implements Set<Triple>, Seria
     }
 
     @Override
-    public int hashCode() {
+    public synchronized int hashCode() {
         return data.hashCode();
     }
 }


[2/2] git commit: Merge remote-tracking branch 'origin/develop' into develop

Posted by ss...@apache.org.
Merge remote-tracking branch 'origin/develop' into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/b82e9ece
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/b82e9ece
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/b82e9ece

Branch: refs/heads/develop
Commit: b82e9eceff46fd1f2755e61f51c8c78663076d85
Parents: 11aae1b ed115fc
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Mon Sep 2 17:57:59 2013 +0200
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Mon Sep 2 17:57:59 2013 +0200

----------------------------------------------------------------------
 .../services/importer/ImportWatchServiceImpl.java   | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------