You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2014/02/05 12:47:37 UTC

git commit: batched triple store selects for improved performance

Updated Branches:
  refs/heads/develop b3be446e1 -> bfd688b94


batched triple store selects for improved performance


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/bfd688b9
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/bfd688b9
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/bfd688b9

Branch: refs/heads/develop
Commit: bfd688b945f27813f1854ff945234e7756753248
Parents: b3be446
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Wed Feb 5 12:47:25 2014 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Wed Feb 5 12:47:25 2014 +0100

----------------------------------------------------------------------
 .../kiwi/persistence/KiWiConnection.java        | 240 ++++++++++++++++++-
 .../kiwi/persistence/h2/statements.properties   |   1 +
 .../persistence/mysql/statements.properties     |   1 +
 .../persistence/pgsql/statements.properties     |   1 +
 .../kiwi/test/profile/ProfileLoading.java       | 103 ++++++++
 5 files changed, 338 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/bfd688b9/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 5093644..500b324 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -449,6 +449,55 @@ public class KiWiConnection implements AutoCloseable {
 
     }
 
+    /**
+     * Batch load the nodes with the given ids. This method aims to offer performance improvements by reducing
+     * database roundtrips: ids found in the node cache are served from there, all remaining ids are fetched with a
+     * single "load.nodes_by_ids" query.
+     * <p/>
+     * An id that occurs several times in the argument list is sent to the database only once; the loaded node is
+     * then stored at every position where that id was requested.
+     *
+     * @param ids array of ids to retrieve
+     * @return array of nodes corresponding to these ids (in the same order); a position is null if the id was
+     *         neither in the cache nor returned by the database query
+     * @throws SQLException if preparing or executing the database query fails
+     */
+    public KiWiNode[] loadNodesByIds(Long... ids) throws SQLException {
+        KiWiNode[] result = new KiWiNode[ids.length];
+
+        // first look in the cache for any ids that have already been loaded; for all others remember at which
+        // positions of the result they have been requested
+        Map<Long, List<Integer>> missing = new HashMap<>();
+        for(int i=0; i < ids.length; i++) {
+            result[i] = nodeCache.get(ids[i]);
+            if(result[i] == null) {
+                List<Integer> positions = missing.get(ids[i]);
+                if(positions == null) {
+                    positions = new ArrayList<>();
+                    missing.put(ids[i], positions);
+                }
+                positions.add(i);
+            }
+        }
+
+        if(!missing.isEmpty()) {
+            PreparedStatement query = getPreparedStatement("load.nodes_by_ids", missing.size());
+            synchronized (query) {
+
+                // bind every distinct id exactly once; the order of the parameters is irrelevant for IN(...)
+                int parameter = 1;
+                for(Long id : missing.keySet()) {
+                    query.setLong(parameter++, id);
+                }
+                query.setMaxRows(missing.size());
+
+                // run the database query and construct a node for every row returned; the method call will take
+                // care of caching the constructed node for future calls
+                try (ResultSet rows = query.executeQuery()) {
+                    while(rows.next()) {
+                        KiWiNode n = constructNodeFromDatabase(rows);
+                        List<Integer> positions = missing.get(n.getId());
+                        if(positions != null) {
+                            for(int position : positions) {
+                                result[position] = n;
+                            }
+                        }
+                    }
+                }
+            }
+
+        }
+        return result;
+    }
+
     public KiWiTriple loadTripleById(Long id) throws SQLException {
 
         // look in cache
@@ -1497,12 +1546,47 @@ public class KiWiConnection implements AutoCloseable {
 
         final ResultSet result = query.executeQuery();
 
-        return new ResultSetIteration<Statement>(result, true, new ResultTransformerFunction<Statement>() {
+
+        return new CloseableIteration<Statement, SQLException>() {
+
+            List<KiWiTriple> batch = null;
+            int batchPosition = 0;
+
             @Override
-            public Statement apply(ResultSet row) throws SQLException {
-                return constructTripleFromDatabase(result);
+            public void close() throws SQLException {
+                result.close();
             }
-        });
+
+            @Override
+            public boolean hasNext() throws SQLException {
+                fetchBatch();
+
+                return batch.size() > batchPosition;
+            }
+
+            @Override
+            public Statement next() throws SQLException {
+                fetchBatch();
+
+                if(batch.size() > batchPosition) {
+                    return batch.get(batchPosition++);
+                }  else {
+                    return null;
+                }
+            }
+
+            private void fetchBatch() throws SQLException {
+                if(batch == null || batch.size() <= batchPosition) {
+                    batch = constructTriplesFromDatabase(result, 100);
+                    batchPosition = 0;
+                }
+            }
+
+            @Override
+            public void remove() throws SQLException {
+                throw new UnsupportedOperationException("removing results not supported");
+            }
+        };
     }
 
     /**
@@ -1677,10 +1761,17 @@ public class KiWiConnection implements AutoCloseable {
 
         KiWiTriple result = new KiWiTriple();
         result.setId(id);
-        result.setSubject((KiWiResource)loadNodeById(row.getLong(2)));
-        result.setPredicate((KiWiUriResource) loadNodeById(row.getLong(3)));
-        result.setObject(loadNodeById(row.getLong(4)));
-        result.setContext((KiWiResource) loadNodeById(row.getLong(5)));
+
+        KiWiNode[] batch = loadNodesByIds(row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5));
+        result.setSubject((KiWiResource) batch[0]);
+        result.setPredicate((KiWiUriResource) batch[1]);
+        result.setObject(batch[2]);
+        result.setContext((KiWiResource) batch[3]);
+
+//        result.setSubject((KiWiResource)loadNodeById(row.getLong(2)));
+//        result.setPredicate((KiWiUriResource) loadNodeById(row.getLong(3)));
+//        result.setObject(loadNodeById(row.getLong(4)));
+//        result.setContext((KiWiResource) loadNodeById(row.getLong(5)));
         if(row.getLong(8) != 0) {
             result.setCreator((KiWiResource)loadNodeById(row.getLong(8)));
         }
@@ -1702,6 +1793,106 @@ public class KiWiConnection implements AutoCloseable {
     }
 
 
+    /**
+     * Construct a batch of KiWiTriples from the result of an SQL query. This method differs from
+     * constructTripleFromDatabase in that it consumes up to maxPrefetch rows at once and loads all nodes referenced
+     * by these rows with a single call to loadNodesByIds instead of loading every node separately.
+     * <p/>
+     * Triples that are already contained in the triple cache are taken from there. For all other rows a new triple
+     * is created; context and creator are only set when the respective column is not 0. A referenced node that
+     * cannot be loaded leaves the corresponding field of the triple unset.
+     *
+     * @param row         a database result containing the columns
+     *                    id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt;
+     *                    its cursor is advanced by at most maxPrefetch rows
+     * @param maxPrefetch the maximum number of rows to consume from the result in this call
+     * @return the triples for the consumed rows, in the order of the rows; empty if the result has no further rows
+     * @throws SQLException if reading from the result or loading the referenced nodes fails
+     */
+    protected List<KiWiTriple> constructTriplesFromDatabase(ResultSet row, int maxPrefetch) throws SQLException {
+        int count = 0;
+
+        List<KiWiTriple> result = new ArrayList<>();
+        Map<Long,Long[]> tripleIds  = new HashMap<>();
+        Set<Long> nodeIds   = new HashSet<>();
+        while(count < maxPrefetch && row.next()) {
+            count++;
+
+            if(row.isClosed()) {
+                throw new ResultInterruptedException("retrieving results has been interrupted");
+            }
+
+            // columns: id,subject,predicate,object,context,deleted,inferred,creator,createdAt,deletedAt
+            //          1 ,2      ,3        ,4     ,5      ,6      ,7       ,8      ,9        ,10
+
+            Long id = row.getLong(1);
+
+            KiWiTriple cached = tripleCache.get(id);
+
+            // lookup element in cache first, so we can avoid reconstructing it if it is already there
+            if(cached != null) {
+                result.add(cached);
+            } else {
+
+                KiWiTriple triple = new KiWiTriple();
+                triple.setId(id);
+
+                // read every node reference of the row once
+                Long subjectId   = row.getLong(2);
+                Long predicateId = row.getLong(3);
+                Long objectId    = row.getLong(4);
+                Long contextId   = row.getLong(5);
+                Long creatorId   = row.getLong(8);
+
+                // collect node ids for batch retrieval; context and creator are optional (0 means "not set")
+                nodeIds.add(subjectId);
+                nodeIds.add(predicateId);
+                nodeIds.add(objectId);
+
+                if(contextId != 0) {
+                    nodeIds.add(contextId);
+                }
+
+                if(creatorId != 0) {
+                    nodeIds.add(creatorId);
+                }
+
+                // remember which node ids were relevant for the triple
+                tripleIds.put(id,new Long[] { subjectId, predicateId, objectId, contextId, creatorId });
+
+                triple.setDeleted(row.getBoolean(6));
+                triple.setInferred(row.getBoolean(7));
+                triple.setCreated(new Date(row.getTimestamp(9).getTime()));
+                try {
+                    if(row.getDate(10) != null) {
+                        triple.setDeletedAt(new Date(row.getTimestamp(10).getTime()));
+                    }
+                } catch (SQLException ex) {
+                    // deliberately ignored: work around a MySQL problem with null dates
+                    // (see http://stackoverflow.com/questions/782823/handling-datetime-values-0000-00-00-000000-in-jdbc)
+                }
+
+                result.add(triple);
+            }
+        }
+
+        // load all collected nodes in one batch; loadNodesByIds returns the nodes in the order of the requested ids
+        // and leaves a position null when the node could not be loaded, so index by the requested id and skip nulls
+        // instead of dereferencing the node
+        Long[] requestedIds = nodeIds.toArray(new Long[nodeIds.size()]);
+        KiWiNode[] nodes = loadNodesByIds(requestedIds);
+        Map<Long,KiWiNode> nodeMap = new HashMap<>();
+        for(int i=0; i<nodes.length; i++) {
+            if(nodes[i] != null) {
+                nodeMap.put(requestedIds[i], nodes[i]);
+            }
+        }
+
+        for(KiWiTriple triple : result) {
+            // only newly constructed triples have an entry here; cached triples are already complete
+            Long[] ids = tripleIds.get(triple.getId());
+            if(ids != null) {
+                // need to set subject, predicate, object, context and creator
+                triple.setSubject((KiWiResource) nodeMap.get(ids[0]));
+                triple.setPredicate((KiWiUriResource) nodeMap.get(ids[1]));
+                triple.setObject(nodeMap.get(ids[2]));
+
+                if(ids[3] != 0) {
+                    triple.setContext((KiWiResource) nodeMap.get(ids[3]));
+                }
+
+                if(ids[4] != 0) {
+                    triple.setCreator((KiWiResource) nodeMap.get(ids[4]));
+                }
+
+            }
+
+            cacheTriple(triple);
+        }
+
+
+
+        return result;
+    }
+
+
     protected static Locale getLocale(String language) {
         Locale locale = localeMap.get(language);
         if(locale == null && language != null && !language.isEmpty()) {
@@ -1741,6 +1932,39 @@ public class KiWiConnection implements AutoCloseable {
     }
 
     /**
+     * Return the prepared statement with the given identifier for a variable number of arguments (e.g. for use in
+     * an IN clause). The statement is first looked up in the statement cache under the key followed by the number
+     * of arguments; if it is not cached or has been closed, it is created from the dialect's statement template and
+     * put into the cache.
+     * <p/>
+     * The template is passed through String.format with two arguments: a comma-separated list of numberOfArguments
+     * "?" placeholders, and numberOfArguments itself.
+     *
+     * @param key               the id of the statement in statements.properties
+     * @param numberOfArguments the number of "?" placeholders to generate for the statement
+     * @return the prepared statement with its parameters cleared; the fetch size is set to the configured cursor
+     *         size if the dialect supports cursors
+     * @throws SQLException if the statement cannot be prepared or configured
+     */
+    public PreparedStatement getPreparedStatement(String key, int numberOfArguments) throws SQLException {
+        requireJDBCConnection();
+
+        // statements with a different number of arguments are cached separately
+        final String cacheKey = key+numberOfArguments;
+
+        PreparedStatement statement = statementCache.get(cacheKey);
+        if(statement == null || statement.isClosed()) {
+            // build the placeholder list "?,?,...,?"
+            StringBuilder placeholders = new StringBuilder();
+            for(int i=0; i<numberOfArguments; i++) {
+                placeholders.append(i == 0 ? "?" : ",?");
+            }
+
+            String sql = String.format(dialect.getStatement(key), placeholders.toString(), numberOfArguments);
+            statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            statementCache.put(cacheKey,statement);
+        }
+
+        statement.clearParameters();
+        if(persistence.getDialect().isCursorSupported()) {
+            statement.setFetchSize(persistence.getConfiguration().getCursorSize());
+        }
+        return statement;
+    }
+
+
+
+    /**
      * Get next number in a sequence; for databases without sequence support (e.g. MySQL), this method will first update a
      * sequence table and then return the value.
      *

http://git-wip-us.apache.org/repos/asf/marmotta/blob/bfd688b9/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/statements.properties
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/statements.properties b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/statements.properties
index 3e37358..8321f52 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/statements.properties
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/h2/statements.properties
@@ -26,6 +26,7 @@ meta.get               = SELECT mvalue FROM metadata WHERE mkey = ?;
 
 # load entities
 load.node_by_id        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id = ?
+load.nodes_by_ids        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id IN(%s) LIMIT %d
 
 load.uri_by_uri        = SELECT id,ntype,svalue,createdAt FROM nodes WHERE ntype = 'uri' AND svalue = ?
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/bfd688b9/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/statements.properties
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/statements.properties b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/statements.properties
index 11fe0fa..fc61cc2 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/statements.properties
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/mysql/statements.properties
@@ -25,6 +25,7 @@ meta.get               = SELECT mvalue FROM metadata WHERE mkey = ?;
 
 # load entities
 load.node_by_id        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id = ?
+load.nodes_by_ids        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id IN(%s) LIMIT %d
 
 load.uri_by_uri        = SELECT id,ntype,svalue,createdAt FROM nodes WHERE ntype = 'uri' AND svalue = ?
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/bfd688b9/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/pgsql/statements.properties
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/pgsql/statements.properties b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/pgsql/statements.properties
index 0d1c1de..cc4af52 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/pgsql/statements.properties
+++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/org/apache/marmotta/kiwi/persistence/pgsql/statements.properties
@@ -25,6 +25,7 @@ meta.version           = SELECT mvalue FROM metadata WHERE mkey = 'version';
 
 # load entities
 load.node_by_id        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id = ?
+load.nodes_by_ids        = SELECT id,ntype,svalue,ivalue,dvalue,tvalue,bvalue,lang,ltype,createdAt FROM nodes WHERE id IN(%s) LIMIT %d
 
 load.uri_by_uri        = SELECT id,ntype,svalue,createdAt FROM nodes WHERE ntype = 'uri' AND svalue = ?
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/bfd688b9/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/profile/ProfileLoading.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/profile/ProfileLoading.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/profile/ProfileLoading.java
new file mode 100644
index 0000000..1fe5adb
--- /dev/null
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/profile/ProfileLoading.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.kiwi.test.profile;
+
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.openrdf.model.Statement;
+import org.openrdf.repository.Repository;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.RepositoryResult;
+import org.openrdf.repository.sail.SailRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executable class that can be used for profiling purposes. It requires a JDBC connection string to
+ * an existing (filled) database for running.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class ProfileLoading {
+
+    private static final Logger log = LoggerFactory.getLogger(ProfileLoading.class);
+
+    private KiWiStore store;
+
+    private Repository repository;
+
+
+    /**
+     * Create a profiler for the database with the given connection data, using the PostgreSQL dialect.
+     *
+     * @param jdbcUrl  JDBC connection string of the database
+     * @param user     database user
+     * @param password database password
+     * @throws RepositoryException if the repository cannot be initialized
+     */
+    public ProfileLoading(String jdbcUrl, String user, String password) throws RepositoryException {
+        this(new KiWiConfiguration("profiling",jdbcUrl,user,password, new PostgreSQLDialect()));
+    }
+
+    /**
+     * Create a profiler for the given store configuration and initialize the repository.
+     *
+     * @param cfg configuration of the KiWi store to profile
+     * @throws RepositoryException if the repository cannot be initialized
+     */
+    public ProfileLoading(KiWiConfiguration cfg) throws RepositoryException {
+        store = new KiWiStore(cfg);
+        repository = new SailRepository(store);
+        repository.initialize();
+    }
+
+
+    /**
+     * Iterate over all statements of the repository and log the number of statements and the time needed.
+     *
+     * @throws RepositoryException if listing the statements fails; the transaction is rolled back in this case
+     */
+    public void profileListStatements() throws RepositoryException {
+        RepositoryConnection con = repository.getConnection();
+        try {
+            con.begin();
+
+            long start = System.currentTimeMillis();
+            long stmts = 0;
+
+            RepositoryResult<Statement> r = con.getStatements(null,null,null,true);
+            try {
+                while(r.hasNext()) {
+                    // the statement itself is not needed, we only measure how long retrieving it takes
+                    r.next();
+                    stmts ++;
+                }
+            } finally {
+                // release the underlying result even if iterating fails
+                r.close();
+            }
+
+            long end = System.currentTimeMillis();
+
+            log.info("listed {} triples in {} ms", stmts, end-start);
+
+
+            con.commit();
+        } catch(RepositoryException ex) {
+            // log before rolling back so the cause is not lost in case the rollback fails as well
+            log.error("listing statements failed, rolling back", ex);
+            con.rollback();
+            throw ex;
+        } finally {
+            con.close();
+        }
+
+    }
+
+
+    /**
+     * Shut down the repository used for profiling.
+     *
+     * @throws RepositoryException if shutting down the repository fails
+     */
+    public void shutdown() throws RepositoryException {
+        repository.shutDown();
+    }
+
+    /**
+     * Run the statement listing profile against the given database.
+     *
+     * @param args the JDBC url, the database user and the database password (exactly three arguments)
+     * @throws RepositoryException if initializing, profiling or shutting down the repository fails
+     */
+    public static void main(String[] args) throws RepositoryException {
+        if(args.length != 3) {
+            log.error("arguments: <jdbc-url> <user> <password>");
+            System.exit(1);
+        }
+
+        ProfileLoading l = new ProfileLoading(args[0],args[1],args[2]);
+        try {
+            l.profileListStatements();
+        } finally {
+            // always release the repository, also when profiling fails
+            l.shutdown();
+        }
+
+    }
+
+}