You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/12/13 11:45:42 UTC

git commit: also integrate existence check to KiWiLoader and add a test

Updated Branches:
  refs/heads/develop acba9247d -> 54670f4c9


also integrate existence check to KiWiLoader and add a test


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/54670f4c
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/54670f4c
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/54670f4c

Branch: refs/heads/develop
Commit: 54670f4c973cf2032f43526ca8a911f44ff1daa5
Parents: acba924
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Dec 13 11:45:36 2013 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Dec 13 11:45:36 2013 +0100

----------------------------------------------------------------------
 .../kiwi/loader/generic/KiWiBatchHandler.java   | 12 ++--
 .../kiwi/loader/generic/KiWiHandler.java        | 47 +++++++++++--
 .../kiwi/loader/pgsql/create_indexes.sql        |  2 +-
 .../marmotta/kiwi/loader/pgsql/drop_indexes.sql |  2 +-
 .../marmotta/kiwi/loader/KiWiHandlerTest.java   | 74 +++++++++++---------
 .../marmotta/kiwi/loader/KiWiLoaderTest.java    | 14 ++--
 .../marmotta/kiwi/loader/PGCopyUtilTest.java    | 23 ++----
 .../apache/marmotta/kiwi/sail/KiWiStore.java    |  1 +
 8 files changed, 104 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
index 01b3dd3..4cbd29d 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
@@ -19,11 +19,7 @@ package org.apache.marmotta.kiwi.loader.generic;
 
 import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
 import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler;
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
-import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
 import org.openrdf.model.Literal;
 import org.openrdf.rio.RDFHandler;
