You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@marmotta.apache.org by wi...@apache.org on 2015/10/12 18:07:59 UTC
marmotta git commit: MARMOTTA-584: dynamically adapted the low-level
loader statements
Repository: marmotta
Updated Branches:
refs/heads/MARMOTTA-584 fa32ed2a7 -> fc5bb28de
MARMOTTA-584: dynamically adapted the low-level loader statements
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/fc5bb28d
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/fc5bb28d
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/fc5bb28d
Branch: refs/heads/MARMOTTA-584
Commit: fc5bb28dea7ca1a3a66818591f49eabf0d81d6e6
Parents: fa32ed2
Author: Sergio Fernández <wi...@apache.org>
Authored: Mon Oct 12 18:07:39 2015 +0200
Committer: Sergio Fernández <wi...@apache.org>
Committed: Mon Oct 12 18:07:39 2015 +0200
----------------------------------------------------------------------
.../kiwi/loader/generic/KiWiBatchHandler.java | 26 ++++++------
.../kiwi/loader/pgsql/KiWiPostgresHandler.java | 13 +++---
.../marmotta/kiwi/loader/pgsql/PGCopyUtil.java | 43 ++++++++++++++------
.../marmotta/kiwi/loader/KiWiHandlerTest.java | 23 ++++++++---
.../marmotta/kiwi/loader/PGCopyUtilTest.java | 29 +++++++++----
.../kiwi/persistence/KiWiConnection.java | 1 -
.../apache/marmotta/kiwi/sail/KiWiStore.java | 6 +--
7 files changed, 90 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
index 8dc0c14..a28649d 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
@@ -20,6 +20,7 @@ package org.apache.marmotta.kiwi.loader.generic;
import org.apache.marmotta.commons.sesame.model.LiteralCommons;
import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
import org.apache.marmotta.kiwi.sail.KiWiStore;
import org.openrdf.model.Literal;
import org.openrdf.rio.RDFHandler;
@@ -43,7 +44,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
private static Logger log = LoggerFactory.getLogger(KiWiBatchHandler.class);
-
protected List<KiWiNode> nodeBacklog;
protected List<KiWiTriple> tripleBacklog;
@@ -51,7 +51,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
protected Map<String,KiWiUriResource> uriBacklogLookup;
protected Map<String,KiWiAnonResource> bnodeBacklogLookup;
-
protected String backend;
/**
@@ -63,11 +62,9 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
*/
public KiWiBatchHandler(String backend, KiWiStore store, KiWiLoaderConfiguration config) {
super(store, config);
-
this.backend = backend;
}
-
/**
* Perform initialisation, e.g. dropping indexes or other preparations.
*/
@@ -121,10 +118,8 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
this.bnodeBacklogLookup = new HashMap<>();
super.startRDF();
-
}
-
/**
* Signals the end of the RDF data. This method is called when all data has
* been reported.
@@ -140,10 +135,7 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
} catch (SQLException e) {
throw new RDFHandlerException(e);
}
-
-
super.endRDF();
-
}
@@ -220,14 +212,12 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
}
}
-
/**
* Flush the backlog (nodeBacklog and tripleBacklog) to the database; needs to be implemented by subclasses.
* @throws SQLException
*/
protected abstract void flushBacklogInternal() throws SQLException;
-
private synchronized void flushBacklog() throws SQLException {
flushBacklogInternal();
@@ -237,7 +227,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
uriBacklogLookup.clear();
bnodeBacklogLookup.clear();
literalBacklogLookup.clear();
-
}
/**
@@ -255,4 +244,17 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
*/
protected abstract void createIndexes() throws SQLException;
+ /**
+ * Read the schema version stored in the database metadata.
+ *
+ * @return the value of the "version" metadata entry, parsed as an integer
+ * @throws SQLException if a database access error occurs
+ */
+ protected int getDbVersion() throws SQLException {
+ // try-with-resources: the connection is closed even if the lookup or the parsing fails
+ try (KiWiConnection conn = store.getPersistence().getConnection()) {
+ return Integer.parseInt(conn.getMetadata("version"));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
index 4f63efe..77b28c2 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
@@ -41,18 +41,22 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler
private static Logger log = LoggerFactory.getLogger(KiWiPostgresHandler.class);
+ private String columns = "id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt";
-
- public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) {
+ public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) throws SQLException {
super("PostgreSQL", store, config);
- }
+ final int version = getDbVersion();
+ if (version >= 5) {
+ columns += ",gvalue";
+ }
+ }
@Override
protected void flushBacklogInternal() throws SQLException {
try {
// flush out nodes
- PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(id,ntype,svalue,dvalue,ivalue,tvalue,tzoffset,bvalue,ltype,lang,createdAt) FROM STDIN (FORMAT csv)");
+ PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes(" + columns + ") FROM STDIN (FORMAT csv)");
PGCopyUtil.flushNodes(nodeBacklog, nodesOut);
nodesOut.close();
@@ -65,7 +69,6 @@ public class KiWiPostgresHandler extends KiWiBatchHandler implements RDFHandler
}
}
-
@Override
protected void dropIndexes() throws SQLException {
try {
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
index 6483192..ec4fdc7 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/PGCopyUtil.java
@@ -19,6 +19,7 @@ package org.apache.marmotta.kiwi.loader.pgsql;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.marmotta.kiwi.loader.csv.*;
import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.persistence.KiWiDialect;
import org.joda.time.DateTime;
import org.openrdf.model.URI;
import org.postgresql.PGConnection;
@@ -43,7 +44,7 @@ import java.util.List;
import java.util.Locale;
/**
- * Postgres copy utility
+ * PostgreSQL copy utility
*
* @author Sebastian Schaffert (sschaffert@apache.org)
*/
@@ -65,7 +66,6 @@ public class PGCopyUtil {
new SQLTimestampProcessor(), // createdAt
};
-
final static CellProcessor[] tripleProcessors = new CellProcessor[] {
new NotNull(), // triple ID
new NodeIDProcessor(), // subject
@@ -94,19 +94,19 @@ public class PGCopyUtil {
}
}).build();
-
/**
- * Return a PGConnection wrapped by the tomcat connection pool so we are able to access PostgreSQL specific functionality.
- * @param con
+ * Return a PGConnection wrapped by the tomcat connection pool,
+ * so we are able to access PostgreSQL specific functionality.
+ *
+ * @param conn
* @return
*/
- public static PGConnection getWrappedConnection(Connection con) throws SQLException {
- if(con instanceof PGConnection) {
- return (PGConnection)con;
+ public static PGConnection getWrappedConnection(Connection conn) throws SQLException {
+ if(conn instanceof PGConnection) {
+ return (PGConnection)conn;
} else {
- return (PGConnection) ((javax.sql.PooledConnection)con).getConnection();
+ return (PGConnection) ((javax.sql.PooledConnection)conn).getConnection();
}
-
}
public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException {
@@ -134,10 +134,14 @@ public class PGCopyUtil {
}
public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException {
+ flushNodes(nodeBacklog, out, KiWiDialect.VERSION);
+ }
+
+ public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out, int version) throws IOException {
CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), nodesPreference);
// reuse the same array to avoid unnecessary object allocation
- Object[] rowArray = new Object[12]; //FIXME: 11 in schema v4, 12 in v5
+ Object[] rowArray = new Object[version >= 5 ? 12 : 11]; //schema v5 adds a new 'geom' column
List<Object> row = Arrays.asList(rowArray);
for(KiWiNode n : nodeBacklog) {
@@ -179,7 +183,7 @@ public class PGCopyUtil {
log.warn("unknown node type, cannot flush to import stream: {}", n.getClass());
}
- writer.write(row, nodeProcessors);
+ writer.write(row, getNodeProcessors(version));
}
writer.close();
}
@@ -196,7 +200,20 @@ public class PGCopyUtil {
a[8] = dtype;
a[9] = lang != null ? lang.getLanguage() : "";
a[10] = created;
- a[11] = geom; //FIXME: drop in v4 schema testing
+
+ if (a.length == 12) {
+ a[11] = geom; //schema v5
+ }
+ }
+
+ public static CellProcessor[] getNodeProcessors(int version) {
+ if (version < 5) {
+ return nodeProcessors; // pre-v5 schema: the shared processor array is used as-is
+ }
+ // v5 schema: copy with one extra trailing slot, handled by an Optional processor
+ final CellProcessor[] extended = Arrays.copyOf(nodeProcessors, nodeProcessors.length + 1);
+ extended[extended.length - 1] = new Optional();
+ return extended;
+ }
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
index 1f54a2a..9ab4fd2 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
@@ -21,6 +21,7 @@ import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
import org.apache.marmotta.kiwi.loader.mysql.KiWiMySQLHandler;
import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
import org.apache.marmotta.kiwi.sail.KiWiStore;
@@ -53,6 +54,8 @@ import java.util.List;
@RunWith(KiWiDatabaseRunner.class)
public class KiWiHandlerTest {
+ final Logger logger = LoggerFactory.getLogger(this.getClass());
+
private KiWiStore store;
private Repository repository;
@@ -64,7 +67,6 @@ public class KiWiHandlerTest {
dbConfig.setFulltextLanguages(new String[] {"en"});
}
-
@Before
public void initDatabase() throws RepositoryException, IOException, RDFParseException, SailException {
store = new KiWiStore(dbConfig);
@@ -78,9 +80,6 @@ public class KiWiHandlerTest {
repository.shutDown();
}
- final Logger logger =
- LoggerFactory.getLogger(this.getClass());
-
@Rule
public TestWatcher watchman = new TestWatcher() {
/**
@@ -131,8 +130,7 @@ public class KiWiHandlerTest {
}
-
- private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException {
+ private void testImport(KiWiLoaderConfiguration c, String file, RDFFormat fmt) throws RDFParseException, IOException, RDFHandlerException, SQLException {
KiWiHandler handler;
if(store.getPersistence().getDialect() instanceof PostgreSQLDialect) {
handler = new KiWiPostgresHandler(store, c);
@@ -174,4 +172,17 @@ public class KiWiHandlerTest {
}
+ /**
+ * Read the schema version stored in the database metadata.
+ *
+ * @return the value of the "version" metadata entry, parsed as an integer
+ * @throws SQLException if a database access error occurs
+ */
+ protected int getDbVersion() throws SQLException {
+ // try-with-resources: the connection is closed even if the lookup or the parsing fails
+ try (KiWiConnection conn = store.getPersistence().getConnection()) {
+ return Integer.parseInt(conn.getMetadata("version"));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
index e7f27a9..64f76ee 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
@@ -69,8 +69,6 @@ public class PGCopyUtilTest {
final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue());
final static KiWiStringLiteral EMPTY = createLiteral("");
-
-
private KiWiStore store;
private SailRepository repository;
@@ -95,7 +93,7 @@ public class PGCopyUtilTest {
log.info("cleaning up test setup...");
if (store != null && store.isInitialized()) {
try {
- assertTrue(store.checkConsistency());
+ assertTrue(store.checkConsistency());
} finally {
repository.shutDown();
}
@@ -104,9 +102,11 @@ public class PGCopyUtilTest {
@Test
public void testWriteNodes() throws IOException, SQLException {
- KiWiConnection con = store.getPersistence().getConnection();
+ final int version = getDbVersion();
+
+ KiWiConnection conn = store.getPersistence().getConnection();
- PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(con.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
+ PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(conn.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
long start = System.currentTimeMillis();
@@ -124,7 +124,7 @@ public class PGCopyUtilTest {
}
// flush out nodes
- PGCopyUtil.flushNodes(nodes, out);
+ PGCopyUtil.flushNodes(nodes, out, version);
out.close();
@@ -134,7 +134,7 @@ public class PGCopyUtilTest {
// check if database contains the nodes (based on ID)
- PreparedStatement stmt = con.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?");
+ PreparedStatement stmt = conn.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?");
for(int i=0; i<nodes.size(); i++) {
stmt.setLong(1, (long)i);
ResultSet dbResult = stmt.executeQuery();
@@ -146,6 +146,19 @@ public class PGCopyUtilTest {
}
/**
+ * Return the schema version if necessary
+ *
+ * @return
+ * @throws SQLException
+ */
+ protected int getDbVersion() throws SQLException {
+ // try-with-resources: the connection is closed even if the lookup or the parsing fails
+ try (KiWiConnection conn = store.getPersistence().getConnection()) {
+ return Integer.parseInt(conn.getMetadata("version"));
+ }
+ }
+
+ /**
* Return a random URI, with a 10% chance of returning a URI that has already been used.
* @return
*/
@@ -155,7 +168,6 @@ public class PGCopyUtilTest {
return r;
}
-
protected static KiWiUriResource createURI(String uri) {
KiWiUriResource r = new KiWiUriResource(uri);
r.setId(id++);
@@ -197,5 +209,4 @@ public class PGCopyUtilTest {
return r;
}
-
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
index 04e819d..bfef466 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java
@@ -202,7 +202,6 @@ public class KiWiConnection implements AutoCloseable {
*/
public Connection getJDBCConnection() throws SQLException {
requireJDBCConnection();
-
return connection;
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/fc5bb28d/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
index 2ac179e..bf4f977 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java
@@ -59,10 +59,8 @@ public class KiWiStore extends NotifyingSailBase {
*/
private String inferredContext;
-
private boolean initialized = false;
-
/**
* Drop databases when shutdown is called. This option is mostly useful for testing.
*/
@@ -72,8 +70,6 @@ public class KiWiStore extends NotifyingSailBase {
this.persistence = persistence;
this.defaultContext = defaultContext;
this.inferredContext = inferredContext;
-
-
}
@Deprecated
@@ -130,7 +126,6 @@ public class KiWiStore extends NotifyingSailBase {
return persistence;
}
-
/**
* Drop databases when shutdown is called. This option is mostly useful for testing.
*/
@@ -231,4 +226,5 @@ public class KiWiStore extends NotifyingSailBase {
throw new SailException("error calling consistency check",e);
}
}
+
}