You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@marmotta.apache.org by ss...@apache.org on 2014/01/23 18:30:02 UTC
[2/4] git commit: added Marmotta Loader implementation as better
replacement for KiWiLoader
Added the Marmotta Loader implementation as a better replacement for KiWiLoader.
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo
Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/ce4431e7
Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/ce4431e7
Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/ce4431e7
Branch: refs/heads/develop
Commit: ce4431e7d225c9edc51fac8ce3455baee8ef6fa2
Parents: d1403db
Author: Sebastian Schaffert <ss...@apache.org>
Authored: Thu Jan 23 18:21:38 2014 +0100
Committer: Sebastian Schaffert <ss...@apache.org>
Committed: Thu Jan 23 18:21:38 2014 +0100
----------------------------------------------------------------------
.../apache/marmotta/kiwi/loader/KiWiLoader.java | 33 +-
.../kiwi/loader/KiWiLoaderConfiguration.java | 22 -
.../kiwi/loader/generic/KiWiBatchHandler.java | 1 -
.../kiwi/loader/generic/KiWiHandler.java | 24 -
.../kiwi/loader/generic/Statistics.java | 223 --------
loader/marmotta-loader-berkeley/pom.xml | 84 +++
.../berkeley/BerkeleyDBLoaderBackend.java | 80 +++
...org.apache.marmotta.loader.api.LoaderBackend | 1 +
.../src/main/resources/logback.xml | 27 +
loader/marmotta-loader-core/pom.xml | 160 ++++++
.../marmotta/loader/api/LoaderBackend.java | 42 ++
.../marmotta/loader/api/LoaderHandler.java | 28 +
.../marmotta/loader/api/LoaderOptions.java | 64 +++
.../marmotta/loader/context/ContextHandler.java | 37 ++
.../marmotta/loader/core/MarmottaLoader.java | 530 +++++++++++++++++++
.../functions/BackendIdentifierFunction.java | 16 +
.../loader/sesame/SesameLoaderHandler.java | 39 ++
.../marmotta/loader/statistics/Statistics.java | 215 ++++++++
.../loader/statistics/StatisticsHandler.java | 69 +++
.../marmotta/loader/util/DirectoryFilter.java | 25 +
.../marmotta/loader/util/UnitFormatter.java | 44 ++
.../loader/wrapper/LoaderHandlerWrapper.java | 123 +++++
.../marmotta/loader/core/test/CLITest.java | 151 ++++++
.../marmotta/loader/core/test/LoadTest.java | 150 ++++++
.../core/test/dummy/DummyLoaderBackend.java | 51 ++
.../core/test/dummy/DummyLoaderHandler.java | 51 ++
...org.apache.marmotta.loader.api.LoaderBackend | 1 +
.../src/test/resources/demo-data.rdf | 78 +++
.../src/test/resources/demo-data.rdf.bz2 | Bin 0 -> 1469 bytes
.../src/test/resources/demo-data.rdf.gz | Bin 0 -> 1314 bytes
loader/marmotta-loader-hbase/pom.xml | 116 ++++
.../src/main/assembly/launcher.xml | 20 +
.../loader/hbase/HBaseLoaderBackend.java | 105 ++++
...org.apache.marmotta.loader.api.LoaderBackend | 1 +
.../src/main/resources/logback.xml | 32 ++
loader/marmotta-loader-kiwi/pom.xml | 140 +++++
.../marmotta/loader/kiwi/KiWiLoaderBackend.java | 89 ++++
.../marmotta/loader/kiwi/KiWiLoaderHandler.java | 190 +++++++
...org.apache.marmotta.loader.api.LoaderBackend | 1 +
loader/marmotta-loader-titan/pom.xml | 79 +++
.../loader/titan/TitanLoaderHandler.java | 111 ++++
.../marmotta/loader/titan/TitanRDFHandler.java | 274 ++++++++++
loader/pom.xml | 85 +++
pom.xml | 1 +
44 files changed, 3313 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
index f317079..9481f88 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoader.java
@@ -17,13 +17,7 @@
*/
package org.apache.marmotta.kiwi.loader;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.*;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.configuration.Configuration;
@@ -40,24 +34,12 @@ import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect;
import org.apache.marmotta.kiwi.sail.KiWiStore;
import org.openrdf.repository.RepositoryException;
import org.openrdf.repository.sail.SailRepository;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-import org.openrdf.rio.UnsupportedRDFormatException;
+import org.openrdf.rio.*;
import org.openrdf.rio.helpers.BasicParserSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.Console;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.regex.Pattern;
@@ -123,9 +105,6 @@ public class KiWiLoader {
protected String context;
protected boolean isVersioningEnabled;
protected boolean isReasoningEnabled;
- protected boolean isStatisticsEnabled;
-
- protected String statisticsGraph;
protected KiWiStore store;
protected SailRepository repository;
@@ -138,7 +117,6 @@ public class KiWiLoader {
isVersioningEnabled = false;
isReasoningEnabled = false;
- isStatisticsEnabled = false;
}
/**
@@ -258,8 +236,6 @@ public class KiWiLoader {
KiWiLoader loader = new KiWiLoader(kiwi, baseUri, context);
loader.setVersioningEnabled(cmd.hasOption("versioning"));
loader.setReasoningEnabled(cmd.hasOption("reasoning"));
- loader.isStatisticsEnabled = cmd.hasOption("statistics");
- loader.statisticsGraph = cmd.getOptionValue("statistics");
loader.initialize();
log.info("Starting import");
@@ -461,8 +437,6 @@ public class KiWiLoader {
if (context != null) {
config.setContext(context);
}
- config.setStatistics(isStatisticsEnabled);
- config.setStatisticsGraph(statisticsGraph);
if(kiwi.getDialect() instanceof PostgreSQLDialect) {
config.setCommitBatchSize(100000);
@@ -579,7 +553,6 @@ public class KiWiLoader {
options.addOption(null, "reasoning", false, "enable reasoning");
options.addOption(null, "versioning", false, "enable versioning");
- options.addOption("S", "statistics", true, "enable statistics collection");
options.addOption(null, "drop-indexes", false, "drop indexes before importing (increases bulk-load performance but requires exclusive access)");
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoaderConfiguration.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoaderConfiguration.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoaderConfiguration.java
index 0cddd94..3c2198c 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoaderConfiguration.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/KiWiLoaderConfiguration.java
@@ -96,26 +96,4 @@ public class KiWiLoaderConfiguration {
}
- /**
- * Statistics collection (using rrd) enabled? Will generate performance graphs at certain time intervals.
- *
- * @return
- */
- public boolean isStatistics() {
- return config.getBoolean(LOADER_STATISTICS_ENABLED, false);
- }
-
- public void setStatistics(boolean v) {
- config.setProperty(LOADER_STATISTICS_ENABLED,v);
- }
-
-
- public String getStatisticsGraph() {
- return config.getString(LOADER_STATISTICS_GRAPH, "kiwiloader.png");
- }
-
- public void setStatisticsGraph(String path) {
- config.setProperty(LOADER_STATISTICS_GRAPH, path);
- }
-
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
index 4cbd29d..23b85f2 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java
@@ -218,7 +218,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler
connection.commit();
}
- printStatistics();
}
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
index b9ee1f1..488c974 100644
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
+++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java
@@ -24,8 +24,6 @@ import java.util.IllformedLocaleException;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
-import static org.apache.marmotta.kiwi.loader.util.UnitFormatter.formatSize;
-
/**
* A fast-lane RDF import handler that allows bulk-importing triples into a KiWi triplestore. It directly accesses
* the database using a KiWiConnection. Note that certain configuration options will make the import "unsafe"
@@ -55,8 +53,6 @@ public class KiWiHandler implements RDFHandler {
// if non-null, all imported statements will have this context (regardless whether they specified a different context)
private KiWiResource overrideContext;
- private Statistics statistics;
-
// only used when statement existance check is enabled
protected DBTripleRegistry registry;
@@ -104,10 +100,6 @@ public class KiWiHandler implements RDFHandler {
}
- if(config.isStatistics()) {
- statistics = new Statistics(this);
- statistics.startSampling();
- }
initialised = true;
}
@@ -125,9 +117,6 @@ public class KiWiHandler implements RDFHandler {
initialised = false;
- if(config.isStatistics() && statistics != null) {
- statistics.stopSampling();
- }
}
/**
@@ -455,23 +444,10 @@ public class KiWiHandler implements RDFHandler {
}
connection.commit();
-
- printStatistics();
}
}
- protected void printStatistics() {
- if(statistics != null) {
- statistics.printStatistics();
- } else {
- log.info("imported {} triples ({}/sec)", triples, formatSize((config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous)) );
- previous = System.currentTimeMillis();
- }
-
-
- }
-
/**
* Handles a comment.
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/Statistics.java
----------------------------------------------------------------------
diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/Statistics.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/Statistics.java
deleted file mode 100644
index 1c2353a..0000000
--- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/Statistics.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.marmotta.kiwi.loader.generic;
-
-import org.apache.marmotta.kiwi.loader.util.UnitFormatter;
-import org.rrd4j.ConsolFun;
-import org.rrd4j.DsType;
-import org.rrd4j.core.*;
-import org.rrd4j.graph.RrdGraph;
-import org.rrd4j.graph.RrdGraphDef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.imageio.ImageIO;
-import java.awt.*;
-import java.awt.image.BufferedImage;
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Collect statistics from a KiWiHandler by sampling at given time intervals and logging to a RRD database.
- *
- * @author Sebastian Schaffert (sschaffert@apache.org)
- */
-public class Statistics {
-
- private static Logger log = LoggerFactory.getLogger(Statistics.class);
-
- private KiWiHandler handler;
-
-
- protected RrdDb statDB;
- protected Sample statSample;
- protected long statLastDump;
-
- protected long SAMPLE_INTERVAL = TimeUnit.SECONDS.toSeconds(5L);
- protected long DIAGRAM_INTERVAL = TimeUnit.MINUTES.toSeconds(5L);
-
- protected ScheduledExecutorService statSampler;
-
- private long start, previous;
-
- public Statistics(KiWiHandler handler) {
- this.handler = handler;
- }
-
-
- public void startSampling() {
- log.info("statistics gathering enabled; starting statistics database");
-
- this.start = System.currentTimeMillis();
- this.previous = System.currentTimeMillis();
-
- File statFile = new File("kiwiloader.rrd");
- if(statFile.exists()) {
- log.info("deleting old statistics database");
- statFile.delete();
- }
-
- RrdDef stCfg = new RrdDef("kiwiloader.rrd");
- stCfg.setStep(SAMPLE_INTERVAL);
- stCfg.addDatasource("triples", DsType.COUNTER, 600, Double.NaN, Double.NaN);
- stCfg.addDatasource("nodes", DsType.COUNTER, 600, Double.NaN, Double.NaN);
- stCfg.addDatasource("nodes-loaded", DsType.COUNTER, 600, Double.NaN, Double.NaN);
- stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 1, 1440); // every five seconds for 2 hours
- stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 12, 1440); // every minute for 1 day
- stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 60, 1440); // every five minutes for five days
-
- try {
- statDB = new RrdDb(stCfg);
- statSample = statDB.createSample();
- statLastDump = System.currentTimeMillis();
-
- // start a sampler thread to run at the SAMPLE_INTERVAL
- statSampler = Executors.newScheduledThreadPool(2);
- statSampler.scheduleAtFixedRate(new StatisticsUpdater(),0, SAMPLE_INTERVAL, TimeUnit.SECONDS);
-
- // create a statistics diagram every 5 minutes
- statSampler.scheduleAtFixedRate(new DiagramUpdater(),DIAGRAM_INTERVAL,DIAGRAM_INTERVAL,TimeUnit.SECONDS);
- } catch (IOException e) {
- log.warn("could not initialize statistics database: {}",e.getMessage());
- }
-
- }
-
- public void stopSampling() {
- DiagramUpdater du = new DiagramUpdater();
- du.run();
-
- if(statDB != null) {
- try {
- statDB.close();
- } catch (IOException e) {
- log.warn("could not close statistics database...");
- }
- }
- if(statSampler != null) {
- statSampler.shutdown();
- }
- }
-
- public void printStatistics() {
- if(statSample != null) {
- try {
- long time = System.currentTimeMillis() / 1000;
-
- FetchRequest minRequest = statDB.createFetchRequest(ConsolFun.AVERAGE, time - 60 , time);
- FetchData minData = minRequest.fetchData();
- double triplesLastMin = minData.getAggregate("triples", ConsolFun.AVERAGE);
-
- FetchRequest hourRequest = statDB.createFetchRequest(ConsolFun.AVERAGE, time - (60 * 60) , time);
- FetchData hourData = hourRequest.fetchData();
- double triplesLastHour = hourData.getAggregate("triples", ConsolFun.AVERAGE);
-
- if(triplesLastMin != Double.NaN) {
- log.info("imported {} triples; statistics: {}/sec, {}/sec (last min), {}/sec (last hour)", UnitFormatter.formatSize(handler.triples), UnitFormatter.formatSize((handler.config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous)), UnitFormatter.formatSize(triplesLastMin), UnitFormatter.formatSize(triplesLastHour));
- } else {
- log.info("imported {} triples ({}/sec, no long-time averages available)", UnitFormatter.formatSize(handler.triples), UnitFormatter.formatSize((handler.config.getCommitBatchSize() * 1000) / (System.currentTimeMillis() - previous)));
- }
- previous = System.currentTimeMillis();
-
- } catch (IOException e) {
- log.warn("error updating statistics: {}", e.getMessage());
- }
- } else {
- }
-
-
- }
-
-
- private class StatisticsUpdater implements Runnable {
- @Override
- public void run() {
-
- try {
- long time = System.currentTimeMillis() / 1000;
-
- synchronized (statSample) {
- statSample.setTime(time);
- statSample.setValues(handler.triples, handler.nodes, handler.nodesLoaded);
- statSample.update();
- }
-
- } catch (Exception e) {
- log.warn("could not update statistics database: {}", e.getMessage());
- }
- }
- }
-
-
- private class DiagramUpdater implements Runnable {
- @Override
- public void run() {
- try {
- File gFile = new File(handler.config.getStatisticsGraph());
-
- if(gFile.exists()) {
- gFile.delete();
- }
-
- // generate PNG diagram
- RrdGraphDef gDef = new RrdGraphDef();
- gDef.setFilename("-");
- gDef.setWidth(800);
- gDef.setHeight(600);
- gDef.setStartTime(start / 1000);
- gDef.setEndTime(System.currentTimeMillis() / 1000);
- gDef.setTitle("KiWiLoader Performance");
- gDef.setVerticalLabel("number/sec");
- gDef.setAntiAliasing(true);
-
-
- gDef.datasource("triples", "kiwiloader.rrd", "triples", ConsolFun.AVERAGE);
- gDef.datasource("nodes", "kiwiloader.rrd", "nodes", ConsolFun.AVERAGE);
- gDef.datasource("nodes-loaded", "kiwiloader.rrd", "nodes-loaded", ConsolFun.AVERAGE);
- gDef.datasource("cache-hits", "kiwiloader.rrd", "cache-hits", ConsolFun.AVERAGE);
- gDef.datasource("cache-misses", "kiwiloader.rrd", "cache-misses", ConsolFun.AVERAGE);
-
- gDef.line("triples", Color.BLUE, "Triples Written", 3F);
- gDef.line("nodes", Color.MAGENTA, "Nodes Written", 3F);
- gDef.line("nodes-loaded", Color.CYAN, "Nodes Loaded", 3F);
- gDef.line("cache-hits", Color.GREEN, "Node Cache Hits");
- gDef.line("cache-misses", Color.ORANGE, "Node Cache Misses");
-
-
- gDef.setImageFormat("png");
- gDef.gprint("triples", ConsolFun.AVERAGE, "average triples/sec: %,.0f\\l");
- gDef.gprint("nodes", ConsolFun.AVERAGE, "average nodes/sec: %,.0f\\l");
-
- RrdGraph graph = new RrdGraph(gDef);
- BufferedImage img = new BufferedImage(900,750, BufferedImage.TYPE_INT_RGB);
- graph.render(img.getGraphics());
- ImageIO.write(img, "png", gFile);
-
- log.info("updated statistics diagram generated in {}", handler.config.getStatisticsGraph());
-
- statLastDump = System.currentTimeMillis();
- } catch (Exception ex) {
- log.warn("error creating statistics diagram: {}", ex.getMessage());
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-berkeley/pom.xml
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-berkeley/pom.xml b/loader/marmotta-loader-berkeley/pom.xml
new file mode 100644
index 0000000..adf4418
--- /dev/null
+++ b/loader/marmotta-loader-berkeley/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.marmotta</groupId>
+ <artifactId>marmotta-parent</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>marmotta-loader-berkeley</artifactId>
+ <name>Loader: BerkeleyDB Backend</name>
+
+ <description>
+ Apache Marmotta bulk loader backend for loading large datasets into a Titan/BerkeleyDB backend.
+ </description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.apache.marmotta.loader.core.MarmottaLoader</mainClass>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+ <addHeader>false</addHeader>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.marmotta</groupId>
+ <artifactId>marmotta-loader-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.marmotta</groupId>
+ <artifactId>marmotta-loader-titan</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-berkeleyje</artifactId>
+ <version>${titan.version}</version>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-berkeley/src/main/java/org/apache/marmotta/loader/berkeley/BerkeleyDBLoaderBackend.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-berkeley/src/main/java/org/apache/marmotta/loader/berkeley/BerkeleyDBLoaderBackend.java b/loader/marmotta-loader-berkeley/src/main/java/org/apache/marmotta/loader/berkeley/BerkeleyDBLoaderBackend.java
new file mode 100644
index 0000000..b6397a5
--- /dev/null
+++ b/loader/marmotta-loader-berkeley/src/main/java/org/apache/marmotta/loader/berkeley/BerkeleyDBLoaderBackend.java
@@ -0,0 +1,80 @@
+package org.apache.marmotta.loader.berkeley;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.marmotta.loader.api.LoaderBackend;
+import org.apache.marmotta.loader.api.LoaderHandler;
+import org.apache.marmotta.loader.titan.TitanLoaderHandler;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Loader backend that creates {@link TitanLoaderHandler}s configured to use Titan's berkeleyje storage backend.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class BerkeleyDBLoaderBackend implements LoaderBackend {
+
+
+ /**
+ * Return a unique identifier for the loader; used for identifying the loader to choose on the command line
+ * in case more than one loader implementation is available.
+ * <p/>
+ * Should match with the regular expression [a-z][a-z0-9]*
+ *
+ * @return the constant identifier "berkeley"
+ */
+ @Override
+ public String getIdentifier() {
+ return "berkeley";
+ }
+
+ /**
+ * Create the LoaderHandler to be used for bulk-loading, optionally using the configuration passed as argument.
+ *
+ * @param configuration loader configuration; backend.berkeley.storage-directory, if present, becomes Titan's storage.directory
+ * @return a newly created TitanLoaderHandler (storage.backend=berkeleyje, storage.buffer-size=100000)
+ */
+ @Override
+ public LoaderHandler createLoader(Configuration configuration) {
+ // build a separate Titan configuration; only the storage directory is taken from the loader configuration
+ Configuration titanCfg = new MapConfiguration(new HashMap<String,Object>());
+ titanCfg.setProperty("storage.backend", "berkeleyje");
+ //titanCfg.setProperty("storage.batch-loading", true);
+ // NOTE(review): batch loading is left disabled above; the reason is not stated in this change - confirm before enabling
+ if(configuration.containsKey("backend.berkeley.storage-directory")) {
+ titanCfg.setProperty("storage.directory", configuration.getString("backend.berkeley.storage-directory"));
+ }
+ // NOTE(review): fixed buffer size; it cannot be overridden through the loader configuration
+ titanCfg.setProperty("storage.buffer-size", 100000);
+
+ return new TitanLoaderHandler(titanCfg);
+ }
+
+ /**
+ * Return any additional options that this backend offers (e.g. for connecting to a database etc).
+ * If there are no additional options, return an empty collection.
+ *
+ * @return a set containing the -S/--storage-directory option, which takes one argument
+ */
+ @Override
+ public Collection<Option> getOptions() {
+ Set<Option> options = new HashSet<>();
+
+ Option directory =
+ OptionBuilder.withArgName("storage-directory")
+ .hasArgs(1)
+ .withDescription("directory where to store the berkeley database")
+ .withLongOpt("storage-directory")
+ .create('S');
+ options.add(directory);
+ // NOTE(review): createLoader reads backend.berkeley.storage-directory; confirm the core maps --storage-directory to that key
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-berkeley/src/main/resources/META-INF/services/org.apache.marmotta.loader.api.LoaderBackend
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-berkeley/src/main/resources/META-INF/services/org.apache.marmotta.loader.api.LoaderBackend b/loader/marmotta-loader-berkeley/src/main/resources/META-INF/services/org.apache.marmotta.loader.api.LoaderBackend
new file mode 100644
index 0000000..c9b7bd6
--- /dev/null
+++ b/loader/marmotta-loader-berkeley/src/main/resources/META-INF/services/org.apache.marmotta.loader.api.LoaderBackend
@@ -0,0 +1 @@
+org.apache.marmotta.loader.berkeley.BerkeleyDBLoaderBackend
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-berkeley/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-berkeley/src/main/resources/logback.xml b/loader/marmotta-loader-berkeley/src/main/resources/logback.xml
new file mode 100644
index 0000000..1bfecff
--- /dev/null
+++ b/loader/marmotta-loader-berkeley/src/main/resources/logback.xml
@@ -0,0 +1,27 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} %highlight(%level) %cyan(%logger{15}) - %m%n</pattern>
+ </encoder>
+ </appender>
+ <root level="${root-level:-INFO}">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/pom.xml
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/pom.xml b/loader/marmotta-loader-core/pom.xml
new file mode 100644
index 0000000..af87efb
--- /dev/null
+++ b/loader/marmotta-loader-core/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.marmotta</groupId>
+ <artifactId>marmotta-parent</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>marmotta-loader-core</artifactId>
+ <name>Loader: Core Library</name>
+
+ <description>
+ Apache Marmotta bulk loader core functionalities, which need to be complemented with backend libraries
+ implementing the concrete loading functionality.
+ </description>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-model</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-repository-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.marmotta</groupId>
+ <artifactId>marmotta-commons</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </dependency>
+
+ <!-- Performance Statistics -->
+ <dependency>
+ <groupId>org.rrd4j</groupId>
+ <artifactId>rrd4j</artifactId>
+ <version>2.2</version>
+ </dependency>
+
+ <!-- command line parsing -->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-ext</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <!-- Sesame Parsers -->
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-rdfxml</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-turtle</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-n3</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-nquads</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-ntriples</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-rdfjson</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-rio-trig</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderBackend.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderBackend.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderBackend.java
new file mode 100644
index 0000000..24eb4bb
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderBackend.java
@@ -0,0 +1,42 @@
+package org.apache.marmotta.loader.api;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.configuration.Configuration;
+
+import java.util.Collection;
+
+/**
+ * Specification for loader backends. Implementations will be injected using the Java ServiceLoader API
+ * and provide singleton factories to create RDFHandlers.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public interface LoaderBackend {
+
+ /**
+ * Return a unique identifier for the loader; used for identifying the loader to choose on the command line
+ * in case more than one loader implementation is available.
+ * <p/>
+ * Should match with the regular expression [a-z][a-z0-9]*
+ *
+ * @return
+ */
+ public String getIdentifier();
+
+ /**
+ * Create the RDFHandler to be used for bulk-loading, optionally using the configuration passed as argument.
+ *
+ * @return a newly created RDFHandler instance
+ */
+ public LoaderHandler createLoader(Configuration configuration);
+
+
+ /**
+ * Return any additional options that this backend offers (e.g. for connecting to a database etc).
+ * If there are no additional options, return an empty collection.
+ *
+ * @return
+ */
+ public Collection<Option> getOptions();
+
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderHandler.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderHandler.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderHandler.java
new file mode 100644
index 0000000..deb1a2f
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderHandler.java
@@ -0,0 +1,28 @@
+package org.apache.marmotta.loader.api;
+
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public interface LoaderHandler extends RDFHandler {
+
+ /**
+ * Initialise the handler, performing any initialisation steps that are necessary before bulk importing can
+ * start (e.g. dropping indexes or establishing a connection).
+ *
+ * @throws RDFHandlerException
+ */
+ public void initialise() throws RDFHandlerException;
+
+ /**
+ * Peform cleanup on shutdown, e.g. re-creating indexes after import completed or freeing resources acquired by
+ * the handler.
+ */
+ public void shutdown() throws RDFHandlerException;
+
+
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderOptions.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderOptions.java
new file mode 100644
index 0000000..b8f33cb
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/api/LoaderOptions.java
@@ -0,0 +1,64 @@
+package org.apache.marmotta.loader.api;
+
/**
 * Configuration keys understood by the loader. Values are looked up in the
 * {@code org.apache.commons.configuration.Configuration} passed to the loader.
 *
 * @author Sebastian Schaffert (sschaffert@apache.org)
 */
public class LoaderOptions {

    // ---------------------------------------------------------------- input selection

    /** Paths of the individual files to import. */
    public static final String FILES = "loader.files";

    /** Paths of directories whose files should be imported. */
    public static final String DIRS = "loader.dirs";

    /**
     * Compression of the input, given as a commons-compress {@code CompressorStreamFactory} name
     * (e.g. {@code gz} or {@code bzip2}). When unset, the compression is auto-detected.
     */
    public static final String COMPRESSION = "loader.compression";

    // ---------------------------------------------------------------- parsing

    /**
     * Format of the input, given as a MIME type (or one of the short names {@code turtle}, {@code n3},
     * {@code rdf}, {@code xml}). When unset, the format is guessed from the file name.
     */
    public static final String FORMAT = "loader.format";

    /** Base URI used to resolve relative URIs; the value must be a proper URI string. */
    public static final String BASE_URI = "loader.base";

    /** Optional URI of the context (named graph) the imported statements are put into. */
    public static final String CONTEXT = "loader.context";

    // ---------------------------------------------------------------- backend

    /** Identifier of the backend to use, for when it cannot be selected automatically. */
    public static final String BACKEND = "loader.backend";

    // ---------------------------------------------------------------- statistics

    /** Switches statistics collection on; the value is a boolean. */
    public static final String STATISTICS_ENABLED = "loader.statistics.enabled";

    /** Path of a writeable file the statistics graph is written to. */
    public static final String STATISTICS_GRAPH = "loader.statistics.graph";

    /** Interval at which statistics information is written out. */
    public static final String STATISTICS_INTERVAL = "loader.statistics.interval";

}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/context/ContextHandler.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/context/ContextHandler.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/context/ContextHandler.java
new file mode 100644
index 0000000..18d5941
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/context/ContextHandler.java
@@ -0,0 +1,37 @@
+package org.apache.marmotta.loader.context;
+
+import org.apache.marmotta.loader.api.LoaderHandler;
+import org.apache.marmotta.loader.wrapper.LoaderHandlerWrapper;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.rio.RDFHandlerException;
+
+/**
+ * A handler adding a pre-defined context to each statement
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class ContextHandler extends LoaderHandlerWrapper {
+
+ private URI context;
+
+ public ContextHandler(LoaderHandler handler, URI context) {
+ super(handler);
+ this.context = context;
+ }
+
+
+ /**
+ * Handles a statement.
+ *
+ * @param st The statement.
+ * @throws org.openrdf.rio.RDFHandlerException If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleStatement(Statement st) throws RDFHandlerException {
+ Statement wrapped = new ContextStatementImpl(st.getSubject(),st.getPredicate(),st.getObject(), context);
+
+ super.handleStatement(wrapped);
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/core/MarmottaLoader.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/core/MarmottaLoader.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/core/MarmottaLoader.java
new file mode 100644
index 0000000..dd9af5a
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/core/MarmottaLoader.java
@@ -0,0 +1,530 @@
+package org.apache.marmotta.loader.core;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.cli.*;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2Utils;
+import org.apache.commons.compress.compressors.gzip.GzipUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.marmotta.loader.api.LoaderBackend;
+import org.apache.marmotta.loader.api.LoaderHandler;
+import org.apache.marmotta.loader.api.LoaderOptions;
+import org.apache.marmotta.loader.context.ContextHandler;
+import org.apache.marmotta.loader.functions.BackendIdentifierFunction;
+import org.apache.marmotta.loader.statistics.StatisticsHandler;
+import org.apache.marmotta.loader.util.DirectoryFilter;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.rio.*;
+import org.openrdf.rio.helpers.BasicParserSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class MarmottaLoader {
+
+ private static ServiceLoader<LoaderBackend> backends = ServiceLoader.load(LoaderBackend.class);
+
+
+ private static Logger log = LoggerFactory.getLogger(MarmottaLoader.class);
+
+
+ private Configuration configuration;
+
+ public MarmottaLoader(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Load data according to the specification given in the configuration from all directories and files specified
+ * in the configuration. Returns the handler used for importing (in case further operations should be performed).
+ */
+ public LoaderHandler load() throws RDFHandlerException {
+ LoaderHandler handler = getLoader();
+
+ if(configuration.containsKey(LoaderOptions.CONTEXT)) {
+ handler = new ContextHandler(handler, new URIImpl(configuration.getString(LoaderOptions.CONTEXT)));
+ }
+
+ if(configuration.containsKey(LoaderOptions.STATISTICS_ENABLED)) {
+ handler = new StatisticsHandler(handler, configuration);
+ }
+
+ handler.initialise();
+
+ if(configuration.containsKey(LoaderOptions.DIRS)) {
+ for(String dirname : configuration.getStringArray(LoaderOptions.DIRS)) {
+ File dir = new File(dirname);
+
+ try {
+ loadDirectory(dir, handler, getRDFFormat(configuration.getString(LoaderOptions.FORMAT)), configuration.getString(LoaderOptions.COMPRESSION));
+ } catch (RDFParseException | IOException e) {
+ log.warn("error importing directory {}: {}", dir, e.getMessage());
+ }
+ }
+ }
+
+ if(configuration.containsKey(LoaderOptions.FILES)) {
+ for(String fname : configuration.getStringArray(LoaderOptions.FILES)) {
+ File f = new File(fname);
+
+ try {
+ loadFile(f, handler, getRDFFormat(configuration.getString(LoaderOptions.FORMAT)), configuration.getString(LoaderOptions.COMPRESSION));
+ } catch (RDFParseException | IOException e) {
+ log.warn("error importing file {}: {}", f, e.getMessage());
+ }
+ }
+ }
+
+ handler.shutdown();
+
+ return handler;
+ }
+
+ /**
+ * Load data from the inputstream given as first argument into the handler given as second argument.
+ *
+ * @param inStream input byte stream to read the data from; must be plain data in the format given as argument
+ * @param handler handler to add the data to
+ * @param format format to use for creating the parser
+ * @throws RDFParseException
+ * @throws IOException
+ */
+ public void load(InputStream inStream, LoaderHandler handler, RDFFormat format) throws RDFParseException, IOException {
+ try {
+
+ RDFParser parser = createParser(format);
+ parser.setRDFHandler(handler);
+ parser.parse(inStream,configuration.getString(LoaderOptions.BASE_URI, "http://localhost/"));
+
+ } catch (RDFHandlerException e) {
+ log.error("error loading stream data in format {}: {}", format, e.getMessage());
+ }
+ }
+
+ /**
+ * Load data from the reader given as first argument into the handler given as second argument.
+ *
+ * @param reader character stream to read the data from; must be plain data in the format given as argument
+ * @param handler handler to add the data to
+ * @param format format to use for creating the parser
+ * @throws RDFParseException
+ * @throws IOException
+ */
+ public void load(Reader reader, LoaderHandler handler, RDFFormat format) throws RDFParseException, IOException {
+ try {
+
+ RDFParser parser = createParser(format);
+ parser.setRDFHandler(handler);
+ parser.parse(reader,configuration.getString(LoaderOptions.BASE_URI, "http://localhost/"));
+
+ } catch (RDFHandlerException e) {
+ log.error("error loading stream data in format {}: {}", format, e.getMessage());
+ }
+ }
+
+
+
+
+ /**
+ * Load data from the reader given as first argument into the handler given as second argument.
+ *
+ * @param file file to read the data from; in case a compression format is not explicitly given, the method will
+ * try to decide from the file name if the file is in a compressed format
+ * @param handler handler to add the data to
+ * @param format format to use for creating the parser or null for auto-detection
+ * @param compression compression format to use, or null for auto-detection (see formats in org.apache.commons.compress.compressors.CompressorStreamFactory)
+ * @throws RDFParseException
+ * @throws IOException
+ */
+ public void loadFile(File file, LoaderHandler handler, RDFFormat format, String compression) throws RDFParseException, IOException {
+ log.info("loading file {} ...", file);
+
+ // detect the file compression
+ String detectedCompression = detectCompression(file);
+ if(compression == null) {
+ if(detectedCompression != null) {
+ log.info("using auto-detected compression ({})", detectedCompression);
+ compression = detectedCompression;
+ }
+ } else {
+ if(detectedCompression != null && !compression.equals(detectedCompression)) {
+ log.warn("user-specified compression ({}) overrides auto-detected compression ({})", compression, detectedCompression);
+ } else {
+ log.info("using user-specified compression ({})", compression);
+ }
+ }
+
+
+ // detect the file format
+ RDFFormat detectedFormat = RDFFormat.forFileName(uncompressedName(file));
+ if(format == null) {
+ if(detectedFormat != null) {
+ log.info("using auto-detected format ({})", detectedFormat.getName());
+ format = detectedFormat;
+ } else {
+ throw new RDFParseException("could not detect input format of file "+ file);
+ }
+ } else {
+ if(detectedFormat != null && !format.equals(detectedFormat)) {
+ log.warn("user-specified format ({}) overrides auto-detected format ({})", format.getName(), detectedFormat.getName());
+ }
+ }
+
+ // create input stream from file and wrap in compressor stream
+ InputStream in;
+ InputStream fin = new BufferedInputStream(new FileInputStream(file));
+ try {
+ if(compression != null) {
+ in = new CompressorStreamFactory().createCompressorInputStream(compression, fin);
+ } else {
+ in = new CompressorStreamFactory().createCompressorInputStream(fin);
+ }
+ } catch (CompressorException ex) {
+ log.info("no compression detected, using plain input stream");
+ in = fin;
+ }
+
+ // load using the input stream
+ load(in, handler, format);
+ }
+
+
+ /**
+ * Load data from the reader given as first argument into the handler given as second argument.
+ *
+ * @param directory directory to read the data from; in case a compression format is not explicitly given, the method will
+ * try to decide from the file name if the file is in a compressed format
+ * @param handler handler to add the data to
+ * @param format format to use for creating the parser or null for auto-detection
+ * @param compression compression format to use, or null for auto-detection (see formats in org.apache.commons.compress.compressors.CompressorStreamFactory)
+ * @throws RDFParseException
+ * @throws IOException
+ */
+ public void loadDirectory(File directory, LoaderHandler handler, RDFFormat format, String compression) throws RDFParseException, IOException {
+ log.info("loading files in directory {} ...", directory);
+ if(directory.exists() && directory.isDirectory()) {
+ for(File f : directory.listFiles(new DirectoryFilter())) {
+ try {
+ loadFile(f, handler,format,compression);
+ } catch (RDFParseException | IOException e) {
+ log.warn("error importing file {}: {}", f, e.getMessage());
+ }
+ }
+ } else {
+ throw new RDFParseException("could not load files from directory "+directory+": it does not exist or is not a directory");
+ }
+ }
+
+ /**
+ * Detect the compression format from the filename, or null in case auto-detection failed.
+ * @param file
+ * @return
+ */
+ private String detectCompression(File file) {
+ if(BZip2Utils.isCompressedFilename(file.getName())) {
+ return CompressorStreamFactory.BZIP2;
+ } else if(GzipUtils.isCompressedFilename(file.getName())) {
+ return CompressorStreamFactory.GZIP;
+ } else {
+ return null;
+ }
+ }
+
+ private String uncompressedName(File file) {
+ if(BZip2Utils.isCompressedFilename(file.getAbsolutePath())) {
+ return BZip2Utils.getUncompressedFilename(file.getName());
+ } else if(GzipUtils.isCompressedFilename(file.getAbsolutePath())) {
+ return GzipUtils.getUncompressedFilename(file.getName());
+ } else {
+ return file.getName();
+ }
+ }
+
+ /**
+ * Create a parser for the given format, turning off some of the stricter configuration settings so we
+ * can handle more messy data without errors.
+ * @param format
+ * @return
+ */
+ private RDFParser createParser(RDFFormat format) {
+ RDFParser parser = Rio.createParser(format);
+ parser.getParserConfig().addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES);
+ parser.getParserConfig().addNonFatalError(BasicParserSettings.FAIL_ON_UNKNOWN_DATATYPES);
+ parser.getParserConfig().addNonFatalError(BasicParserSettings.NORMALIZE_DATATYPE_VALUES);
+
+ return parser;
+ }
+
+
+ /**
+ * Determine loader from configuration options.
+ * @return
+ */
+ private LoaderHandler getLoader() {
+ if(configuration.containsKey(LoaderOptions.BACKEND)) {
+ for(LoaderBackend backend : backends) {
+ if(StringUtils.equalsIgnoreCase(backend.getIdentifier(), configuration.getString(LoaderOptions.BACKEND))) {
+ return backend.createLoader(configuration);
+ }
+ }
+ }
+
+ List<LoaderBackend> backendSet = Lists.newArrayList(backends);
+ if(backendSet.size() == 1) {
+ return backendSet.get(0).createLoader(configuration);
+ }
+
+ throw new IllegalStateException("there are multiple backends available, please select one explicitly using -B");
+
+ }
+
+ /**
+ * Get the RDF format from a user specfication (either mime type or short cut)
+ * @param spec
+ * @return
+ */
+ private static RDFFormat getRDFFormat(String spec) {
+ if(StringUtils.equalsIgnoreCase(spec,"turtle")) {
+ return RDFFormat.TURTLE;
+ } else if(StringUtils.equalsIgnoreCase(spec,"n3")) {
+ return RDFFormat.N3;
+ } else if(StringUtils.equalsIgnoreCase(spec,"rdf")) {
+ return RDFFormat.RDFXML;
+ } else if(StringUtils.equalsIgnoreCase(spec,"xml")) {
+ return RDFFormat.RDFXML;
+ } else if(spec != null) {
+ return RDFFormat.forMIMEType(spec);
+ } else {
+ return null;
+ }
+ }
+
+ private static void printHelp(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(MarmottaLoader.class.getSimpleName(), options, true);
+ }
+
+ /**
+ * Build command line options. Base options are:
+ * <ul>
+ * <li>-h | --help: show help</li>
+ * <li>-B | --backend: backend to use (kiwi, hbase, berkeleydb)</li>
+ * <li>-z | --gzip: input is GZIP encoded</li>
+ * <li>-j | --bzip2: input is BZIP2 encoded</li>
+ * <li>-c | --context: URI of a context to add the statement to</li>
+ * <li>-b | --base: URI to use as base for resolving relative URIs</li>
+ * <li>-f | --file: input file to use for loading</li>
+ * <li>-d | --dir: input directory containing files to use for loading</li>
+ * <li>-t | --type: input format to use for parsing (MIME type)</li>
+ * <li>-s | --statistics: collect statistics and write a graph into the file given</li>
+ * </ul>
+ *
+ * In addition, loader backends can provide their own additional command line options.
+ *
+ * @return
+ */
+ private static Options buildOptions() {
+ final Options options = new Options();
+
+ options.addOption("h", "help", false, "print this help");
+
+ OptionGroup compression = new OptionGroup();
+ compression.addOption(new Option("z", "gzip", false, "input is gzip compressed"));
+ compression.addOption(new Option("j", "bzip2", false, "input is bzip2 compressed"));
+ options.addOptionGroup(compression);
+
+ final Option backend =
+ OptionBuilder.withArgName("backend")
+ .hasArgs(1)
+ .withDescription("backend to use (" + StringUtils.join(Iterators.transform(backends.iterator(), new BackendIdentifierFunction()), ", ") + ")")
+ .withLongOpt("backend")
+ .create('B');
+ options.addOption(backend);
+
+ final Option base =
+ OptionBuilder.withArgName("base")
+ .hasArgs(1)
+ .withDescription("base URI to use for resolving relative URIs")
+ .withLongOpt("base")
+ .create('b');
+ options.addOption(base);
+
+ final Option context =
+ OptionBuilder.withArgName("context")
+ .hasArgs(1)
+ .withDescription("URI of a context to add the statement to")
+ .withLongOpt("context")
+ .create('c');
+ options.addOption(context);
+
+
+ final Option format =
+ OptionBuilder.withArgName("type")
+ .hasArgs(1)
+ .withDescription("input format to use for parsing (MIME type) in case auto-guessing does not work")
+ .withLongOpt("type")
+ .create('t');
+ options.addOption(format);
+
+ // input options
+ OptionGroup input = new OptionGroup();
+ input.setRequired(true);
+ final Option file =
+ OptionBuilder.withArgName("file")
+ .hasArgs(Option.UNLIMITED_VALUES)
+ .withDescription("input file(s) to load")
+ .withLongOpt("file")
+ .create('f');
+ input.addOption(file);
+
+ final Option directories =
+ OptionBuilder.withArgName("dir")
+ .hasArgs(Option.UNLIMITED_VALUES)
+ .withDescription("input directories(s) to load")
+ .withLongOpt("dir")
+ .create('d');
+ input.addOption(directories);
+ options.addOptionGroup(input);
+
+
+ final Option statistics =
+ OptionBuilder.withArgName("statistics")
+ .withDescription("collect statistics and write a graph into the file given")
+ .withLongOpt("statistics")
+ .hasOptionalArg()
+ .create('s');
+ options.addOption(statistics);
+
+ final Option property =
+ OptionBuilder.withArgName("property=value")
+ .hasArgs(2)
+ .withValueSeparator()
+ .withDescription("set configuration property to value")
+ .create("D");
+ options.addOption(property);
+
+ for(LoaderBackend b : backends) {
+ for(Option o : b.getOptions()) {
+ options.addOption(o);
+ }
+ }
+
+ return options;
+ }
+
+
+ public static Configuration parseOptions(String[] args) throws ParseException {
+ Options options = buildOptions();
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ Configuration result = new MapConfiguration(new HashMap<String,Object>());
+
+ if(cmd.hasOption('B')) {
+ // check backends
+ Set<String> existing = Sets.newHashSet(Iterators.transform (backends.iterator(), new BackendIdentifierFunction()));
+ if(!existing.contains(cmd.getOptionValue('B'))) {
+ throw new ParseException("the backend " + cmd.getOptionValue('B') + " does not exist");
+ }
+
+ result.setProperty(LoaderOptions.BACKEND, cmd.getOptionValue('B'));
+ }
+
+ if(cmd.hasOption('b')) {
+ result.setProperty(LoaderOptions.BASE_URI, cmd.getOptionValue('b'));
+ }
+
+ if(cmd.hasOption('z')) {
+ result.setProperty(LoaderOptions.COMPRESSION, CompressorStreamFactory.GZIP);
+ }
+
+ if(cmd.hasOption('j')) {
+ result.setProperty(LoaderOptions.COMPRESSION, CompressorStreamFactory.BZIP2);
+ }
+
+ if(cmd.hasOption('c')) {
+ result.setProperty(LoaderOptions.CONTEXT, cmd.getOptionValue('c'));
+ }
+
+ if(cmd.hasOption('t')) {
+ RDFFormat fmt = getRDFFormat(cmd.getOptionValue('t'));
+ if(fmt == null) {
+ throw new ParseException("unrecognized MIME type: " + cmd.getOptionValue('t'));
+ }
+
+ result.setProperty(LoaderOptions.FORMAT, fmt);
+ }
+
+ if(cmd.hasOption('f')) {
+ result.setProperty(LoaderOptions.FILES, Arrays.asList(cmd.getOptionValues('f')));
+ }
+
+ if(cmd.hasOption('d')) {
+ result.setProperty(LoaderOptions.DIRS, Arrays.asList(cmd.getOptionValues('d')));
+ }
+
+ if(cmd.hasOption('s')) {
+ result.setProperty(LoaderOptions.STATISTICS_ENABLED, true);
+ result.setProperty(LoaderOptions.STATISTICS_GRAPH, cmd.getOptionValue('s'));
+ }
+
+ if(cmd.hasOption('D')) {
+ for(Map.Entry e : cmd.getOptionProperties("D").entrySet()) {
+ result.setProperty(e.getKey().toString(), e.getValue());
+ }
+ }
+
+ for(LoaderBackend b : backends) {
+ for(Option option : b.getOptions()) {
+ if(cmd.hasOption(option.getOpt())) {
+ String key = String.format("backend.%s.%s", b.getIdentifier(), option.getLongOpt() != null ? option.getLongOpt() : option.getOpt());
+ if(option.hasArg()) {
+ if(option.hasArgs()) {
+ result.setProperty(key, Arrays.asList(cmd.getOptionValues(option.getOpt())));
+ } else {
+ result.setProperty(key, cmd.getOptionValue(option.getOpt()));
+ }
+ } else {
+ result.setProperty(key, true);
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Parse command line arguments and start import
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ try {
+ Configuration cfg = parseOptions(args);
+
+ MarmottaLoader loader = new MarmottaLoader(cfg);
+ loader.load();
+
+ } catch (Exception e) {
+ System.err.println("error parsing command line options: "+e.getMessage());
+ log.warn("Exception Details:",e);
+ printHelp(buildOptions());
+ System.exit(1);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/functions/BackendIdentifierFunction.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/functions/BackendIdentifierFunction.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/functions/BackendIdentifierFunction.java
new file mode 100644
index 0000000..c54c831
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/functions/BackendIdentifierFunction.java
@@ -0,0 +1,16 @@
+package org.apache.marmotta.loader.functions;
+
+import com.google.common.base.Function;
+import org.apache.marmotta.loader.api.LoaderBackend;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class BackendIdentifierFunction implements Function<LoaderBackend, String> {
+ @Override
+ public String apply(LoaderBackend input) {
+ return input.getIdentifier();
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/sesame/SesameLoaderHandler.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/sesame/SesameLoaderHandler.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/sesame/SesameLoaderHandler.java
new file mode 100644
index 0000000..5c7d21f
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/sesame/SesameLoaderHandler.java
@@ -0,0 +1,39 @@
+package org.apache.marmotta.loader.sesame;
+
+import org.apache.marmotta.loader.api.LoaderHandler;
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.helpers.RDFHandlerWrapper;
+
+/**
+ * A simple wrapper that allows using standard RDF handlers as MarmottaHandler
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class SesameLoaderHandler extends RDFHandlerWrapper implements LoaderHandler {
+
+
+ public SesameLoaderHandler(RDFHandler... rdfHandlers) {
+ super(rdfHandlers);
+ }
+
+ /**
+ * Initialise the handler, performing any initialisation steps that are necessary before bulk importing can
+ * start (e.g. dropping indexes or establishing a connection).
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ */
+ @Override
+ public void initialise() throws RDFHandlerException {
+
+ }
+
+ /**
+ * Peform cleanup on shutdown, e.g. re-creating indexes after import completed or freeing resources acquired by
+ * the handler.
+ */
+ @Override
+ public void shutdown() throws RDFHandlerException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/Statistics.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/Statistics.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/Statistics.java
new file mode 100644
index 0000000..c44fdf1
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/Statistics.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.loader.statistics;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.marmotta.loader.api.LoaderOptions;
+import org.apache.marmotta.loader.util.UnitFormatter;
+import org.rrd4j.ConsolFun;
+import org.rrd4j.DsType;
+import org.rrd4j.core.*;
+import org.rrd4j.graph.RrdGraph;
+import org.rrd4j.graph.RrdGraphDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.imageio.ImageIO;
+import java.awt.*;
+import java.awt.image.BufferedImage;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Collect statistics from a KiWiHandler by sampling at given time intervals and logging to a RRD database.
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class Statistics {
+
+ private static Logger log = LoggerFactory.getLogger(Statistics.class);
+
+ private StatisticsHandler handler;
+
+
+ protected RrdDb statDB;
+ protected Sample statSample;
+ protected long statLastDump;
+
+ protected long SAMPLE_INTERVAL = TimeUnit.SECONDS.toSeconds(5L);
+ protected long DIAGRAM_INTERVAL = TimeUnit.MINUTES.toSeconds(5L);
+
+ protected ScheduledExecutorService statSampler;
+
+ private long start, previous;
+
+ private Configuration configuration;
+
+ public Statistics(StatisticsHandler handler, Configuration configuration) {
+ this.handler = handler;
+ this.configuration = configuration;
+ }
+
+
+ public void startSampling() {
+ log.info("statistics gathering enabled; starting statistics database");
+
+ this.start = System.currentTimeMillis();
+ this.previous = System.currentTimeMillis();
+
+ File statFile = new File("kiwiloader.rrd");
+ if(statFile.exists()) {
+ log.info("deleting old statistics database");
+ statFile.delete();
+ }
+
+ RrdDef stCfg = new RrdDef("kiwiloader.rrd");
+ stCfg.setStep(SAMPLE_INTERVAL);
+ stCfg.addDatasource("triples", DsType.COUNTER, 600, Double.NaN, Double.NaN);
+ stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 1, 1440); // every five seconds for 2 hours
+ stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 12, 1440); // every minute for 1 day
+ stCfg.addArchive(ConsolFun.AVERAGE, 0.5, 60, 1440); // every five minutes for five days
+
+ try {
+ statDB = new RrdDb(stCfg);
+ statSample = statDB.createSample();
+ statLastDump = System.currentTimeMillis();
+
+ // start a sampler thread to run at the SAMPLE_INTERVAL
+ statSampler = Executors.newScheduledThreadPool(2);
+ statSampler.scheduleAtFixedRate(new StatisticsUpdater(),0, SAMPLE_INTERVAL, TimeUnit.SECONDS);
+
+ // create a statistics diagram every 5 minutes
+ statSampler.scheduleAtFixedRate(new DiagramUpdater(),DIAGRAM_INTERVAL,DIAGRAM_INTERVAL,TimeUnit.SECONDS);
+ } catch (IOException e) {
+ log.warn("could not initialize statistics database: {}",e.getMessage());
+ }
+
+ }
+
+ public void stopSampling() {
+ DiagramUpdater du = new DiagramUpdater();
+ du.run();
+
+ if(statDB != null) {
+ try {
+ statDB.close();
+ } catch (IOException e) {
+ log.warn("could not close statistics database...");
+ }
+ }
+ if(statSampler != null) {
+ statSampler.shutdown();
+ }
+ }
+
+ public void printStatistics() {
+ if(statSample != null) {
+ try {
+ long time = System.currentTimeMillis() / 1000;
+
+ FetchRequest minRequest = statDB.createFetchRequest(ConsolFun.AVERAGE, time - 60 , time);
+ FetchData minData = minRequest.fetchData();
+ double triplesLastMin = minData.getAggregate("triples", ConsolFun.AVERAGE);
+
+ FetchRequest hourRequest = statDB.createFetchRequest(ConsolFun.AVERAGE, time - (60 * 60) , time);
+ FetchData hourData = hourRequest.fetchData();
+ double triplesLastHour = hourData.getAggregate("triples", ConsolFun.AVERAGE);
+
+ if(triplesLastMin != Double.NaN) {
+ log.info("imported {} triples; statistics: {}/sec (last min), {}/sec (last hour)", UnitFormatter.formatSize(handler.triples), UnitFormatter.formatSize(triplesLastMin), UnitFormatter.formatSize(triplesLastHour));
+ }
+ previous = System.currentTimeMillis();
+
+ } catch (IOException e) {
+ log.warn("error updating statistics: {}", e.getMessage());
+ }
+ } else {
+ }
+
+
+ }
+
+
+ private class StatisticsUpdater implements Runnable {
+ @Override
+ public void run() {
+
+ try {
+ long time = System.currentTimeMillis() / 1000;
+
+ synchronized (statSample) {
+ statSample.setTime(time);
+ statSample.setValues(handler.triples);
+ statSample.update();
+ }
+
+ } catch (Exception e) {
+ log.warn("could not update statistics database: {}", e.getMessage());
+ }
+ }
+ }
+
+
+ private class DiagramUpdater implements Runnable {
+ @Override
+ public void run() {
+ try {
+ File gFile = new File(configuration.getString(LoaderOptions.STATISTICS_GRAPH, File.createTempFile("marmotta-loader","png").getAbsolutePath()));
+
+ if(gFile.exists()) {
+ gFile.delete();
+ }
+
+ // generate PNG diagram
+ RrdGraphDef gDef = new RrdGraphDef();
+ gDef.setFilename("-");
+ gDef.setWidth(800);
+ gDef.setHeight(600);
+ gDef.setStartTime(start / 1000);
+ gDef.setEndTime(System.currentTimeMillis() / 1000);
+ gDef.setTitle("KiWiLoader Performance");
+ gDef.setVerticalLabel("number/sec");
+ gDef.setAntiAliasing(true);
+
+
+ gDef.datasource("triples", "kiwiloader.rrd", "triples", ConsolFun.AVERAGE);
+
+ gDef.line("triples", Color.BLUE, "Triples Written", 3F);
+
+
+ gDef.setImageFormat("png");
+ gDef.gprint("triples", ConsolFun.AVERAGE, "average triples/sec: %,.0f\\l");
+
+ RrdGraph graph = new RrdGraph(gDef);
+ BufferedImage img = new BufferedImage(900,750, BufferedImage.TYPE_INT_RGB);
+ graph.render(img.getGraphics());
+ ImageIO.write(img, "png", gFile);
+
+ log.info("updated statistics diagram generated in {}", gFile.getAbsolutePath());
+
+ statLastDump = System.currentTimeMillis();
+ } catch (Exception ex) {
+ log.warn("error creating statistics diagram: {}", ex.getMessage());
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/StatisticsHandler.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/StatisticsHandler.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/StatisticsHandler.java
new file mode 100644
index 0000000..7e4d858
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/statistics/StatisticsHandler.java
@@ -0,0 +1,69 @@
+package org.apache.marmotta.loader.statistics;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.marmotta.loader.api.LoaderHandler;
+import org.apache.marmotta.loader.api.LoaderOptions;
+import org.apache.marmotta.loader.wrapper.LoaderHandlerWrapper;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFHandlerException;
+
+/**
+ * Add file description here!
+ *
+ * @author Sebastian Schaffert (sschaffert@apache.org)
+ */
+public class StatisticsHandler extends LoaderHandlerWrapper implements LoaderHandler {
+
+ protected long triples = 0;
+
+ private Statistics statistics;
+
+ private Configuration configuration;
+
+ public StatisticsHandler(LoaderHandler handler, Configuration configuration) {
+ super(handler);
+ this.configuration = configuration;
+ }
+
+ /**
+ * Initialise the handler, performing any initialisation steps that are necessary before bulk importing can
+ * start (e.g. dropping indexes or establishing a connection).
+ *
+ * @throws org.openrdf.rio.RDFHandlerException
+ */
+ @Override
+ public void initialise() throws RDFHandlerException {
+ statistics = new Statistics(this,configuration);
+ statistics.startSampling();
+
+ super.initialise();
+ }
+
+ /**
+ * Peform cleanup on shutdown, e.g. re-creating indexes after import completed or freeing resources acquired by
+ * the handler.
+ */
+ @Override
+ public void shutdown() throws RDFHandlerException {
+ super.shutdown();
+
+ statistics.stopSampling();
+ }
+
+ /**
+ * Handles a statement.
+ *
+ * @param st The statement.
+ * @throws org.openrdf.rio.RDFHandlerException If the RDF handler has encountered an unrecoverable error.
+ */
+ @Override
+ public void handleStatement(Statement st) throws RDFHandlerException {
+ super.handleStatement(st);
+
+ triples++;
+
+ if(triples % configuration.getLong(LoaderOptions.STATISTICS_INTERVAL, 10000L) == 0) {
+ statistics.printStatistics();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/DirectoryFilter.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/DirectoryFilter.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/DirectoryFilter.java
new file mode 100644
index 0000000..19920e9
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/DirectoryFilter.java
@@ -0,0 +1,25 @@
+package org.apache.marmotta.loader.util;
+
+import java.io.File;
+import java.io.FileFilter;
+
/**
 * {@link FileFilter} that keeps only entries which exist and are not directories. This includes regular files and
 * also other non-directory entries. Note that, despite the class name, directories themselves are rejected.
 *
 * @author Sebastian Schaffert (sschaffert@apache.org)
 */
public class DirectoryFilter implements FileFilter {

    /**
     * Decides whether a pathname is kept.
     *
     * @param pathname the candidate entry
     * @return {@code true} when {@code pathname} exists and is not a directory, {@code false} otherwise
     */
    @Override
    public boolean accept(File pathname) {
        if (pathname.isDirectory()) {
            return false;
        }
        return pathname.exists();
    }
}
http://git-wip-us.apache.org/repos/asf/marmotta/blob/ce4431e7/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/UnitFormatter.java
----------------------------------------------------------------------
diff --git a/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/UnitFormatter.java b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/UnitFormatter.java
new file mode 100644
index 0000000..033842f
--- /dev/null
+++ b/loader/marmotta-loader-core/src/main/java/org/apache/marmotta/loader/util/UnitFormatter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.marmotta.loader.util;
+
/**
 * Provides helper methods for formatting values in different units (currently only allows to format sizes by turning
 * them into Giga, Mega, Kilo...)
 *
 * @author Sebastian Schaffert (sschaffert@apache.org)
 */
public class UnitFormatter {

    /** Decimal prefixes for 10^3, 10^6, ..., 10^18 (index = exponent of 1000 minus one). */
    private static final String PREFIXES = "KMGTPE";

    /**
     * Formats a size for human consumption.
     * <p>
     * Values below 10,000 are printed as a grouped integer (fraction truncated). Larger values are scaled by the
     * largest power of 1000 not exceeding them (capped at 10^18) and printed with one decimal and a
     * K/M/G/T/P/E prefix.
     *
     * @param value the value to format
     * @return the formatted value, or {@code "unknown"} if {@code value} is NaN or infinite
     */
    public static String formatSize(double value) {
        // NaN never compares equal to anything (including NaN), so it must be detected with isNaN
        if (Double.isNaN(value) || Double.isInfinite(value)) {
            return "unknown";
        }
        if (value < 1000 * 10) {
            return String.format("%,d", (int) value);
        }
        // log10 is exact for powers of ten, whereas dividing natural logarithms may land just below the integer
        // for exact powers of 1000; cap the exponent so charAt stays within the prefix table for huge values
        int exp = Math.min((int) (Math.log10(value) / 3), PREFIXES.length());
        char pre = PREFIXES.charAt(exp - 1);
        return String.format("%.1f %s", value / Math.pow(1000, exp), pre);
    }
}