You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2015/10/12 18:07:59 UTC

marmotta git commit: MARMOTTA-584: dynamically adapted the low-level loader statements

Repository: marmotta
Updated Branches:
  refs/heads/MARMOTTA-584 fa32ed2a7 -> fc5bb28de


MARMOTTA-584: dynamically adapted the low-level loader statements


Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/fc5bb28d
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/fc5bb28d
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/fc5bb28d

Branch: refs/heads/MARMOTTA-584
Commit: fc5bb28dea7ca1a3a66818591f49eabf0d81d6e6
Parents: fa32ed2
Author: Sergio Fernández <wi...@apache.org>
Authored: Mon Oct 12 18:07:39 2015 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Mon Oct 12 18:07:39 2015 +0200

----------------------------------------------------------------------
 .../kiwi/loader/generic/KiWiBatchHandler.java   | 26 ++++++------
 .../kiwi/loader/pgsql/KiWiPostgresHandler.java  | 13 +++---
 .../marmotta/kiwi/loader/pgsql/PGCopyUtil.java  | 43 ++++++++++++++------
 .../marmotta/kiwi/loader/KiWiHandlerTest.java   | 23 ++++++++---
 .../marmotta/kiwi/loader/PGCopyUtilTest.java    | 29 +++++++++----
 .../kiwi/persistence/KiWiConnection.java        |  1 -
 .../apache/marmotta/kiwi/sail/KiWiStore.java    |  6 +--
 7 files changed, 90 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
index 8dc0c14..a28649d 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
@@ -20,6 +20,7 @@ package org.apache.marmotta.kiwi.loader.generic;
 import org.apache.marmotta.commons.sesame.model.LiteralCommons;
 import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
 import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
 import org.openrdf.model.Literal;
 import org.openrdf.rio.RDFHandler;
@@ -43,7 +44,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
 
     private static Logger log = LoggerFactory.getLogger(KiWiBatchHandler.class);
 
-
     protected List<KiWiNode> nodeBacklog;
     protected List<KiWiTriple> tripleBacklog;
 
@@ -51,7 +51,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
     protected Map<String,KiWiUriResource> uriBacklogLookup;
     protected Map<String,KiWiAnonResource> bnodeBacklogLookup;
 
-
     protected String backend;
 
     /**
@@ -63,11 +62,9 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
      */
     public KiWiBatchHandler(String backend, KiWiStore store, KiWiLoaderConfiguration config) {
         super(store, config);
-
         this.backend = backend;
     }
 
-
     /**
      * Perform initialisation, e.g. dropping indexes or other preparations.
      */
@@ -121,10 +118,8 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
         this.bnodeBacklogLookup = new HashMap<>();
 
         super.startRDF();
-
     }
 
-
     /**
      * Signals the end of the RDF data. This method is called when all data has
      * been reported.
@@ -140,10 +135,7 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
         } catch (SQLException e) {
             throw new RDFHandlerException(e);
         }
-
-
         super.endRDF();
-
     }
 
 
@@ -220,14 +212,12 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
         }
     }
 
-
     /**
      * Flush the backlog (nodeBacklog and tripleBacklog) to the database; needs to be implemented by subclasses.
      * @throws SQLException
      */
     protected abstract void flushBacklogInternal() throws SQLException;
 
-
     private synchronized void flushBacklog() throws SQLException {
         flushBacklogInternal();
 
@@ -237,7 +227,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
         uriBacklogLookup.clear();
         bnodeBacklogLookup.clear();
         literalBacklogLookup.clear();
-
     }
 
     /**
@@ -255,4 +244,17 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
      */
     protected abstract void createIndexes() throws SQLException;
 
+    /**
+     * Return the schema version if necessary
+     *
+     * @return
+     * @throws SQLException
+     */
+    protected int getDbVersion() throws SQLException {
+        final KiWiConnection conn = store.getPersistence().getConnection();
+        final String version = conn.getMetadata("version");
+        conn.close();
+        return Integer.parseInt(version);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
index 4f63efe..77b28c2 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
@@ -41,18 +41,22 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler
 
     private static Logger log = LoggerFactory.getLogger(KiWiPostgresHandler.class);
 
+    private String columns = "id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt";
 
-
-    public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) {
+    public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) throws SQLException {
         super("PostgreSQL", store, config);
-    }
 
+        final int version = getDbVersion();
+        if (version >= 5) {
+            columns += ",gvalue";
+        }
+    }
 
     @Override
     protected void flushBacklogInternal() throws SQLException {
         try {
             // flush out nodes
-            PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt) FROM STDIN (FORMAT csv)");
+            PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(" + columns + ") FROM STDIN (FORMAT csv)");
             PGCopyUtil.flushNodes(nodeBacklog, nodesOut);
             nodesOut.close();
 
@@ -65,7 +69,6 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler
         }
     }
 