@@ -201,9 +197,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
 
     @Override
     protected void storeTriple(KiWiTriple result) throws SQLException {
-        if(result.getId() < 0) {
-            result.setId(connection.getNextSequence("triples"));
-        }
 
         tripleBacklog.add(result);
 
@@ -212,6 +205,9 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
         if(triples % config.getCommitBatchSize() == 0) {
             try {
                 flushBacklog();
+                if(registry != null) {
+                    registry.releaseTransaction(connection.getTransactionId());
+                }
                 connection.commit();
             } catch (SQLException ex) {
                 log.warn("could not flush out data ({}), retrying with fresh connection", ex.getCause().getMessage());

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
index 9ce1e84..8eca550 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
@@ -7,16 +7,14 @@ import net.sf.ehcache.Element;
 import net.sf.ehcache.constructs.blocking.CacheEntryFactory;
 import net.sf.ehcache.constructs.blocking.SelfPopulatingCache;
 import org.apache.marmotta.commons.sesame.model.Namespaces;
+import org.apache.marmotta.commons.sesame.tripletable.IntArray;
 import org.apache.marmotta.commons.util.DateUtils;
 import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
 import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.persistence.KiWiTripleRegistry;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.openrdf.model.BNode;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
+import org.openrdf.model.*;
 import org.openrdf.model.impl.URIImpl;
 import org.openrdf.rio.RDFHandler;
 import org.openrdf.rio.RDFHandlerException;
@@ -65,6 +63,9 @@ public class KiWiHandler implements RDFHandler {
 
     private Statistics statistics;
 
+    // only used when statement existance check is enabled
+    protected KiWiTripleRegistry registry;
+
 
     protected Date importDate;
 
@@ -112,6 +113,9 @@ public class KiWiHandler implements RDFHandler {
                 });
 
 
+        if(config.isStatementExistanceCheck()) {
+            registry = new KiWiTripleRegistry(store);
+        }
     }
 
 
@@ -163,6 +167,9 @@ public class KiWiHandler implements RDFHandler {
     @Override
     public void endRDF() throws RDFHandlerException {
 
+        if(registry != null) {
+            registry.releaseTransaction(connection.getTransactionId());
+        }
 
         try {
             connection.commit();
@@ -246,9 +253,33 @@ public class KiWiHandler implements RDFHandler {
             }
 
             KiWiTriple result = new KiWiTriple(subject,predicate,object,context, importDate);
+
+            // statement existance check; use the triple registry to lookup if there are any concurrent triple creations
             if(config.isStatementExistanceCheck()) {
-                result.setId(connection.getTripleId(subject, predicate, object, context, true));
+                IntArray cacheKey = IntArray.createSPOCKey(subject, predicate, object, context);
+                long tripleId = registry.lookupKey(cacheKey);
+
+                if(tripleId >= 0) {
+                    // try getting id from registry
+                    result.setId(tripleId);
+
+                    registry.registerKey(cacheKey, connection.getTransactionId(), result.getId());
+                } else {
+                    // not found in registry, try loading from database
+                    result.setId(connection.getTripleId(subject,predicate,object,context,true));
+                }
+
+                // triple has no id from registry or database, so we create one and flag it for reasoning
+                if(result.getId() < 0) {
+                    result.setId(connection.getNextSequence("seq.triples"));
+                    result.setNewTriple(true);
+
+                    registry.registerKey(cacheKey, connection.getTransactionId(), result.getId());
+                }
+            } else {
+                result.setId(connection.getNextSequence("triples"));
             }
+
             storeTriple(result);
 
         } catch (SQLException | ExecutionException e) {
@@ -459,6 +490,10 @@ public class KiWiHandler implements RDFHandler {
         triples++;
 
         if(triples % config.getCommitBatchSize() == 0) {
+            if(registry != null) {
+                registry.releaseTransaction(connection.getTransactionId());
+            }
+
             connection.commit();
 
             printStatistics();

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql
index 02818ad..30169fe 100644
--- a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql
+++ b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql
@@ -1,4 +1,4 @@
-CREATE INDEX idx_triples_op ON triples(object,predicate) WHERE deleted = false;
+CREATE INDEX idx_triples_p ON triples(object,predicate) WHERE deleted = false;
 CREATE INDEX idx_triples_spo ON triples(subject,predicate,object) WHERE deleted = false;
 CREATE INDEX idx_triples_cspo ON triples(context,subject,predicate,object) WHERE deleted = false;
 CREATE INDEX idx_node_dcontent ON nodes(dvalue) WHERE dvalue IS NOT NULL;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql
index 40dbafb..f979357 100644
--- a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql
+++ b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql
@@ -1,4 +1,4 @@
-DROP INDEX IF EXISTS idx_triples_op;
+DROP INDEX IF EXISTS idx_triples_p;
 DROP INDEX IF EXISTS idx_triples_spo;
 DROP INDEX IF EXISTS idx_triples_cspo;
 DROP INDEX IF EXISTS idx_node_dcontent;

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
index 24d7723..71d8dac 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
@@ -8,11 +8,7 @@ import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
 import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
 import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 import org.junit.runner.RunWith;
@@ -20,10 +16,7 @@ import org.openrdf.repository.Repository;
 import org.openrdf.repository.RepositoryConnection;
 import org.openrdf.repository.RepositoryException;
 import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
+import org.openrdf.rio.*;
 import org.openrdf.sail.SailException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,13 +47,13 @@ public class KiWiHandlerTest {
     @Before
     public void initDatabase() throws RepositoryException, IOException, RDFParseException, SailException {
         store = new KiWiStore(dbConfig);
+        store.setDropTablesOnShutdown(true);
         repository = new SailRepository(store);
         repository.initialize();
     }
 
     @After
     public void dropDatabase() throws RepositoryException, SQLException, SailException {
-        store.getPersistence().dropDatabase();
         repository.shutDown();
     }
 
@@ -79,41 +72,56 @@ public class KiWiHandlerTest {
     };
 
     @Test
-    public void testImport() throws Exception {
+    public void testImportNoCheck() throws Exception {
+        testImport(new KiWiLoaderConfiguration());
+    }
+
+    @Test
+    public void testImportExistanceCheck() throws Exception {
+        KiWiLoaderConfiguration cfg = new KiWiLoaderConfiguration();
+        cfg.setStatementExistanceCheck(true);
+        testImport(cfg);
+    }
+
 
+    private void testImport(KiWiLoaderConfiguration c) throws RDFParseException, IOException, RDFHandlerException {
         KiWiHandler handler;
         if(store.getPersistence().getDialect() instanceof PostgreSQLDialect) {
-            handler = new KiWiPostgresHandler(store, new KiWiLoaderConfiguration());
+            handler = new KiWiPostgresHandler(store, c);
         } else if(store.getPersistence().getDialect() instanceof MySQLDialect) {
-            handler = new KiWiMySQLHandler(store, new KiWiLoaderConfiguration());
+            handler = new KiWiMySQLHandler(store, c);
         } else {
-            handler = new KiWiHandler(store,new KiWiLoaderConfiguration());
+            handler = new KiWiHandler(store, c);
         }
 
-        // bulk import
-        long start = System.currentTimeMillis();
-        RDFParser parser = Rio.createParser(RDFFormat.RDFXML);
-        parser.setRDFHandler(handler);
-        parser.parse(this.getClass().getResourceAsStream("demo-data.foaf"),"");
-
-        logger.info("bulk import in {} ms", System.currentTimeMillis() - start);
-
-        // check presence of data
         try {
-            RepositoryConnection con = repository.getConnection();
-            try {
-                con.begin();
+            // bulk import
+            long start = System.currentTimeMillis();
+            RDFParser parser = Rio.createParser(RDFFormat.RDFXML);
+            parser.setRDFHandler(handler);
+            parser.parse(this.getClass().getResourceAsStream("demo-data.foaf"),"");
 
-                Assert.assertTrue(con.hasStatement(null,null,null,true));
+            logger.info("bulk import in {} ms", System.currentTimeMillis() - start);
 
-                con.commit();
+            // check presence of data
+            try {
+                RepositoryConnection con = repository.getConnection();
+                try {
+                    con.begin();
+
+                    Assert.assertTrue(con.hasStatement(null,null,null,true));
+
+                    con.commit();
+                } catch(RepositoryException ex) {
+                    con.rollback();
+                } finally {
+                    con.close();
+                }
             } catch(RepositoryException ex) {
-                con.rollback();
-            } finally {
-                con.close();
+                ex.printStackTrace(); // TODO: handle error
             }
-        } catch(RepositoryException ex) {
-            ex.printStackTrace(); // TODO: handle error
+        } finally {
+            handler.shutdown();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java
index 86a913e..39d6fff 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java
@@ -20,11 +20,7 @@ import org.openrdf.rio.RDFFormat;
 import org.openrdf.rio.RDFHandlerException;
 import org.openrdf.rio.RDFParseException;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.util.Properties;
 import java.util.zip.GZIPOutputStream;
 
@@ -237,7 +233,13 @@ public class KiWiLoaderTest {
         public Repository getRepository() {
             return super.repository;
         }
-        
+
+        @Override
+        public synchronized void shutdown() throws RepositoryException, RDFHandlerException {
+            store.setDropTablesOnShutdown(true);
+
+            super.shutdown();
+        }
     }
     
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
index 8dfcf68..ce80ccc 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
@@ -4,14 +4,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.marmotta.commons.vocabulary.XSD;
 import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.loader.pgsql.PGCopyUtil;
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
-import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.model.rdf.*;
 import org.apache.marmotta.kiwi.persistence.KiWiConnection;
 import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
@@ -32,11 +25,7 @@ import java.io.IOException;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
+import java.util.*;
 
 import static org.junit.Assert.assertTrue;
 
@@ -76,6 +65,7 @@ public class PGCopyUtilTest {
         rnd = new Random();
 
         store = new KiWiStore(psql);
+        store.setDropTablesOnShutdown(true);
         repository = new SailRepository(store);
         repository.initialize();
     }
@@ -84,10 +74,11 @@ public class PGCopyUtilTest {
     public void dropDatabase() throws RepositoryException, SQLException, SailException {
         log.info("cleaning up test setup...");
         if (store != null && store.isInitialized()) {
+            try {
             assertTrue(store.checkConsistency());
-            store.closeValueFactory(); // release all connections before dropping the database
-            store.getPersistence().dropDatabase();
-            repository.shutDown();
+            } finally {
+                repository.shutDown();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
index 513e55e..899b941 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
@@ -176,6 +176,7 @@ public class KiWiStore extends NotifyingSailBase {
 
         if(dropTablesOnShutdown) {
             try {
+                logger.info("dropping database tables ...");
                 persistence.dropDatabase();
             } catch (SQLException e) {
                 logger.error("error dropping database: {}", e.getMessage());