You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2013/11/15 20:46:51 UTC
[1/3] git commit: towards fast bulk importing into PostgreSQL: using CSV streams and COPY
Updated Branches:
refs/heads/develop 531b15164 -> 1d9325dc9
towards fast bulk importing into PostgreSQL: using CSV streams and COPY
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/be5eddcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/be5eddcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/be5eddcc
Branch: refs/heads/develop
Commit: be5eddccbf8969badce9e4062a63258f499e2a8f
Parents: 4232ded
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Nov 15 18:54:17 2013 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Nov 15 18:54:17 2013 +0100
----------------------------------------------------------------------
libraries/kiwi/kiwi-loader/pom.xml | 8 +-
.../marmotta/kiwi/loader/KiWiHandler.java | 391 ------------------
.../apache/marmotta/kiwi/loader/KiWiLoader.java | 1 +
.../kiwi/loader/generic/KiWiHandler.java | 392 +++++++++++++++++++
.../kiwi/loader/pgsql/KiWiPostgresHandler.java | 111 ++++++
.../marmotta/kiwi/loader/pgsql/csv/CSVUtil.java | 125 ++++++
.../loader/pgsql/csv/LanguageProcessor.java | 48 +++
.../kiwi/loader/pgsql/csv/NodeIDProcessor.java | 49 +++
.../loader/pgsql/csv/NodeTypeProcessor.java | 70 ++++
.../loader/pgsql/csv/SQLBooleanProcessor.java | 51 +++
.../kiwi/loader/pgsql/csv/SQLDateProcessor.java | 51 +++
.../loader/pgsql/csv/SQLTimestampProcessor.java | 36 ++
.../marmotta/kiwi/loader/CSVUtilTest.java | 118 ++++++
.../marmotta/kiwi/loader/KiWiHandlerTest.java | 1 +
.../evaluation/KiWiEvaluationStrategyImpl.java | 4 +
.../kiwi/test/junit/KiWiDatabaseRunner.java | 25 +-
parent/pom.xml | 5 +
17 files changed, 1083 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/pom.xml
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/pom.xml b/libraries/kiwi/kiwi-loader/pom.xml
index 59e225a..8e964b5 100644
--- a/libraries/kiwi/kiwi-loader/pom.xml
+++ b/libraries/kiwi/kiwi-loader/pom.xml
@@ -73,7 +73,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
- <version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.marmotta</groupId>
@@ -101,6 +100,13 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <!-- PostgreSQL bulk importing -->
+ <dependency>
+ <groupId>net.sf.supercsv</groupId>
+ <artifactId>super-csv</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+
<dependency>
<groupId>com.h2database</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiHandler.java
deleted file mode 100644
index 19c9bd6..0000000
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiHandler.java
+++ /dev/null
@@ -1,391 +0,0 @@
-package org.apache.marmotta.kiwi.loader;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.marmotta.commons.sesame.model.Namespaces;
-import org.apache.marmotta.commons.util.DateUtils;
-import org.apache.marmotta.kiwi.model.rdf.*;
-import org.apache.marmotta.kiwi.persistence.KiWiConnection;
-import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.openrdf.model.BNode;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.IllformedLocaleException;
-import java.util.Locale;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A fast-lane RDF import handler that allows bulk-importing triples into a KiWi triplestore. It directly accesses
- * the database using a KiWiConnection. Note that certain configuration options will make the import "unsafe"
- * because they turn off expensive existance checks. If you are not careful and import the same data twice, this
- * might mean duplicate entries in the database.
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class KiWiHandler implements RDFHandler {
-
- private static Logger log = LoggerFactory.getLogger(KiWiHandler.class);
-
- private KiWiConnection connection;
- private KiWiStore store;
-
- long count = 0;
- long start = 0;
- long previous = 0;
-
- private KiWiLoaderConfiguration config;
-
- private LoadingCache<Literal, KiWiLiteral> literalCache;
- private LoadingCache<URI, KiWiUriResource> uriCache;
- private LoadingCache<BNode, KiWiAnonResource> bnodeCache;
- private LoadingCache<String,Locale> localeCache;
-
- // if non-null, all imported statements will have this context (regardless whether they specified a different context)
- private KiWiResource overrideContext;
-
- public KiWiHandler(KiWiStore store, KiWiLoaderConfiguration config) {
- this.config = config;
- this.store = store;
-
- this.literalCache = CacheBuilder.newBuilder()
- .maximumSize(100000)
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .build(new CacheLoader<Literal, KiWiLiteral>() {
- @Override
- public KiWiLiteral load(Literal l) throws Exception {
- return createLiteral(l);
- }
- });
-
- this.uriCache = CacheBuilder.newBuilder()
- .maximumSize(500000)
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .build(new CacheLoader<URI, KiWiUriResource>() {
- @Override
- public KiWiUriResource load(URI key) throws Exception {
- return createURI(key.stringValue());
- }
- });
-
- this.bnodeCache = CacheBuilder.newBuilder()
- .maximumSize(10000)
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .build(new CacheLoader<BNode, KiWiAnonResource>() {
- @Override
- public KiWiAnonResource load(BNode key) throws Exception {
- return createBNode(key.stringValue());
- }
- });
-
- this.localeCache = CacheBuilder.newBuilder()
- .maximumSize(100)
- .build(new CacheLoader<String, Locale>() {
- @Override
- public Locale load(String lang) throws Exception {
- try {
- Locale.Builder builder = new Locale.Builder();
- builder.setLanguageTag(lang);
- return builder.build();
- } catch (IllformedLocaleException ex) {
- log.warn("malformed language literal (language: {})", lang);
- return null;
- }
- }
- });
-
- }
-
- /**
- * Signals the end of the RDF data. This method is called when all data has
- * been reported.
- *
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void endRDF() throws RDFHandlerException {
- try {
- connection.commit();
- connection.close();
- } catch (SQLException e) {
- throw new RDFHandlerException(e);
- }
-
- log.info("KiWiLoader: RDF bulk import of {} triples finished after {} ms", count, System.currentTimeMillis() - start);
- }
-
- /**
- * Signals the start of the RDF data. This method is called before any data
- * is reported.
- *
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void startRDF() throws RDFHandlerException {
- log.info("KiWiLoader: starting RDF bulk import");
- try {
- this.connection = store.getPersistence().getConnection();
- } catch (SQLException e) {
- throw new RDFHandlerException(e);
- }
-
- this.start = System.currentTimeMillis();
- this.previous = System.currentTimeMillis();
-
- if(config.getContext() != null) {
- try {
- this.overrideContext = (KiWiResource)convertNode(new URIImpl(config.getContext()));
- } catch (ExecutionException e) {
- log.error("could not create/load resource",e);
- }
- }
- }
-
- /**
- * Handles a namespace declaration/definition. A namespace declaration
- * associates a (short) prefix string with the namespace's URI. The prefix
- * for default namespaces, which do not have an associated prefix, are
- * represented as empty strings.
- *
- * @param prefix The prefix for the namespace, or an empty string in case of a
- * default namespace.
- * @param uri The URI that the prefix maps to.
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
- try {
- connection.storeNamespace(new KiWiNamespace(prefix,uri));
- } catch (SQLException e) {
- throw new RDFHandlerException(e);
- }
- }
-
- /**
- * Handles a statement.
- *
- * @param st The statement.
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void handleStatement(Statement st) throws RDFHandlerException {
- try {
- KiWiResource subject = (KiWiResource)convertNode(st.getSubject());
- KiWiUriResource predicate = (KiWiUriResource)convertNode(st.getPredicate());
- KiWiNode object = convertNode(st.getObject());
- KiWiResource context;
-
- if(this.overrideContext != null) {
- context = this.overrideContext;
- } else {
- context = (KiWiResource)convertNode(st.getContext());
- }
-
- KiWiTriple result = new KiWiTriple(subject,predicate,object,context);
- if(config.isStatementExistanceCheck()) {
- result.setId(connection.getTripleId(subject, predicate, object, context, true));
- }
- connection.storeTriple(result);
-
- count++;
-
- if(count % config.getCommitBatchSize() == 0) {
- connection.commit();
-
- log.info("imported {} triples ({}/sec)", count, (config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous));
- previous = System.currentTimeMillis();
- }
- } catch (SQLException | ExecutionException e) {
- throw new RDFHandlerException(e);
- }
-
- }
-
-
- private KiWiNode convertNode(Value value) throws ExecutionException {
- if(value == null) {
- return null;
- } else if(value instanceof KiWiNode) {
- return (KiWiNode)value;
- } else if(value instanceof URI) {
- return uriCache.get((URI)value);
- } else if(value instanceof BNode) {
- return bnodeCache.get(((BNode)value));
- } else if(value instanceof Literal) {
- Literal l = (Literal)value;
- return literalCache.get(l);
- } else {
- throw new IllegalArgumentException("the value passed as argument does not have the correct type");
- }
-
- }
-
- private KiWiLiteral createLiteral(Literal l) throws ExecutionException {
- String value = l.getLabel();
- String lang = l.getLanguage();
- URI type = l.getDatatype();
-
-
- Locale locale;
- if(lang != null) {
- locale = localeCache.get(lang);
- } else {
- locale = null;
- }
- if(locale == null) {
- lang = null;
- }
-
-
- KiWiLiteral result;
- final KiWiUriResource rtype = type==null ? null : uriCache.get(type);
-
- try {
-
- try {
- // differentiate between the different types of the value
- if (type == null) {
- // FIXME: MARMOTTA-39 (this is to avoid a NullPointerException in the following if-clauses)
- result = connection.loadLiteral(value.toString(), lang, rtype);
-
- if(result == null) {
- result = new KiWiStringLiteral(value.toString(), locale, rtype);
- }
- } else if(type.equals(Namespaces.NS_XSD+"dateTime")) {
- // parse if necessary
- final Date dvalue = DateUtils.parseDate(value.toString());
-
- result = connection.loadLiteral(dvalue);
-
- if(result == null) {
- result= new KiWiDateLiteral(dvalue, rtype);
- }
- } else if(type.equals(Namespaces.NS_XSD+"integer") || type.equals(Namespaces.NS_XSD+"long")) {
- long ivalue = Long.parseLong(value.toString());
-
- result = connection.loadLiteral(ivalue);
-
- if(result == null) {
- result= new KiWiIntLiteral(ivalue, rtype);
- }
- } else if(type.equals(Namespaces.NS_XSD+"double") || type.equals(Namespaces.NS_XSD+"float")) {
- double dvalue = Double.parseDouble(value.toString());
-
- result = connection.loadLiteral(dvalue);
-
- if(result == null) {
- result= new KiWiDoubleLiteral(dvalue, rtype);
- }
- } else if(type.equals(Namespaces.NS_XSD+"boolean")) {
- boolean bvalue = Boolean.parseBoolean(value.toString());
-
- result = connection.loadLiteral(bvalue);
-
- if(result == null) {
- result= new KiWiBooleanLiteral(bvalue, rtype);
- }
- } else {
- result = connection.loadLiteral(value.toString(), lang, rtype);
-
- if(result == null) {
- result = new KiWiStringLiteral(value.toString(), locale, rtype);
- }
- }
- } catch(IllegalArgumentException ex) {
- // malformed number or date
- log.warn("malformed argument for typed literal of type {}: {}", rtype.stringValue(), value);
- KiWiUriResource mytype = createURI(Namespaces.NS_XSD+"string");
-
- result = connection.loadLiteral(value.toString(), lang, mytype);
-
- if(result == null) {
- result = new KiWiStringLiteral(value.toString(), locale, mytype);
- }
-
- }
-
- if(result.getId() == null) {
- connection.storeNode(result, false);
- }
-
- return result;
-
-
- } catch (SQLException e) {
- log.error("database error, could not load literal",e);
- throw new IllegalStateException("database error, could not load literal",e);
- }
- }
-
- private KiWiUriResource createURI(String uri) {
- try {
- // first look in the registry for newly created resources if the resource has already been created and
- // is still volatile
- KiWiUriResource result = connection.loadUriResource(uri);
-
- if(result == null) {
- result = new KiWiUriResource(uri);
-
- connection.storeNode(result, false);
-
- }
- if(result.getId() == null) {
- log.error("node ID is null!");
- }
-
- return result;
- } catch (SQLException e) {
- log.error("database error, could not load URI resource",e);
- throw new IllegalStateException("database error, could not load URI resource",e);
- }
- }
-
- private KiWiAnonResource createBNode(String nodeID) {
- try {
- // first look in the registry for newly created resources if the resource has already been created and
- // is still volatile
- KiWiAnonResource result = connection.loadAnonResource(nodeID);
-
- if(result == null) {
- result = new KiWiAnonResource(nodeID);
- connection.storeNode(result, false);
- }
- if(result.getId() == null) {
- log.error("node ID is null!");
- }
-
- return result;
- } catch (SQLException e) {
- log.error("database error, could not load anonymous resource",e);
- throw new IllegalStateException("database error, could not load anonymous resource",e);
- }
- }
-
-
-
- /**
- * Handles a comment.
- *
- * @param comment The comment.
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void handleComment(String comment) throws RDFHandlerException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
index 67b7024..c15b3db 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
@@ -30,6 +30,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
import org.apache.marmotta.kiwi.persistence.KiWiDialect;
import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
new file mode 100644
index 0000000..f289c4e
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
@@ -0,0 +1,392 @@
+package org.apache.marmotta.kiwi.loader.generic;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.marmotta.commons.sesame.model.Namespaces;
+import org.apache.marmotta.commons.util.DateUtils;
+import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
+import org.apache.marmotta.kiwi.model.rdf.*;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.openrdf.model.BNode;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.IllformedLocaleException;
+import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A fast-lane RDF import handler that allows bulk-importing triples into a KiWi triplestore. It directly accesses
+ * the database using a KiWiConnection. Note that certain configuration options will make the import "unsafe"
+ * because they turn off expensive existence checks. If you are not careful and import the same data twice, this
+ * might mean duplicate entries in the database.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class KiWiHandler implements RDFHandler {
+
+ private static Logger log = LoggerFactory.getLogger(KiWiHandler.class);
+
+ private KiWiConnection connection;
+ private KiWiStore store;
+
+ long count = 0;
+ long start = 0;
+ long previous = 0;
+
+ private KiWiLoaderConfiguration config;
+
+ private LoadingCache<Literal, KiWiLiteral> literalCache;
+ private LoadingCache<URI, KiWiUriResource> uriCache;
+ private LoadingCache<BNode, KiWiAnonResource> bnodeCache;
+ private LoadingCache<String,Locale> localeCache;
+
+ // if non-null, all imported statements will have this context (regardless whether they specified a different context)
+ private KiWiResource overrideContext;
+
+ public KiWiHandler(KiWiStore store, KiWiLoaderConfiguration config) {
+ this.config = config;
+ this.store = store;
+
+ this.literalCache = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .build(new CacheLoader<Literal, KiWiLiteral>() {
+ @Override
+ public KiWiLiteral load(Literal l) throws Exception {
+ return createLiteral(l);
+ }
+ });
+
+ this.uriCache = CacheBuilder.newBuilder()
+ .maximumSize(500000)
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .build(new CacheLoader<URI, KiWiUriResource>() {
+ @Override
+ public KiWiUriResource load(URI key) throws Exception {
+ return createURI(key.stringValue());
+ }
+ });
+
+ this.bnodeCache = CacheBuilder.newBuilder()
+ .maximumSize(10000)
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .build(new CacheLoader<BNode, KiWiAnonResource>() {
+ @Override
+ public KiWiAnonResource load(BNode key) throws Exception {
+ return createBNode(key.stringValue());
+ }
+ });
+
+ this.localeCache = CacheBuilder.newBuilder()
+ .maximumSize(100)
+ .build(new CacheLoader<String, Locale>() {
+ @Override
+ public Locale load(String lang) throws Exception {
+ try {
+ Locale.Builder builder = new Locale.Builder();
+ builder.setLanguageTag(lang);
+ return builder.build();
+ } catch (IllformedLocaleException ex) {
+ log.warn("malformed language literal (language: {})", lang);
+ return null;
+ }
+ }
+ });
+
+ }
+
+ /**
+ * Signals the end of the RDF data. This method is called when all data has
+ * been reported.
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void endRDF() throws RDFHandlerException {
+ try {
+ connection.commit();
+ connection.close();
+ } catch (SQLException e) {
+ throw new RDFHandlerException(e);
+ }
+
+ log.info("KiWiLoader: RDF bulk import of {} triples finished after {} ms", count, System.currentTimeMillis() - start);
+ }
+
+ /**
+ * Signals the start of the RDF data. This method is called before any data
+ * is reported.
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void startRDF() throws RDFHandlerException {
+ log.info("KiWiLoader: starting RDF bulk import");
+ try {
+ this.connection = store.getPersistence().getConnection();
+ } catch (SQLException e) {
+ throw new RDFHandlerException(e);
+ }
+
+ this.start = System.currentTimeMillis();
+ this.previous = System.currentTimeMillis();
+
+ if(config.getContext() != null) {
+ try {
+ this.overrideContext = (KiWiResource)convertNode(new URIImpl(config.getContext()));
+ } catch (ExecutionException e) {
+ log.error("could not create/load resource",e);
+ }
+ }
+ }
+
+ /**
+ * Handles a namespace declaration/definition. A namespace declaration
+ * associates a (short) prefix string with the namespace's URI. The prefix
+ * for default namespaces, which do not have an associated prefix, are
+ * represented as empty strings.
+ *
+ * @param prefix The prefix for the namespace, or an empty string in case of a
+ * default namespace.
+ * @param uri The URI that the prefix maps to.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
+ try {
+ connection.storeNamespace(new KiWiNamespace(prefix,uri));
+ } catch (SQLException e) {
+ throw new RDFHandlerException(e);
+ }
+ }
+
+ /**
+ * Handles a statement.
+ *
+ * @param st The statement.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleStatement(Statement st) throws RDFHandlerException {
+ try {
+ KiWiResource subject = (KiWiResource)convertNode(st.getSubject());
+ KiWiUriResource predicate = (KiWiUriResource)convertNode(st.getPredicate());
+ KiWiNode object = convertNode(st.getObject());
+ KiWiResource context;
+
+ if(this.overrideContext != null) {
+ context = this.overrideContext;
+ } else {
+ context = (KiWiResource)convertNode(st.getContext());
+ }
+
+ KiWiTriple result = new KiWiTriple(subject,predicate,object,context);
+ if(config.isStatementExistanceCheck()) {
+ result.setId(connection.getTripleId(subject, predicate, object, context, true));
+ }
+ connection.storeTriple(result);
+
+ count++;
+
+ if(count % config.getCommitBatchSize() == 0) {
+ connection.commit();
+
+ log.info("imported {} triples ({}/sec)", count, (config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous));
+ previous = System.currentTimeMillis();
+ }
+ } catch (SQLException | ExecutionException e) {
+ throw new RDFHandlerException(e);
+ }
+
+ }
+
+
+ private KiWiNode convertNode(Value value) throws ExecutionException {
+ if(value == null) {
+ return null;
+ } else if(value instanceof KiWiNode) {
+ return (KiWiNode)value;
+ } else if(value instanceof URI) {
+ return uriCache.get((URI)value);
+ } else if(value instanceof BNode) {
+ return bnodeCache.get(((BNode)value));
+ } else if(value instanceof Literal) {
+ Literal l = (Literal)value;
+ return literalCache.get(l);
+ } else {
+ throw new IllegalArgumentException("the value passed as argument does not have the correct type");
+ }
+
+ }
+
+ private KiWiLiteral createLiteral(Literal l) throws ExecutionException {
+ String value = l.getLabel();
+ String lang = l.getLanguage();
+ URI type = l.getDatatype();
+
+
+ Locale locale;
+ if(lang != null) {
+ locale = localeCache.get(lang);
+ } else {
+ locale = null;
+ }
+ if(locale == null) {
+ lang = null;
+ }
+
+
+ KiWiLiteral result;
+ final KiWiUriResource rtype = type==null ? null : uriCache.get(type);
+
+ try {
+
+ try {
+ // differentiate between the different types of the value
+ if (type == null) {
+ // FIXME: MARMOTTA-39 (this is to avoid a NullPointerException in the following if-clauses)
+ result = connection.loadLiteral(value.toString(), lang, rtype);
+
+ if(result == null) {
+ result = new KiWiStringLiteral(value.toString(), locale, rtype);
+ }
+ } else if(type.equals(Namespaces.NS_XSD+"dateTime")) {
+ // parse if necessary
+ final Date dvalue = DateUtils.parseDate(value.toString());
+
+ result = connection.loadLiteral(dvalue);
+
+ if(result == null) {
+ result= new KiWiDateLiteral(dvalue, rtype);
+ }
+ } else if(type.equals(Namespaces.NS_XSD+"integer") || type.equals(Namespaces.NS_XSD+"long")) {
+ long ivalue = Long.parseLong(value.toString());
+
+ result = connection.loadLiteral(ivalue);
+
+ if(result == null) {
+ result= new KiWiIntLiteral(ivalue, rtype);
+ }
+ } else if(type.equals(Namespaces.NS_XSD+"double") || type.equals(Namespaces.NS_XSD+"float")) {
+ double dvalue = Double.parseDouble(value.toString());
+
+ result = connection.loadLiteral(dvalue);
+
+ if(result == null) {
+ result= new KiWiDoubleLiteral(dvalue, rtype);
+ }
+ } else if(type.equals(Namespaces.NS_XSD+"boolean")) {
+ boolean bvalue = Boolean.parseBoolean(value.toString());
+
+ result = connection.loadLiteral(bvalue);
+
+ if(result == null) {
+ result= new KiWiBooleanLiteral(bvalue, rtype);
+ }
+ } else {
+ result = connection.loadLiteral(value.toString(), lang, rtype);
+
+ if(result == null) {
+ result = new KiWiStringLiteral(value.toString(), locale, rtype);
+ }
+ }
+ } catch(IllegalArgumentException ex) {
+ // malformed number or date
+ log.warn("malformed argument for typed literal of type {}: {}", rtype.stringValue(), value);
+ KiWiUriResource mytype = createURI(Namespaces.NS_XSD+"string");
+
+ result = connection.loadLiteral(value.toString(), lang, mytype);
+
+ if(result == null) {
+ result = new KiWiStringLiteral(value.toString(), locale, mytype);
+ }
+
+ }
+
+ if(result.getId() == null) {
+ connection.storeNode(result, false);
+ }
+
+ return result;
+
+
+ } catch (SQLException e) {
+ log.error("database error, could not load literal",e);
+ throw new IllegalStateException("database error, could not load literal",e);
+ }
+ }
+
+ private KiWiUriResource createURI(String uri) {
+ try {
+ // first look in the registry for newly created resources if the resource has already been created and
+ // is still volatile
+ KiWiUriResource result = connection.loadUriResource(uri);
+
+ if(result == null) {
+ result = new KiWiUriResource(uri);
+
+ connection.storeNode(result, false);
+
+ }
+ if(result.getId() == null) {
+ log.error("node ID is null!");
+ }
+
+ return result;
+ } catch (SQLException e) {
+ log.error("database error, could not load URI resource",e);
+ throw new IllegalStateException("database error, could not load URI resource",e);
+ }
+ }
+
+ private KiWiAnonResource createBNode(String nodeID) {
+ try {
+ // first look in the registry for newly created resources if the resource has already been created and
+ // is still volatile
+ KiWiAnonResource result = connection.loadAnonResource(nodeID);
+
+ if(result == null) {
+ result = new KiWiAnonResource(nodeID);
+ connection.storeNode(result, false);
+ }
+ if(result.getId() == null) {
+ log.error("node ID is null!");
+ }
+
+ return result;
+ } catch (SQLException e) {
+ log.error("database error, could not load anonymous resource",e);
+ throw new IllegalStateException("database error, could not load anonymous resource",e);
+ }
+ }
+
+
+
+ /**
+ * Handles a comment.
+ *
+ * @param comment The comment.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleComment(String comment) throws RDFHandlerException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
new file mode 100644
index 0000000..35f8805
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
@@ -0,0 +1,111 @@
+package org.apache.marmotta.kiwi.loader.pgsql;
+
+import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A fast-lane RDF import handler for PostgreSQL backends. This importer takes advantage of the PostgreSQL COPY command
+ * that allows direct injection into the database. It works by creating an intermediate CSV buffer that is flushed into
+ * the database in batches (using a configurable batch size).
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class KiWiPostgresHandler implements RDFHandler {
+
+ private static Logger log = LoggerFactory.getLogger(KiWiPostgresHandler.class);
+
+
+ private KiWiConnection connection;
+ private KiWiStore store;
+
+ private KiWiLoaderConfiguration config;
+
+ private List<KiWiNode> nodeBacklog;
+ private List<KiWiTriple> tripleBacklog;
+
+
+
+
+ public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) {
+ this.store = store;
+ this.config = config;
+ }
+
+ /**
+ * Signals the end of the RDF data. This method is called when all data has
+ * been reported.
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void endRDF() throws RDFHandlerException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Signals the start of the RDF data. This method is called before any data
+ * is reported.
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void startRDF() throws RDFHandlerException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Handles a namespace declaration/definition. A namespace declaration
+ * associates a (short) prefix string with the namespace's URI. The prefix
+ * for default namespaces, which do not have an associated prefix, are
+ * represented as empty strings.
+ *
+ * @param prefix The prefix for the namespace, or an empty string in case of a
+ * default namespace.
+ * @param uri The URI that the prefix maps to.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Handles a statement.
+ *
+ * @param st The statement.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleStatement(Statement st) throws RDFHandlerException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ /**
+ * Handles a comment.
+ *
+ * @param comment The comment.
+ * @throws org.openrdf.rio.RDFHandlerException
+ * If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleComment(String comment) throws RDFHandlerException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
new file mode 100644
index 0000000..9d4010b
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
@@ -0,0 +1,125 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.openrdf.model.URI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvListWriter;
+import org.supercsv.prefs.CsvPreference;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class CSVUtil {
+
+ private static Logger log = LoggerFactory.getLogger(CSVUtil.class);
+
+
+ final static CellProcessor[] nodeProcessors = new CellProcessor[] {
+ new Unique(), // node ID
+ new NodeTypeProcessor(), // ntype
+ new NotNull(), // svalue
+ new Optional(), // dvalue
+ new Optional(), // ivalue
+ new SQLTimestampProcessor(), // tvalue
+ new Optional(new SQLBooleanProcessor()), // bvalue
+ new Optional(new NodeIDProcessor()), // ltype
+ new Optional(new LanguageProcessor()), // lang
+ new SQLTimestampProcessor(), // createdAt
+ };
+
+
+ final static CellProcessor[] tripleProcessors = new CellProcessor[] {
+ new Unique(), // triple ID
+ new NodeIDProcessor(), // subject
+ new NodeIDProcessor(), // predicate
+ new NodeIDProcessor(), // object
+ new Optional(new NodeIDProcessor()), // context
+ new Optional(new NodeIDProcessor()), // creator
+ new SQLBooleanProcessor(), // inferred
+ new SQLBooleanProcessor(), // deleted
+ new SQLTimestampProcessor(), // createdAt
+ new SQLTimestampProcessor(), // deletedAt
+ };
+
+
+ public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException {
+ CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
+ for(KiWiTriple t : tripleBacklog) {
+ List<Object> row = Arrays.<Object>asList(
+ t.getId(), t.getSubject(), t.getPredicate(), t.getObject(), t.getContext(), t.getCreator(), t.isInferred(), t.isDeleted(), t.getCreated(), t.getDeletedAt()
+ );
+
+ if(row != null) {
+ writer.write(row, tripleProcessors);
+ }
+ }
+ writer.close();
+ }
+
+
+
+ public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException {
+ CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
+ for(KiWiNode n : nodeBacklog) {
+ List<Object> row = null;
+ if(n instanceof KiWiUriResource) {
+ KiWiUriResource u = (KiWiUriResource)n;
+ row = createNodeList(u.getId(), u.getClass(), u.stringValue(), null, null, null, null, null, null, u.getCreated());
+ } else if(n instanceof KiWiAnonResource) {
+ KiWiAnonResource a = (KiWiAnonResource)n;
+ row = createNodeList(a.getId(), a.getClass(), a.stringValue(), null, null, null, null, null, null, a.getCreated());
+ } else if(n instanceof KiWiIntLiteral) {
+ KiWiIntLiteral l = (KiWiIntLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), l.getIntContent(), null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiDoubleLiteral) {
+ KiWiDoubleLiteral l = (KiWiDoubleLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiBooleanLiteral) {
+ KiWiBooleanLiteral l = (KiWiBooleanLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, l.booleanValue(), l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiDateLiteral) {
+ KiWiDateLiteral l = (KiWiDateLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, l.getDateContent(), null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiStringLiteral) {
+ KiWiStringLiteral l = (KiWiStringLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else {
+ log.warn("unknown node type, cannot flush to import stream: {}", n.getClass());
+ }
+
+ if(row != null) {
+ writer.write(row, nodeProcessors);
+ }
+ }
+ writer.close();
+ }
+
+ private static List<Object> createNodeList(Long id, Class type, String content, Double dbl, Long lng, Date date, Boolean bool, URI dtype, Locale lang, Date created) {
+ return Arrays.asList(new Object[]{
+ id, type, content, dbl, lng, date, bool, dtype, lang, created
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/LanguageProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/LanguageProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/LanguageProcessor.java
new file mode 100644
index 0000000..03d7175
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/LanguageProcessor.java
@@ -0,0 +1,48 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+import java.util.Locale;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class LanguageProcessor extends CellProcessorAdaptor implements CellProcessor {
+
+ /**
+ * Constructor used by CellProcessors to indicate that they are the last processor in the chain.
+ */
+ public LanguageProcessor() {
+ }
+
+ /**
+ * Constructor used by CellProcessors that require <tt>CellProcessor</tt> chaining (further processing is required).
+ *
+ * @param next the next <tt>CellProcessor</tt> in the chain
+ * @throws NullPointerException if next is null
+ */
+ public LanguageProcessor(CellProcessor next) {
+ super(next);
+ }
+
+ /**
+ * This method is invoked by the framework when the processor needs to process data or check constraints.
+ *
+ * @since 1.0
+ */
+ @Override
+ public Object execute(Object value, CsvContext context) {
+ validateInputNotNull(value, context);
+
+ if( !(value instanceof Locale) ) {
+ throw new SuperCsvCellProcessorException(Locale.class, value, context, this);
+ }
+
+ return ((Locale)value).getLanguage();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeIDProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeIDProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeIDProcessor.java
new file mode 100644
index 0000000..27a478d
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeIDProcessor.java
@@ -0,0 +1,49 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class NodeIDProcessor extends CellProcessorAdaptor implements CellProcessor {
+
+ /**
+ * Constructor used by CellProcessors to indicate that they are the last processor in the chain.
+ */
+ public NodeIDProcessor() {
+ }
+
+ /**
+ * Constructor used by CellProcessors that require <tt>CellProcessor</tt> chaining (further processing is required).
+ *
+ * @param next the next <tt>CellProcessor</tt> in the chain
+ * @throws NullPointerException if next is null
+ */
+ public NodeIDProcessor(CellProcessor next) {
+ super(next);
+ }
+
+ /**
+ * This method is invoked by the framework when the processor needs to process data or check constraints.
+ *
+ * @since 1.0
+ */
+ @Override
+ public Object execute(Object value, CsvContext context) {
+ validateInputNotNull(value, context);
+
+ if( !(value instanceof KiWiNode) ) {
+ throw new SuperCsvCellProcessorException(KiWiUriResource.class, value, context, this);
+ }
+
+ return ((KiWiNode)value).getId();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeTypeProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeTypeProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeTypeProcessor.java
new file mode 100644
index 0000000..d9280b0
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/NodeTypeProcessor.java
@@ -0,0 +1,70 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+/**
+ * Super CSV cell processor that converts the exact {@link Class} of a KiWi node into its node type
+ * name ("uri", "bnode", "string", "int", "double", "date" or "boolean"). Classes without a dedicated
+ * mapping are written as "string".
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class NodeTypeProcessor extends CellProcessorAdaptor implements CellProcessor {
+
+    /**
+     * Creates a processor that is the last one in its chain.
+     */
+    public NodeTypeProcessor() {
+    }
+
+    /**
+     * Creates a processor that hands its result on to another processor.
+     *
+     * @param next the next <tt>CellProcessor</tt> in the chain
+     * @throws NullPointerException if next is null
+     */
+    public NodeTypeProcessor(CellProcessor next) {
+        super(next);
+    }
+
+    /**
+     * Converts the node class into its type name and passes it to the next processor.
+     *
+     * @throws SuperCsvCellProcessorException if the value is null or not a {@link Class}
+     */
+    @Override
+    public Object execute(Object value, CsvContext context) {
+        validateInputNotNull(value, context);
+
+        if (!(value instanceof Class)) {
+            throw new SuperCsvCellProcessorException(Class.class, value, context, this);
+        }
+
+        return next.execute(nodeType((Class<?>) value), context);
+    }
+
+    /**
+     * Maps an exact KiWi node class (subclasses are not matched) to its type name; unknown classes
+     * fall back to "string".
+     */
+    private static String nodeType(Class<?> type) {
+        if (KiWiUriResource.class.equals(type)) {
+            return "uri";
+        } else if (KiWiAnonResource.class.equals(type)) {
+            return "bnode";
+        } else if (KiWiStringLiteral.class.equals(type)) {
+            return "string";
+        } else if (KiWiIntLiteral.class.equals(type)) {
+            return "int";
+        } else if (KiWiDoubleLiteral.class.equals(type)) {
+            return "double";
+        } else if (KiWiDateLiteral.class.equals(type)) {
+            return "date";
+        } else if (KiWiBooleanLiteral.class.equals(type)) {
+            return "boolean";
+        } else {
+            return "string";
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLBooleanProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLBooleanProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLBooleanProcessor.java
new file mode 100644
index 0000000..49fa48a
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLBooleanProcessor.java
@@ -0,0 +1,51 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.BoolCellProcessor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class SQLBooleanProcessor extends CellProcessorAdaptor implements BoolCellProcessor {
+
+ /**
+ * Constructor used by CellProcessors to indicate that they are the last processor in the chain.
+ */
+ public SQLBooleanProcessor() {
+ }
+
+ /**
+ * Constructor used by CellProcessors that require <tt>CellProcessor</tt> chaining (further processing is required).
+ *
+ * @param next the next <tt>CellProcessor</tt> in the chain
+ * @throws NullPointerException if next is null
+ */
+ public SQLBooleanProcessor(CellProcessor next) {
+ super(next);
+ }
+
+ /**
+ * This method is invoked by the framework when the processor needs to process data or check constraints.
+ *
+ * @since 1.0
+ */
+ @Override
+ public Object execute(Object value, CsvContext context) {
+ validateInputNotNull(value, context);
+
+ if( !(value instanceof Boolean) ) {
+ throw new SuperCsvCellProcessorException(Boolean.class, value, context, this);
+ }
+
+ if( ((Boolean)value).booleanValue()) {
+ return "t";
+ } else {
+ return "f";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLDateProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLDateProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLDateProcessor.java
new file mode 100644
index 0000000..1fbb719
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLDateProcessor.java
@@ -0,0 +1,51 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.cellprocessor.ift.DateCellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+import java.util.Date;
+
+/**
+ * Super CSV cell processor that writes a {@link Date} as an SQL date literal ({@code yyyy-mm-dd}),
+ * as produced by {@link java.sql.Date#toString()}, i.e. interpreted in the JVM's default time zone.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class SQLDateProcessor extends CellProcessorAdaptor implements DateCellProcessor {
+
+    /**
+     * Creates a processor that is the last one in its chain.
+     */
+    public SQLDateProcessor() {
+    }
+
+    /**
+     * Creates a processor that hands its result on to another processor.
+     *
+     * @param next the next <tt>CellProcessor</tt> in the chain
+     * @throws NullPointerException if next is null
+     */
+    public SQLDateProcessor(CellProcessor next) {
+        super(next);
+    }
+
+    /**
+     * Formats the Date as an SQL date string and passes it to the next processor.
+     *
+     * @throws SuperCsvCellProcessorException if the value is null or not a {@link Date}
+     */
+    @Override
+    public Object execute(Object value, CsvContext context) {
+        validateInputNotNull(value, context);
+
+        if (!(value instanceof Date)) {
+            throw new SuperCsvCellProcessorException(Date.class, value, context, this);
+        }
+
+        String sqlDate = new java.sql.Date(((Date) value).getTime()).toString();
+        return next.execute(sqlDate, context);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLTimestampProcessor.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLTimestampProcessor.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLTimestampProcessor.java
new file mode 100644
index 0000000..2a3fc0f
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/SQLTimestampProcessor.java
@@ -0,0 +1,36 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.supercsv.cellprocessor.CellProcessorAdaptor;
+import org.supercsv.cellprocessor.ift.DateCellProcessor;
+import org.supercsv.exception.SuperCsvCellProcessorException;
+import org.supercsv.util.CsvContext;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+/**
+ * Super CSV cell processor that writes a {@link Date} as an SQL timestamp literal in the format of
+ * {@link Timestamp#toString()} ({@code yyyy-mm-dd hh:mm:ss.fffffffff}, JVM default time zone).
+ * Unlike its sibling processors it accepts {@code null} and returns {@code null} for it, so it can be
+ * used for nullable columns without wrapping it in {@code Optional}.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class SQLTimestampProcessor extends CellProcessorAdaptor implements DateCellProcessor {
+
+    /**
+     * Creates a processor that is the last one in its chain.
+     */
+    public SQLTimestampProcessor() {
+    }
+
+    /**
+     * Creates a processor that hands its (non-null) result on to another processor.
+     *
+     * @param next the next <tt>CellProcessor</tt> in the chain
+     * @throws NullPointerException if next is null
+     */
+    public SQLTimestampProcessor(org.supercsv.cellprocessor.ift.CellProcessor next) {
+        super(next);
+    }
+
+    /**
+     * Formats the Date as an SQL timestamp string and passes it to the next processor.
+     *
+     * @return {@code null} for a {@code null} input, otherwise the result of the chain
+     * @throws SuperCsvCellProcessorException if the value is neither null nor a {@link Date}
+     */
+    @Override
+    public Object execute(Object value, CsvContext context) {
+        if (value == null) {
+            // null is passed through as-is and not handed to the chain
+            return null;
+        }
+
+        if (!(value instanceof Date)) {
+            throw new SuperCsvCellProcessorException(Date.class, value, context, this);
+        }
+
+        String sqlTimestamp = new Timestamp(((Date) value).getTime()).toString();
+        return next.execute(sqlTimestamp, context);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
new file mode 100644
index 0000000..d646c84
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
@@ -0,0 +1,118 @@
+package org.apache.marmotta.kiwi.loader;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.commons.vocabulary.XSD;
+import org.apache.marmotta.kiwi.loader.pgsql.csv.CSVUtil;
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.junit.Test;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class CSVUtilTest {
+
+
+ protected static Random rnd = new Random();
+
+ protected static long id = 0;
+
+
+ final static KiWiUriResource TYPE_INT = createURI(XSD.Integer.stringValue());
+ final static KiWiUriResource TYPE_DBL = createURI(XSD.Double.stringValue());
+ final static KiWiUriResource TYPE_BOOL = createURI(XSD.Boolean.stringValue());
+ final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue());
+
+
+
+
+
+ @Test
+ public void testWriteNodes() throws IOException {
+ FileOutputStream out = new FileOutputStream("/tmp/nodes.csv");
+
+
+ List<KiWiNode> nodes = new ArrayList<>(10000);
+
+ nodes.add(TYPE_INT);
+ nodes.add(TYPE_DBL);
+ nodes.add(TYPE_BOOL);
+ nodes.add(TYPE_DATE);
+
+ // randomly create 10000 nodes
+ for(int i=0; i<10000; i++) {
+ nodes.add(randomObject());
+ }
+
+ // flush out nodes
+ CSVUtil.flushNodes(nodes,out);
+
+ out.close();
+
+ }
+
+
+
+ /**
+ * Return a random URI, with a 10% chance of returning a URI that has already been used.
+ * @return
+ */
+ protected static KiWiUriResource randomURI() {
+ KiWiUriResource r = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ r.setId(id++);
+ return r;
+ }
+
+
+ protected static KiWiUriResource createURI(String uri) {
+ KiWiUriResource r = new KiWiUriResource(uri);
+ r.setId(id++);
+ return r;
+ }
+
+ /**
+ * Return a random RDF value, either a reused object (10% chance) or of any other kind.
+ * @return
+ */
+ protected static KiWiNode randomObject() {
+ KiWiNode object;
+ switch(rnd.nextInt(7)) {
+ case 0: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+ case 1: object = new KiWiAnonResource(UUID.randomUUID().toString());
+ break;
+ case 2: object = new KiWiStringLiteral(RandomStringUtils.randomAscii(40));
+ break;
+ case 3: object = new KiWiIntLiteral(rnd.nextLong(), TYPE_INT);
+ break;
+ case 4: object = new KiWiDoubleLiteral(rnd.nextDouble(), TYPE_DBL);
+ break;
+ case 5: object = new KiWiBooleanLiteral(rnd.nextBoolean(), TYPE_BOOL);
+ break;
+ case 6: object = new KiWiDateLiteral(new Date(), TYPE_DATE);
+ break;
+ default: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+
+ }
+ object.setId(id++);
+ return object;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
index 59509a6..ec617de 100644
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java
@@ -1,6 +1,7 @@
package org.apache.marmotta.kiwi.loader;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
import org.apache.marmotta.kiwi.sail.KiWiStore;
import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java
index 19ede97..16c6707 100644
--- a/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java
+++ b/libraries/kiwi/kiwi-sparql/src/main/java/org/apache/marmotta/kiwi/sparql/evaluation/KiWiEvaluationStrategyImpl.java
@@ -119,6 +119,10 @@ public class KiWiEvaluationStrategyImpl extends EvaluationStrategyImpl{
}
}
+ // NOTE(review): this override only delegates to EvaluationStrategyImpl.evaluate(LeftJoin, BindingSet)
+ // and adds no behavior of its own - presumably a placeholder or debugging hook; consider removing it.
+ @Override
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(LeftJoin leftJoin, BindingSet bindings) throws QueryEvaluationException {
+ return super.evaluate(leftJoin, bindings);
+ }
/**
* Test if a tuple expression is supported nby the optimized evaluation; in this case we can apply a specific optimization.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/junit/KiWiDatabaseRunner.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/junit/KiWiDatabaseRunner.java b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/junit/KiWiDatabaseRunner.java
index 170fdba..46d159d 100644
--- a/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/junit/KiWiDatabaseRunner.java
+++ b/libraries/kiwi/kiwi-triplestore/src/test/java/org/apache/marmotta/kiwi/test/junit/KiWiDatabaseRunner.java
@@ -16,16 +16,6 @@
*/
package org.apache.marmotta.kiwi.test.junit;
-import java.lang.annotation.Annotation;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
import org.apache.commons.lang3.ArrayUtils;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.persistence.KiWiDialect;
@@ -48,6 +38,16 @@ import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.annotation.Annotation;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* Specialized {@link Parameterized} runner for UnitTests that injects the database config for KiWi.
* <p>
@@ -235,8 +235,11 @@ public class KiWiDatabaseRunner extends Suite {
@Override
protected String testName(FrameworkMethod method) {
- return method.getName() + "(" + config.getName() + ")";
+ // NOTE(review): without the config name, the runs of one test method against the different
+ // database configurations all get the same name in reports - confirm this is intended.
+ //return method.getName() + "(" + config.getName() + ")";
+ return method.getName();
}
+
+
@Override
protected void validateConstructor(List<Throwable> errors) {
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/be5eddcc/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 08242b4..ceb7b03 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -607,6 +607,11 @@
<version>1.8</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.3</version>
[2/3] git commit: first version of PostgreSQL bulk loader
Posted by ss...@apache.org.
first version of PostgreSQL bulk loader
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/6f2ef0d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/6f2ef0d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/6f2ef0d7
Branch: refs/heads/develop
Commit: 6f2ef0d780c24a0cb547fcc40881bb6287712927
Parents: be5eddc
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Nov 15 20:40:54 2013 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Nov 15 20:40:54 2013 +0100
----------------------------------------------------------------------
.../apache/marmotta/kiwi/loader/KiWiLoader.java | 9 +-
.../kiwi/loader/generic/KiWiHandler.java | 59 +++---
.../kiwi/loader/pgsql/KiWiPostgresHandler.java | 168 ++++++++++++-----
.../marmotta/kiwi/loader/pgsql/csv/CSVUtil.java | 125 -------------
.../kiwi/loader/pgsql/csv/PGCopyUtil.java | 142 ++++++++++++++
.../marmotta/kiwi/loader/CSVUtilTest.java | 118 ------------
.../marmotta/kiwi/loader/PGCopyUtilTest.java | 185 +++++++++++++++++++
.../marmotta/kiwi/model/rdf/KiWiTriple.java | 2 +-
8 files changed, 488 insertions(+), 320 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
index c15b3db..0972971 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
@@ -31,6 +31,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.marmotta.kiwi.config.KiWiConfiguration;
import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
+import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler;
import org.apache.marmotta.kiwi.persistence.KiWiDialect;
import org.apache.marmotta.kiwi.persistence.h2.H2Dialect;
import org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect;
@@ -338,7 +339,13 @@ public class KiWiLoader {
config.setContext(context);
}
- KiWiHandler handler = new KiWiHandler(store,config);
+ KiWiHandler handler;
+ if(kiwi.getDialect() instanceof PostgreSQLDialect) {
+ config.setCommitBatchSize(10000);
+ handler = new KiWiPostgresHandler(store,config);
+ } else {
+ handler = new KiWiHandler(store,config);
+ }
RDFParser parser = Rio.createParser(forFileName);
parser.setRDFHandler(handler);
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
index f289c4e..189d268 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
@@ -39,19 +39,19 @@ public class KiWiHandler implements RDFHandler {
private static Logger log = LoggerFactory.getLogger(KiWiHandler.class);
- private KiWiConnection connection;
- private KiWiStore store;
+ protected KiWiConnection connection;
+ protected KiWiStore store;
- long count = 0;
- long start = 0;
- long previous = 0;
+ protected long count = 0;
+ protected long start = 0;
+ protected long previous = 0;
- private KiWiLoaderConfiguration config;
+ protected KiWiLoaderConfiguration config;
- private LoadingCache<Literal, KiWiLiteral> literalCache;
- private LoadingCache<URI, KiWiUriResource> uriCache;
- private LoadingCache<BNode, KiWiAnonResource> bnodeCache;
- private LoadingCache<String,Locale> localeCache;
+ protected LoadingCache<Literal, KiWiLiteral> literalCache;
+ protected LoadingCache<URI, KiWiUriResource> uriCache;
+ protected LoadingCache<BNode, KiWiAnonResource> bnodeCache;
+ protected LoadingCache<String,Locale> localeCache;
// if non-null, all imported statements will have this context (regardless whether they specified a different context)
private KiWiResource overrideContext;
@@ -201,16 +201,8 @@ public class KiWiHandler implements RDFHandler {
if(config.isStatementExistanceCheck()) {
result.setId(connection.getTripleId(subject, predicate, object, context, true));
}
- connection.storeTriple(result);
+ storeTriple(result);
- count++;
-
- if(count % config.getCommitBatchSize() == 0) {
- connection.commit();
-
- log.info("imported {} triples ({}/sec)", count, (config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous));
- previous = System.currentTimeMillis();
- }
} catch (SQLException | ExecutionException e) {
throw new RDFHandlerException(e);
}
@@ -236,7 +228,7 @@ public class KiWiHandler implements RDFHandler {
}
- private KiWiLiteral createLiteral(Literal l) throws ExecutionException {
+ protected KiWiLiteral createLiteral(Literal l) throws ExecutionException {
String value = l.getLabel();
String lang = l.getLanguage();
URI type = l.getDatatype();
@@ -321,7 +313,7 @@ public class KiWiHandler implements RDFHandler {
}
if(result.getId() == null) {
- connection.storeNode(result, false);
+ storeNode(result);
}
return result;
@@ -333,7 +325,7 @@ public class KiWiHandler implements RDFHandler {
}
}
- private KiWiUriResource createURI(String uri) {
+ protected KiWiUriResource createURI(String uri) {
try {
// first look in the registry for newly created resources if the resource has already been created and
// is still volatile
@@ -342,7 +334,7 @@ public class KiWiHandler implements RDFHandler {
if(result == null) {
result = new KiWiUriResource(uri);
- connection.storeNode(result, false);
+ storeNode(result);
}
if(result.getId() == null) {
@@ -356,7 +348,7 @@ public class KiWiHandler implements RDFHandler {
}
}
- private KiWiAnonResource createBNode(String nodeID) {
+ protected KiWiAnonResource createBNode(String nodeID) {
try {
// first look in the registry for newly created resources if the resource has already been created and
// is still volatile
@@ -364,7 +356,7 @@ public class KiWiHandler implements RDFHandler {
if(result == null) {
result = new KiWiAnonResource(nodeID);
- connection.storeNode(result, false);
+ storeNode(result);
}
if(result.getId() == null) {
log.error("node ID is null!");
@@ -378,6 +370,23 @@ public class KiWiHandler implements RDFHandler {
}
+ /**
+ * Persist a newly created node. This implementation writes the node immediately through the KiWi connection;
+ * the method is a protected hook so that subclasses can change how nodes are persisted.
+ *
+ * @param node the node to store
+ * @throws SQLException if the database write fails
+ */
+ protected void storeNode(KiWiNode node) throws SQLException {
+ connection.storeNode(node, false);
+ }
+
+ /**
+ * Persist a triple, commit the connection after every {@code commitBatchSize} triples and log the import
+ * rate of the batch that was just committed.
+ *
+ * @param result the triple to store
+ * @throws SQLException if storing the triple or committing fails
+ */
+ protected void storeTriple(KiWiTriple result) throws SQLException {
+ connection.storeTriple(result);
+
+ count++;
+
+ if(count % config.getCommitBatchSize() == 0) {
+ connection.commit();
+
+ long now = System.currentTimeMillis();
+ // clamp to at least 1 ms: a batch finishing within the same millisecond would otherwise divide by zero;
+ // 1000L forces long arithmetic so that large batch sizes cannot overflow
+ long elapsed = Math.max(1L, now - previous);
+ log.info("imported {} triples ({}/sec)", count, (config.getCommitBatchSize() * 1000L) / elapsed);
+ previous = now;
+ }
+ }
+
/**
* Handles a comment.
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
index 35f8805..3e6b0eb 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/KiWiPostgresHandler.java
@@ -1,17 +1,28 @@
package org.apache.marmotta.kiwi.loader.pgsql;
import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration;
+import org.apache.marmotta.kiwi.loader.generic.KiWiHandler;
+import org.apache.marmotta.kiwi.loader.pgsql.csv.PGCopyUtil;
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiLiteral;
import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
-import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
import org.apache.marmotta.kiwi.sail.KiWiStore;
-import org.openrdf.model.Statement;
+import org.openrdf.model.Literal;
import org.openrdf.rio.RDFHandler;
import org.openrdf.rio.RDFHandlerException;
+import org.postgresql.copy.PGCopyOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
/**
* A fast-lane RDF import handler for PostgreSQL backends. This importer takes advantage of the PostgreSQL COPY command
@@ -20,40 +31,26 @@ import java.util.List;
*
* @author Sebastian Schaffert (sschaffert@apache.org)
*/
-public class KiWiPostgresHandler implements RDFHandler {
+public class KiWiPostgresHandler extends KiWiHandler implements RDFHandler {
private static Logger log = LoggerFactory.getLogger(KiWiPostgresHandler.class);
- private KiWiConnection connection;
- private KiWiStore store;
-
- private KiWiLoaderConfiguration config;
-
private List<KiWiNode> nodeBacklog;
private List<KiWiTriple> tripleBacklog;
-
+ private Map<Literal,KiWiLiteral> literalBacklogLookup;
+ private Map<String,KiWiUriResource> uriBacklogLookup;
+ private Map<String,KiWiAnonResource> bnodeBacklogLookup;
public KiWiPostgresHandler(KiWiStore store, KiWiLoaderConfiguration config) {
- this.store = store;
- this.config = config;
+ // store and configuration are handed to the generic handler; the backlog structures are created in startRDF()
+ super(store, config);
}
/**
- * Signals the end of the RDF data. This method is called when all data has
- * been reported.
- *
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
- @Override
- public void endRDF() throws RDFHandlerException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- /**
+ /**
* Signals the start of the RDF data. This method is called before any data
* is reported.
*
@@ -62,50 +59,121 @@ public class KiWiPostgresHandler implements RDFHandler {
*/
@Override
public void startRDF() throws RDFHandlerException {
- //To change body of implemented methods use File | Settings | File Templates.
+ // create the buffers of nodes and triples waiting for the next COPY flush, plus lookup tables for pending nodes;
+ // the capacities are only initial hints (presumably about two new nodes per triple): flushing is actually
+ // triggered by the commit batch size in storeTriple
+ this.tripleBacklog = new ArrayList<>(config.getStatementBatchSize());
+ this.nodeBacklog = new ArrayList<>(config.getStatementBatchSize()*2);
+ this.literalBacklogLookup = new HashMap<>();
+ this.uriBacklogLookup = new HashMap<>();
+ this.bnodeBacklogLookup = new HashMap<>();
+
+ super.startRDF();
}
+
/**
- * Handles a namespace declaration/definition. A namespace declaration
- * associates a (short) prefix string with the namespace's URI. The prefix
- * for default namespaces, which do not have an associated prefix, are
- * represented as empty strings.
+ * Signals the end of the RDF data. This method is called when all data has
+ * been reported.
*
- * @param prefix The prefix for the namespace, or an empty string in case of a
- * default namespace.
- * @param uri The URI that the prefix maps to.
* @throws org.openrdf.rio.RDFHandlerException
* If the RDF handler has encountered an unrecoverable error.
*/
@Override
- public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
- //To change body of implemented methods use File | Settings | File Templates.
+ public void endRDF() throws RDFHandlerException {
+ // write out the last (possibly partial) batch still held in the backlog before the generic handler
+ // finishes the import
+ try {
+ flushBacklog();
+ } catch (SQLException e) {
+ throw new RDFHandlerException(e);
+ }
+
+ super.endRDF();
+
}
- /**
- * Handles a statement.
- *
- * @param st The statement.
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
+
@Override
- public void handleStatement(Statement st) throws RDFHandlerException {
- //To change body of implemented methods use File | Settings | File Templates.
+ protected KiWiAnonResource createBNode(String nodeID) {
+ // blank nodes created since the last flush exist only in the backlog (not yet in the database), so look
+ // there first and fall back to the superclass lookup/creation when the node is not pending
+ KiWiAnonResource result = bnodeBacklogLookup.get(nodeID);
+ if(result == null) {
+ result = super.createBNode(nodeID);
+ }
+ return result;
}
- /**
- * Handles a comment.
- *
- * @param comment The comment.
- * @throws org.openrdf.rio.RDFHandlerException
- * If the RDF handler has encountered an unrecoverable error.
- */
@Override
- public void handleComment(String comment) throws RDFHandlerException {
- //To change body of implemented methods use File | Settings | File Templates.
+ protected KiWiLiteral createLiteral(Literal l) throws ExecutionException {
+ // check literals still buffered in the backlog first, then fall back to the superclass.
+ // NOTE(review): the map is keyed by KiWiLiteral but queried with the incoming Literal, so hits rely on
+ // KiWiLiteral.equals/hashCode agreeing with other Literal implementations -- TODO confirm
+ KiWiLiteral result = literalBacklogLookup.get(l);
+ if(result == null) {
+ result = super.createLiteral(l);
+ }
+ return result;
}
+ /**
+ * Resolve a URI resource, preferring a node that is still waiting in the backlog (created since the last flush
+ * and therefore not yet in the database) over the lookup/creation performed by the generic handler.
+ */
+ @Override
+ protected KiWiUriResource createURI(String uri) {
+ KiWiUriResource pending = uriBacklogLookup.get(uri);
+ return pending != null ? pending : super.createURI(uri);
+ }
+ /**
+ * Buffer a node for the next COPY batch instead of inserting it right away. The node gets its database ID from
+ * the "nodes" sequence immediately (unless it already has one), because the row itself is only written later
+ * by flushBacklog(). The node is also registered in the matching backlog lookup table so that the create*
+ * methods find it while it is pending.
+ */
+ @Override
+ protected void storeNode(KiWiNode node) throws SQLException {
+ if(node.getId() == null) {
+ node.setId(connection.getNextSequence("nodes"));
+ }
+ nodeBacklog.add(node);
+
+ // index the pending node by the key its create* method uses for the lookup
+ if(node instanceof KiWiUriResource) {
+ KiWiUriResource uriNode = (KiWiUriResource) node;
+ uriBacklogLookup.put(uriNode.stringValue(), uriNode);
+ } else if(node instanceof KiWiAnonResource) {
+ KiWiAnonResource anonNode = (KiWiAnonResource) node;
+ bnodeBacklogLookup.put(anonNode.stringValue(), anonNode);
+ } else if(node instanceof KiWiLiteral) {
+ KiWiLiteral literalNode = (KiWiLiteral) node;
+ literalBacklogLookup.put(literalNode, literalNode);
+ }
+ }
+ /**
+ * Buffer a triple for the next COPY batch. The triple gets its ID from the "triples" sequence right away
+ * (unless it already has one). After every {@code commitBatchSize} triples, the backlog is flushed, the
+ * connection is committed and the import rate of the batch is logged.
+ *
+ * @param result the triple to store
+ * @throws SQLException if obtaining an ID, flushing or committing fails
+ */
+ @Override
+ protected void storeTriple(KiWiTriple result) throws SQLException {
+ if(result.getId() == null) {
+ result.setId(connection.getNextSequence("triples"));
+ }
+
+ tripleBacklog.add(result);
+
+ count++;
+
+ if(count % config.getCommitBatchSize() == 0) {
+ flushBacklog();
+ connection.commit();
+
+ long now = System.currentTimeMillis();
+ // clamp to at least 1 ms: a batch finishing within the same millisecond would otherwise divide by zero;
+ // 1000L forces long arithmetic so that large batch sizes cannot overflow
+ long elapsed = Math.max(1L, now - previous);
+ log.info("imported {} triples ({}/sec)", count, (config.getCommitBatchSize() * 1000L) / elapsed);
+ previous = now;
+ }
+ }
+
+ /**
+ * Write all buffered nodes and triples to the database using PostgreSQL COPY operations, then clear the
+ * backlog and its lookup tables. Nodes are copied before the triples that reference them.
+ * <p>
+ * If writing a batch fails, the unfinished COPY operation is cancelled so that the connection is not left in
+ * COPY mode, and the backlog is kept unchanged.
+ *
+ * @throws SQLException if a COPY operation fails; I/O errors are wrapped in an SQLException
+ */
+ private synchronized void flushBacklog() throws SQLException {
+ try {
+ // flush out nodes
+ PGCopyOutputStream nodesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
+ boolean nodesCopied = false;
+ try {
+ PGCopyUtil.flushNodes(nodeBacklog, nodesOut);
+ nodesOut.close(); // no-op if flushNodes already closed the stream
+ nodesCopied = true;
+ } finally {
+ if(!nodesCopied) {
+ cancelCopy(nodesOut);
+ }
+ }
+
+ // flush out triples
+ PGCopyOutputStream triplesOut = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(connection.getJDBCConnection()), "COPY triples FROM STDIN (FORMAT csv)");
+ boolean triplesCopied = false;
+ try {
+ PGCopyUtil.flushTriples(tripleBacklog, triplesOut);
+ triplesOut.close(); // no-op if flushTriples already closed the stream
+ triplesCopied = true;
+ } finally {
+ if(!triplesCopied) {
+ cancelCopy(triplesOut);
+ }
+ }
+ } catch (IOException ex) {
+ throw new SQLException("error while flushing out data",ex);
+ }
+
+ nodeBacklog.clear();
+ tripleBacklog.clear();
+
+ uriBacklogLookup.clear();
+ bnodeBacklogLookup.clear();
+ literalBacklogLookup.clear();
+
+ }
+
+ /**
+ * Best-effort cancellation of an unfinished COPY operation after a failure. Problems during cancellation are
+ * only logged so that they do not hide the original exception. RuntimeException is caught as well because
+ * some driver versions may throw when querying an already-ended stream.
+ */
+ private static void cancelCopy(PGCopyOutputStream out) {
+ try {
+ if(out.isActive()) {
+ out.cancelCopy();
+ }
+ } catch (SQLException | RuntimeException e) {
+ log.warn("could not cancel COPY operation after a failed bulk import", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
deleted file mode 100644
index 9d4010b..0000000
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/CSVUtil.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package org.apache.marmotta.kiwi.loader.pgsql.csv;
-
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
-import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
-import org.openrdf.model.URI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.Optional;
-import org.supercsv.cellprocessor.constraint.NotNull;
-import org.supercsv.cellprocessor.constraint.Unique;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.io.CsvListWriter;
-import org.supercsv.prefs.CsvPreference;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
-
-/**
- * Add file description here!
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class CSVUtil {
-
- private static Logger log = LoggerFactory.getLogger(CSVUtil.class);
-
-
- final static CellProcessor[] nodeProcessors = new CellProcessor[] {
- new Unique(), // node ID
- new NodeTypeProcessor(), // ntype
- new NotNull(), // svalue
- new Optional(), // dvalue
- new Optional(), // ivalue
- new SQLTimestampProcessor(), // tvalue
- new Optional(new SQLBooleanProcessor()), // bvalue
- new Optional(new NodeIDProcessor()), // ltype
- new Optional(new LanguageProcessor()), // lang
- new SQLTimestampProcessor(), // createdAt
- };
-
-
- final static CellProcessor[] tripleProcessors = new CellProcessor[] {
- new Unique(), // triple ID
- new NodeIDProcessor(), // subject
- new NodeIDProcessor(), // predicate
- new NodeIDProcessor(), // object
- new Optional(new NodeIDProcessor()), // context
- new Optional(new NodeIDProcessor()), // creator
- new SQLBooleanProcessor(), // inferred
- new SQLBooleanProcessor(), // deleted
- new SQLTimestampProcessor(), // createdAt
- new SQLTimestampProcessor(), // deletedAt
- };
-
-
- public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException {
- CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
- for(KiWiTriple t : tripleBacklog) {
- List<Object> row = Arrays.<Object>asList(
- t.getId(), t.getSubject(), t.getPredicate(), t.getObject(), t.getContext(), t.getCreator(), t.isInferred(), t.isDeleted(), t.getCreated(), t.getDeletedAt()
- );
-
- if(row != null) {
- writer.write(row, tripleProcessors);
- }
- }
- writer.close();
- }
-
-
-
- public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException {
- CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
- for(KiWiNode n : nodeBacklog) {
- List<Object> row = null;
- if(n instanceof KiWiUriResource) {
- KiWiUriResource u = (KiWiUriResource)n;
- row = createNodeList(u.getId(), u.getClass(), u.stringValue(), null, null, null, null, null, null, u.getCreated());
- } else if(n instanceof KiWiAnonResource) {
- KiWiAnonResource a = (KiWiAnonResource)n;
- row = createNodeList(a.getId(), a.getClass(), a.stringValue(), null, null, null, null, null, null, a.getCreated());
- } else if(n instanceof KiWiIntLiteral) {
- KiWiIntLiteral l = (KiWiIntLiteral)n;
- row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), l.getIntContent(), null, null, l.getDatatype(), l.getLocale(), l.getCreated());
- } else if(n instanceof KiWiDoubleLiteral) {
- KiWiDoubleLiteral l = (KiWiDoubleLiteral)n;
- row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
- } else if(n instanceof KiWiBooleanLiteral) {
- KiWiBooleanLiteral l = (KiWiBooleanLiteral)n;
- row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, l.booleanValue(), l.getDatatype(), l.getLocale(), l.getCreated());
- } else if(n instanceof KiWiDateLiteral) {
- KiWiDateLiteral l = (KiWiDateLiteral)n;
- row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, l.getDateContent(), null, l.getDatatype(), l.getLocale(), l.getCreated());
- } else if(n instanceof KiWiStringLiteral) {
- KiWiStringLiteral l = (KiWiStringLiteral)n;
- row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
- } else {
- log.warn("unknown node type, cannot flush to import stream: {}", n.getClass());
- }
-
- if(row != null) {
- writer.write(row, nodeProcessors);
- }
- }
- writer.close();
- }
-
- private static List<Object> createNodeList(Long id, Class type, String content, Double dbl, Long lng, Date date, Boolean bool, URI dtype, Locale lang, Date created) {
- return Arrays.asList(new Object[]{
- id, type, content, dbl, lng, date, bool, dtype, lang, created
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/PGCopyUtil.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/PGCopyUtil.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/PGCopyUtil.java
new file mode 100644
index 0000000..075ecb1
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/pgsql/csv/PGCopyUtil.java
@@ -0,0 +1,142 @@
+package org.apache.marmotta.kiwi.loader.pgsql.csv;
+
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiTriple;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.openrdf.model.URI;
+import org.postgresql.PGConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.constraint.NotNull;
+import org.supercsv.cellprocessor.constraint.Unique;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvListWriter;
+import org.supercsv.prefs.CsvPreference;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class PGCopyUtil {
+
+ private static Logger log = LoggerFactory.getLogger(PGCopyUtil.class);
+
+
+ final static CellProcessor[] nodeProcessors = new CellProcessor[] {
+ new Unique(), // node ID
+ new NodeTypeProcessor(), // ntype
+ new NotNull(), // svalue
+ new Optional(), // dvalue
+ new Optional(), // ivalue
+ new SQLTimestampProcessor(), // tvalue
+ new Optional(new SQLBooleanProcessor()), // bvalue
+ new Optional(new NodeIDProcessor()), // ltype
+ new Optional(new LanguageProcessor()), // lang
+ new SQLTimestampProcessor(), // createdAt
+ };
+
+
+ final static CellProcessor[] tripleProcessors = new CellProcessor[] {
+ new Unique(), // triple ID
+ new NodeIDProcessor(), // subject
+ new NodeIDProcessor(), // predicate
+ new NodeIDProcessor(), // object
+ new Optional(new NodeIDProcessor()), // context
+ new Optional(new NodeIDProcessor()), // creator
+ new SQLBooleanProcessor(), // inferred
+ new SQLBooleanProcessor(), // deleted
+ new SQLTimestampProcessor(), // createdAt
+ new SQLTimestampProcessor(), // deletedAt
+ };
+
+
+ /**
+ * Return a PGConnection wrapped by the tomcat connection pool so we are able to access PostgreSQL specific functionality.
+ * @param con
+ * @return
+ */
+ public static PGConnection getWrappedConnection(Connection con) throws SQLException {
+ if(con instanceof PGConnection) {
+ return (PGConnection)con;
+ } else {
+ return (PGConnection) ((javax.sql.PooledConnection)con).getConnection();
+ }
+
+ }
+
+ public static void flushTriples(Iterable<KiWiTriple> tripleBacklog, OutputStream out) throws IOException {
+ CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
+ for(KiWiTriple t : tripleBacklog) {
+ List<Object> row = Arrays.<Object>asList(
+ t.getId(), t.getSubject(), t.getPredicate(), t.getObject(), t.getContext(), t.getCreator(), t.isInferred(), t.isDeleted(), t.getCreated(), t.getDeletedAt()
+ );
+
+ if(row != null) {
+ writer.write(row, tripleProcessors);
+ }
+ }
+ writer.close();
+ }
+
+
+
+ public static void flushNodes(Iterable<KiWiNode> nodeBacklog, OutputStream out) throws IOException {
+ CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE);
+ for(KiWiNode n : nodeBacklog) {
+ List<Object> row = null;
+ if(n instanceof KiWiUriResource) {
+ KiWiUriResource u = (KiWiUriResource)n;
+ row = createNodeList(u.getId(), u.getClass(), u.stringValue(), null, null, null, null, null, null, u.getCreated());
+ } else if(n instanceof KiWiAnonResource) {
+ KiWiAnonResource a = (KiWiAnonResource)n;
+ row = createNodeList(a.getId(), a.getClass(), a.stringValue(), null, null, null, null, null, null, a.getCreated());
+ } else if(n instanceof KiWiIntLiteral) {
+ KiWiIntLiteral l = (KiWiIntLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), l.getIntContent(), null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiDoubleLiteral) {
+ KiWiDoubleLiteral l = (KiWiDoubleLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), l.getDoubleContent(), null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiBooleanLiteral) {
+ KiWiBooleanLiteral l = (KiWiBooleanLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, l.booleanValue(), l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiDateLiteral) {
+ KiWiDateLiteral l = (KiWiDateLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, l.getDateContent(), null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else if(n instanceof KiWiStringLiteral) {
+ KiWiStringLiteral l = (KiWiStringLiteral)n;
+ row = createNodeList(l.getId(), l.getClass(), l.getContent(), null, null, null, null, l.getDatatype(), l.getLocale(), l.getCreated());
+ } else {
+ log.warn("unknown node type, cannot flush to import stream: {}", n.getClass());
+ }
+
+ if(row != null) {
+ writer.write(row, nodeProcessors);
+ }
+ }
+ writer.close();
+ }
+
+ private static List<Object> createNodeList(Long id, Class type, String content, Double dbl, Long lng, Date date, Boolean bool, URI dtype, Locale lang, Date created) {
+ return Arrays.asList(new Object[]{
+ id, type, content, dbl, lng, date, bool, dtype, lang, created
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
deleted file mode 100644
index d646c84..0000000
--- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/CSVUtilTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package org.apache.marmotta.kiwi.loader;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.marmotta.commons.vocabulary.XSD;
-import org.apache.marmotta.kiwi.loader.pgsql.csv.CSVUtil;
-import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
-import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
-import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
-import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
-import org.junit.Test;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-/**
- * Add file description here!
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class CSVUtilTest {
-
-
- protected static Random rnd = new Random();
-
- protected static long id = 0;
-
-
- final static KiWiUriResource TYPE_INT = createURI(XSD.Integer.stringValue());
- final static KiWiUriResource TYPE_DBL = createURI(XSD.Double.stringValue());
- final static KiWiUriResource TYPE_BOOL = createURI(XSD.Boolean.stringValue());
- final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue());
-
-
-
-
-
- @Test
- public void testWriteNodes() throws IOException {
- FileOutputStream out = new FileOutputStream("/tmp/nodes.csv");
-
-
- List<KiWiNode> nodes = new ArrayList<>(10000);
-
- nodes.add(TYPE_INT);
- nodes.add(TYPE_DBL);
- nodes.add(TYPE_BOOL);
- nodes.add(TYPE_DATE);
-
- // randomly create 10000 nodes
- for(int i=0; i<10000; i++) {
- nodes.add(randomObject());
- }
-
- // flush out nodes
- CSVUtil.flushNodes(nodes,out);
-
- out.close();
-
- }
-
-
-
- /**
- * Return a random URI, with a 10% chance of returning a URI that has already been used.
- * @return
- */
- protected static KiWiUriResource randomURI() {
- KiWiUriResource r = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
- r.setId(id++);
- return r;
- }
-
-
- protected static KiWiUriResource createURI(String uri) {
- KiWiUriResource r = new KiWiUriResource(uri);
- r.setId(id++);
- return r;
- }
-
- /**
- * Return a random RDF value, either a reused object (10% chance) or of any other kind.
- * @return
- */
- protected static KiWiNode randomObject() {
- KiWiNode object;
- switch(rnd.nextInt(7)) {
- case 0: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
- break;
- case 1: object = new KiWiAnonResource(UUID.randomUUID().toString());
- break;
- case 2: object = new KiWiStringLiteral(RandomStringUtils.randomAscii(40));
- break;
- case 3: object = new KiWiIntLiteral(rnd.nextLong(), TYPE_INT);
- break;
- case 4: object = new KiWiDoubleLiteral(rnd.nextDouble(), TYPE_DBL);
- break;
- case 5: object = new KiWiBooleanLiteral(rnd.nextBoolean(), TYPE_BOOL);
- break;
- case 6: object = new KiWiDateLiteral(new Date(), TYPE_DATE);
- break;
- default: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
- break;
-
- }
- object.setId(id++);
- return object;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
new file mode 100644
index 0000000..c264f45
--- /dev/null
+++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java
@@ -0,0 +1,185 @@
+package org.apache.marmotta.kiwi.loader;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.marmotta.commons.vocabulary.XSD;
+import org.apache.marmotta.kiwi.config.KiWiConfiguration;
+import org.apache.marmotta.kiwi.loader.pgsql.csv.PGCopyUtil;
+import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource;
+import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiNode;
+import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral;
+import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource;
+import org.apache.marmotta.kiwi.persistence.KiWiConnection;
+import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
+import org.apache.marmotta.kiwi.sail.KiWiStore;
+import org.apache.marmotta.kiwi.test.helper.DBConnectionChecker;
+import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.sail.SailException;
+import org.postgresql.copy.PGCopyOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests bulk import of nodes into PostgreSQL through {@link PGCopyUtil} and a COPY stream. Requires a PostgreSQL
+ * test database; its availability is checked through DBConnectionChecker during setup.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class PGCopyUtilTest {
+
+ private static Logger log = LoggerFactory.getLogger(PGCopyUtilTest.class);
+
+ /** number of random nodes written in addition to the four datatype URIs */
+ private static final int NUM_NODES = 100000;
+
+ protected static Random rnd = new Random();
+
+ // next node ID to assign; must be declared before the TYPE_* constants, which consume IDs during class init
+ protected static long id = 0;
+
+
+ final static KiWiUriResource TYPE_INT = createURI(XSD.Integer.stringValue());
+ final static KiWiUriResource TYPE_DBL = createURI(XSD.Double.stringValue());
+ final static KiWiUriResource TYPE_BOOL = createURI(XSD.Boolean.stringValue());
+ final static KiWiUriResource TYPE_DATE = createURI(XSD.DateTime.stringValue());
+
+
+
+ private KiWiStore store;
+
+ private SailRepository repository;
+
+ @Before
+ public void setup() throws RepositoryException {
+ log.info("creating test setup...");
+
+ KiWiConfiguration psql = KiWiDatabaseRunner.createKiWiConfig("PostgreSQL", new PostgreSQLDialect());
+ DBConnectionChecker.checkDatabaseAvailability(psql);
+
+ rnd = new Random();
+
+ store = new KiWiStore(psql);
+ repository = new SailRepository(store);
+ repository.initialize();
+ }
+
+ @After
+ public void dropDatabase() throws RepositoryException, SQLException, SailException {
+ log.info("cleaning up test setup...");
+ if (store != null && store.isInitialized()) {
+ assertTrue(store.checkConsistency());
+ store.closeValueFactory(); // release all connections before dropping the database
+ store.getPersistence().dropDatabase();
+ repository.shutDown();
+ }
+ }
+
+
+
+
+ @Test
+ public void testWriteNodes() throws IOException, SQLException {
+ KiWiConnection con = store.getPersistence().getConnection();
+ try {
+ PGCopyOutputStream out = new PGCopyOutputStream(PGCopyUtil.getWrappedConnection(con.getJDBCConnection()), "COPY nodes FROM STDIN (FORMAT csv)");
+
+ long start = System.currentTimeMillis();
+
+ List<KiWiNode> nodes = new ArrayList<>(NUM_NODES + 4);
+
+ nodes.add(TYPE_INT);
+ nodes.add(TYPE_DBL);
+ nodes.add(TYPE_BOOL);
+ nodes.add(TYPE_DATE);
+
+ // randomly create NUM_NODES nodes
+ for(int i=0; i<NUM_NODES; i++) {
+ nodes.add(randomObject());
+ }
+
+ // flush out nodes
+ PGCopyUtil.flushNodes(nodes, out);
+
+ out.close();
+
+ long imported = System.currentTimeMillis();
+
+ log.info("imported {} nodes in {} ms", nodes.size(), imported-start);
+
+ // check if database contains the nodes (based on ID); statement and result sets are closed explicitly
+ try (PreparedStatement stmt = con.getJDBCConnection().prepareStatement("SELECT * FROM nodes WHERE id = ?")) {
+ for(long i=0; i<id; i++) {
+ stmt.setLong(1, i);
+ try (ResultSet dbResult = stmt.executeQuery()) {
+ Assert.assertTrue("node with id " + i + " not found", dbResult.next());
+ }
+ }
+ }
+
+ log.info("checked {} nodes in {} ms", nodes.size(), System.currentTimeMillis()-imported);
+
+ // end the transaction holding the copied rows so that it cannot block dropping the database in teardown
+ con.commit();
+ } finally {
+ con.close();
+ }
+ }
+
+
+
+ /**
+ * Return a new URI resource with a random local name and the next free node ID.
+ * @return
+ */
+ protected static KiWiUriResource randomURI() {
+ KiWiUriResource r = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ r.setId(id++);
+ return r;
+ }
+
+
+ /**
+ * Return a new URI resource for the given URI with the next free node ID.
+ */
+ protected static KiWiUriResource createURI(String uri) {
+ KiWiUriResource r = new KiWiUriResource(uri);
+ r.setId(id++);
+ return r;
+ }
+
+ /**
+ * Return a new random node with the next free node ID. Each call creates a fresh node, chosen uniformly among
+ * URI, blank node, string, integer, double, boolean and date literal.
+ * @return
+ */
+ protected static KiWiNode randomObject() {
+ KiWiNode object;
+ switch(rnd.nextInt(7)) {
+ case 0: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+ case 1: object = new KiWiAnonResource(UUID.randomUUID().toString());
+ break;
+ case 2: object = new KiWiStringLiteral(RandomStringUtils.randomAscii(40));
+ break;
+ case 3: object = new KiWiIntLiteral(rnd.nextLong(), TYPE_INT);
+ break;
+ case 4: object = new KiWiDoubleLiteral(rnd.nextDouble(), TYPE_DBL);
+ break;
+ case 5: object = new KiWiBooleanLiteral(rnd.nextBoolean(), TYPE_BOOL);
+ break;
+ case 6: object = new KiWiDateLiteral(new Date(), TYPE_DATE);
+ break;
+ default: object = new KiWiUriResource("http://localhost/"+ RandomStringUtils.randomAlphanumeric(8));
+ break;
+
+ }
+ object.setId(id++);
+ return object;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-marmotta/blob/6f2ef0d7/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
index 4f1b21c..2faf620 100644
--- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
+++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/model/rdf/KiWiTriple.java
@@ -211,7 +211,7 @@ public class KiWiTriple implements Statement, Serializable {
* @return
*/
public Date getDeletedAt() {
- return new Date(deletedAt.getTime());
+ // deletedAt may be null (presumably for triples that were never deleted): return null in that case instead of
+ // throwing a NullPointerException, otherwise a copy so callers cannot modify the internal Date
+ return deletedAt == null ? null : new Date(deletedAt.getTime());
}
/**
[3/3] git commit: Merge remote-tracking branch 'origin/develop' into
develop
Posted by ss...@apache.org.
Merge remote-tracking branch 'origin/develop' into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/commit/1d9325dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/tree/1d9325dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-marmotta/diff/1d9325dc
Branch: refs/heads/develop
Commit: 1d9325dc987ede775995e25c313af157c45b4752
Parents: 6f2ef0d 531b151
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Fri Nov 15 20:46:38 2013 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Fri Nov 15 20:46:38 2013 +0100
----------------------------------------------------------------------
.../platform/core/webservices/resource/MetaWebService.java | 2 +-
.../platform/core/webservices/resource/ResourceWebService.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------