-
     @Override
     protected void dropIndexes() throws SQLException {
         try {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
index 6483192..ec4fdc7 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
@@ -19,6 +19,7 @@ package org.apache.marmotta.kiwi.loader.pgsql;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.marmotta.kiwi.loader.csv.*;
 import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
 import org.joda.time.DateTime;
 import org.openrdf.model.URI;
 import org.postgresql.PGConnection;
@@ -43,7 +44,7 @@ import java.util.List;
 import java.util.Locale;
 
 /**
- * Postgres copy utility
+ * PostgreSQL copy utility
  *
  * @author Sebastian Schaffert (sschaffert@apache.org)
  */
@@ -65,7 +66,6 @@ public class PGCopyUtil {
             new SQLTimestampProcessor(),              // createdAt
     };
 
-
     final static CellProcessor[] tripleProcessors = new CellProcessor[] {
             new NotNull(),                            // triple ID
             new NodeIDProcessor(),                    // subject
@@ -94,19 +94,19 @@ public class PGCopyUtil {
         }
     }).build();
 
-
     /**
-     * Return a PGConnection wrapped by the tomcat connection pool so we are able to access PostgreSQL specific functionality.
-     * @param con
+     * Return a PGConnection wrapped by the tomcat connection pool,
+     * so we are able to access PostgreSQL specific functionality.
+     *
+     * @param conn
      * @return
      */
-    public static PGConnection getWrappedConnection(Connection con) throws SQLException {
-        if(con instanceof PGConnection) {
-            return (PGConnection)con;
+    public static PGConnection getWrappedConnection(Connection conn) throws SQLException {
+        if(conn instanceof PGConnection) {
+            return (PGConnection)conn;
         } else {
-            return (PGConnection) ((javax.sql.PooledConnection)con).getConnection();
+            return (PGConnection) ((javax.sql.PooledConnection)conn).getConnection();
         }
-
     }
 
     public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException {
@@ -134,10 +134,14 @@ public class PGCopyUtil {
     }
 
     public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException {
+        flushNodes(nodeBacklog, out, KiWiDialect.VERSION);
+    }
+
+    public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out, int version) throws IOException {
         CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), nodesPreference);
 
         // reuse the same array to avoid unnecessary object allocation
-        Object[] rowArray = new Object[12]; //FIXME: 11 in schema v4, 12 in v5
+        Object[] rowArray = new Object[version >= 5 ? 12 : 11]; //schema v5 adds a new 'geom' column
         List<Object> row = Arrays.asList(rowArray);
 
         for(KiWiNode n : nodeBacklog) {
@@ -179,7 +183,7 @@ public class PGCopyUtil {
                 log.warn("unknown node type, cannot flush to import stream: {}", n.getClass());
             }
 
-            writer.write(row, nodeProcessors);
+            writer.write(row, getNodeProcessors(version));
         }
         writer.close();
     }
@@ -196,7 +200,20 @@ public class PGCopyUtil {
         a[8] = dtype;
         a[9] = lang != null ? lang.getLanguage() : "";
         a[10] = created;
-        a[11] = geom; //FIXME: drop in v4 schema testing
+
+        if (a.length == 12) {
+            a[11] = geom; //schema v5
+        }
+    }
+
+    public static CellProcessor[] getNodeProcessors(int version) {
+        if (version >= 5) {
+            CellProcessor[] newNodeProcessors = Arrays.copyOf(nodeProcessors, nodeProcessors.length+1);
+            newNodeProcessors[nodeProcessors.length] = new Optional();
+            return newNodeProcessors;
+        } else {
+            return nodeProcessors;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
index 1f54a2a..9ab4fd2 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
@@ -21,6 +21,7 @@ import org.apache.marmotta.kiwi.config.KiWiConfiguration;
 import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
 import org.apache.marmotta.kiwi.loader.mysql.KiWiMySQLHandler;
 import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
 import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
 import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
 import org.apache.marmotta.kiwi.sail.KiWiStore;
@@ -53,6 +54,8 @@ import java.util.List;
 @RunWith(KiWiDatabaseRunner.class)
 public class KiWiHandlerTest {
 
+    final Logger logger = LoggerFactory.getLogger(this.getClass());
+
     private KiWiStore store;
     private Repository repository;
 
@@ -64,7 +67,6 @@ public class KiWiHandlerTest {
         dbConfig.setFulltextLanguages(new String[] {"en"});
     }
 
-
     @Before
     public void initDatabase() throws RepositoryException, IOException, RDFParseException, SailException {
         store = new KiWiStore(dbConfig);
@@ -78,9 +80,6 @@ public class KiWiHandlerTest {
         repository.shutDown();
     }
 
-    final Logger logger =
-            LoggerFactory.getLogger(this.getClass());
-
     @Rule
     public TestWatcher watchman = new TestWatcher() {
         /**
@@ -131,8 +130,7 @@ public class KiWiHandlerTest {
 
     }
 
-
-    private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException {
+    private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException, SQLException {
         KiWiHandler handler;
         if(store.getPersistence().getDialect() instanceof PostgreSQLDialect) {
             handler = new KiWiPostgresHandler(store, c);
@@ -174,4 +172,17 @@ public class KiWiHandlerTest {
 
     }
 
+    /**
+     * Return the schema version if necessary
+     *
+     * @return
+     * @throws SQLException
+     */
+    protected int getDbVersion() throws SQLException {
+        final KiWiConnection conn = store.getPersistence().getConnection();
+        final String version = conn.getMetadata("version");
+        conn.close();
+        return Integer.parseInt(version);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
index e7f27a9..64f76ee 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
@@ -69,8 +69,6 @@ public class PGCopyUtilTest {
     final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue());
     final static KiWiStringLiteral EMPTY = createLiteral("");
 
-
-
     private KiWiStore store;
 
     private SailRepository repository;
@@ -95,7 +93,7 @@ public class PGCopyUtilTest {
         log.info("cleaning up test setup...");
         if (store != null && store.isInitialized()) {
             try {
-            assertTrue(store.checkConsistency());
+                assertTrue(store.checkConsistency());
             } finally {
                 repository.shutDown();
             }
@@ -104,9 +102,11 @@ public class PGCopyUtilTest {
 
     @Test
     public void testWriteNodes() throws IOException, SQLException {
-        KiWiConnection con = store.getPersistence().getConnection();
+        final int version = getDbVersion();
+
+        KiWiConnection conn = store.getPersistence().getConnection();
 
-        PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(con.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
+        PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(conn.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
 
         long start = System.currentTimeMillis();
 
@@ -124,7 +124,7 @@ public class PGCopyUtilTest {
         }
 
         // flush out nodes
-        PGCopyUtil.flushNodes(nodes, out);
+        PGCopyUtil.flushNodes(nodes, out, version);
 
         out.close();
 
@@ -134,7 +134,7 @@ public class PGCopyUtilTest {
 
         // check if database contains the nodes (based on ID)
 
-        PreparedStatement stmt = con.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?");
+        PreparedStatement stmt = conn.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?");
         for(int i=0; i<nodes.size(); i++) {
             stmt.setLong(1, (long)i);
             ResultSet dbResult = stmt.executeQuery();
@@ -146,6 +146,19 @@ public class PGCopyUtilTest {
     }
 
     /**
+     * Return the schema version if necessary
+     *
+     * @return
+     * @throws SQLException
+     */
+    protected int getDbVersion() throws SQLException {
+        final KiWiConnection conn = store.getPersistence().getConnection();
+        final String version = conn.getMetadata("version");
+        conn.close();
+        return Integer.parseInt(version);
+    }
+
+    /**
      * Return a random URI, with a 10% chance of returning a URI that has already been used.
      * @return
      */
@@ -155,7 +168,6 @@ public class PGCopyUtilTest {
         return r;
     }
 
-
     protected static KiWiUriResource createURI(String uri) {
         KiWiUriResource r = new KiWiUriResource(uri);
         r.setId(id++);
@@ -197,5 +209,4 @@ public class PGCopyUtilTest {
         return r;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 04e819d..bfef466 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -202,7 +202,6 @@ public class KiWiConnection implements AutoCloseable {
      */
     public Connection getJDBCConnection() throws SQLException {
         requireJDBCConnection();
-
         return connection;
     }
 

http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
index 2ac179e..bf4f977 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
@@ -59,10 +59,8 @@ public class KiWiStore extends NotifyingSailBase {
      */
     private String inferredContext;
 
-
     private boolean initialized = false;
 
-
     /**
      * Drop databases when shutdown is called. This option is mostly useful for testing.
      */
@@ -72,8 +70,6 @@ public class KiWiStore extends NotifyingSailBase {
         this.persistence    = persistence;
         this.defaultContext = defaultContext;
         this.inferredContext = inferredContext;
-
-
     }
 
     @Deprecated
@@ -130,7 +126,6 @@ public class KiWiStore extends NotifyingSailBase {
         return persistence;
     }
 
-
     /**
      * Drop databases when shutdown is called. This option is mostly useful for testing.
      */
@@ -231,4 +226,5 @@ public class KiWiStore extends NotifyingSailBase {
             throw new SailException("error calling consistency check",e);
         }
     }
+
 }