You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/23 20:15:35 UTC
svn commit: r1174932 - in /incubator/jena/Scratch/PC/tdbloader2/trunk/src:
main/java/cmd/ main/java/org/apache/jena/tdbloader2/ test/java/cmd/
Author: castagna
Date: Fri Sep 23 18:15:35 2011
New Revision: 1174932
URL: http://svn.apache.org/viewvc?rev=1174932&view=rev
Log:
JENA-117
Added:
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java (with props)
Modified:
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/CustomLabelToNode.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairInputStream.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairOutputStream.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/ProgressLogger.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleInputStream.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleOutputStream.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TestTDBLoader2.java
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Fri Sep 23 18:15:35 2011
@@ -18,24 +18,16 @@
package cmd;
-import static com.hp.hpl.jena.sparql.util.Utils.nowAsString;
import static com.hp.hpl.jena.tdb.sys.SystemTDB.SizeOfLong;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
import org.apache.jena.tdbloader2.CustomLabelToNode;
+import org.apache.jena.tdbloader2.DataStreamFactory;
import org.apache.jena.tdbloader2.MultiThreadedSortedDataBag;
import org.apache.jena.tdbloader2.NodeTableBuilder2;
import org.apache.jena.tdbloader2.ProgressLogger;
@@ -71,6 +63,7 @@ import org.openjena.riot.tokens.Token;
import org.openjena.riot.tokens.Tokenizer;
import org.openjena.riot.tokens.TokenizerFactory;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import tdb.cmdline.CmdTDB;
import arq.cmd.CmdException;
@@ -81,7 +74,6 @@ import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.sparql.core.Quad;
import com.hp.hpl.jena.sparql.util.Utils;
-import com.hp.hpl.jena.tdb.TDB;
import com.hp.hpl.jena.tdb.base.block.BlockMgr;
import com.hp.hpl.jena.tdb.base.block.BlockMgrFactory;
import com.hp.hpl.jena.tdb.base.file.FileSet;
@@ -101,30 +93,27 @@ import com.hp.hpl.jena.tdb.sys.SystemTDB
public class tdbloader2 extends CmdGeneral
{
static { Log.setLog4j() ; }
- private static Logger cmdLog =TDB.logLoader ;
+ private static Logger cmdLog = LoggerFactory.getLogger(tdbloader2.class) ;
- private static ArgDecl argLocation = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
- private static ArgDecl argCompression = new ArgDecl(ArgDecl.NoValue, "comp", "compression") ;
- private static ArgDecl argBufferSize = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
- private static ArgDecl argGzipOutside = new ArgDecl(ArgDecl.NoValue, "gzip-outside") ;
- private static ArgDecl argSpillSize = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
- private static ArgDecl argNoStats = new ArgDecl(ArgDecl.NoValue, "no-stats") ;
- private static ArgDecl argNoBuffer = new ArgDecl(ArgDecl.NoValue, "no-buffer") ;
+ private static String runId = String.valueOf(System.currentTimeMillis()) ; // a unique identifier for this run, it's used for blank node labels
+ private static ArgDecl argLocation = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
+ private static ArgDecl argCompression = new ArgDecl(ArgDecl.NoValue, "comp", "compression") ;
+ private static ArgDecl argBufferSize = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
+ private static ArgDecl argGzipOutside = new ArgDecl(ArgDecl.NoValue, "gzip-outside") ;
+ private static ArgDecl argSpillSize = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
+ private static ArgDecl argNoStats = new ArgDecl(ArgDecl.NoValue, "no-stats") ;
+ private static ArgDecl argNoBuffer = new ArgDecl(ArgDecl.NoValue, "no-buffer") ;
+
+ private Location location ;
private String locationString ;
private List<String> datafiles ;
- private Location location ;
- private static boolean compression ;
- private static boolean gzip_outside = false ;
- private static int buffer_size = 8192 ;
public static int spill_size = 1000000 ;
public static boolean no_stats = false ;
- private static boolean no_buffer = false ;
private ThresholdPolicyCount<Tuple<Long>> policy ;
private Comparator<Tuple<Long>> comparator = new TupleComparator();
- private static String runId = String.valueOf(System.currentTimeMillis());
public static void main(String...argv)
{
@@ -136,13 +125,13 @@ public class tdbloader2 extends CmdGener
public tdbloader2(String...argv)
{
super(argv) ;
- super.add(argLocation, "--loc", "Location") ;
- super.add(argCompression, "--compression", "Use compression for intermediate files") ;
- super.add(argBufferSize, "--buffer-size", "The size of buffers for IO in bytes") ;
+ super.add(argLocation, "--loc", "Location") ;
+ super.add(argCompression, "--compression", "Use compression for intermediate files") ;
+ super.add(argBufferSize, "--buffer-size", "The size of buffers for IO in bytes") ;
super.add(argGzipOutside, "--gzip-outside", "GZIP...(Buffered...())") ;
- super.add(argSpillSize, "--spill-size", "The size of spillable segments in tuples|records") ;
- super.add(argNoStats, "--no-stats", "Do not generate the stats file") ;
- super.add(argNoBuffer, "--no-buffer", "Do not use Buffered{Input|Output}Stream") ;
+ super.add(argSpillSize, "--spill-size", "The size of spillable segments in tuples|records") ;
+ super.add(argNoStats, "--no-stats", "Do not generate the stats file") ;
+ super.add(argNoBuffer, "--no-buffer", "Do not use Buffered{Input|Output}Stream") ;
}
@Override
@@ -152,15 +141,18 @@ public class tdbloader2 extends CmdGener
locationString = super.getValue(argLocation) ;
location = new Location(locationString) ;
- compression = super.hasArg(argCompression) ;
- if ( super.hasArg(argBufferSize) )
- buffer_size = Integer.valueOf(super.getValue(argBufferSize)) ;
- gzip_outside = super.hasArg(argGzipOutside) ;
+
if ( super.hasArg(argSpillSize) )
spill_size = Integer.valueOf(super.getValue(argSpillSize)) ;
no_stats = super.hasArg(argNoStats) ;
- no_buffer = super.hasArg(argNoBuffer) ;
+ // this is to try different ways to create Input/Output streams
+ DataStreamFactory.setUseCompression( super.hasArg(argCompression) ) ;
+ DataStreamFactory.setGZIPOutside( super.hasArg(argGzipOutside) ) ;
+ if ( super.hasArg(argBufferSize) )
+ DataStreamFactory.setBufferSize( Integer.valueOf(super.getValue(argBufferSize)) ) ;
+ DataStreamFactory.setBuffered( ! super.hasArg(argNoBuffer) ) ;
+
datafiles = super.getPositional() ;
for( String filename : datafiles)
@@ -172,7 +164,7 @@ public class tdbloader2 extends CmdGener
if ( ! FileOps.exists(filename) )
cmdError("File does not exist: "+filename) ;
}
-
+
policy = new ThresholdPolicyCount<Tuple<Long>>(spill_size);
}
@@ -181,48 +173,56 @@ public class tdbloader2 extends CmdGener
{
// This formats the location correctly.
DatasetGraphTDB dsg = SetupTDB.buildDataset(location) ;
-
+
// so close indexes and the prefix table.
dsg.getTripleTable().getNodeTupleTable().getTupleTable().close();
dsg.getQuadTable().getNodeTupleTable().getTupleTable().close();
// Later - attach prefix table to parser.
dsg.getPrefixes().close() ;
-
+
ProgressLogger monitorTotal = new ProgressLogger(cmdLog, "tuples", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
+ monitorTotal.start() ;
DataBag<Tuple<Long>> outputTriples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new TripleSerializationFactory(), comparator);
DataBag<Tuple<Long>> outputQuads = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new QuadSerializationFactory(), comparator);
-
+
// Node table and input data using node ids (rather than RDF node values)
Sink<Quad> sink = new NodeTableBuilder2(dsg, monitorTotal, outputTriples, outputQuads) ;
Sink<Triple> sink2 = new SinkExtendTriplesToQuads(sink) ;
-
- monitorTotal.start() ;
- for( String filename : datafiles)
- {
- if ( datafiles.size() > 0 )
- cmdLog.info("Load: "+filename+" -- "+Utils.nowAsString()) ;
-
- InputStream in = IO.openFile(filename) ;
- Tokenizer tokenizer = TokenizerFactory.makeTokenizerUTF8(in) ;
- ParserProfile profile = createParserProfile(runId, filename);
- Lang lang = Lang.guess(filename, Lang.NQUADS) ;
- if ( lang.isTriples() ) {
- LangNTriples parser = new LangNTriples(tokenizer, profile, sink2) ;
- parser.parse() ;
- } else {
- LangNQuads parser = new LangNQuads(tokenizer, profile, sink) ;
- parser.parse() ;
+
+ // Build primary indexes: SPO and GSPO
+ BPlusTree bptSPO = null ;
+ BPlusTree bptGSPO = null ;
+ try {
+ for( String filename : datafiles)
+ {
+ if ( datafiles.size() > 0 )
+ cmdLog.info("Load: "+filename+" -- "+Utils.nowAsString()) ;
+
+ InputStream in = IO.openFile(filename) ;
+ Tokenizer tokenizer = TokenizerFactory.makeTokenizerUTF8(in) ;
+ ParserProfile profile = createParserProfile(runId, filename);
+ Lang lang = Lang.guess(filename, Lang.NQUADS) ;
+ if ( lang.isTriples() ) {
+ LangNTriples parser = new LangNTriples(tokenizer, profile, sink2) ;
+ parser.parse() ;
+ } else {
+ LangNQuads parser = new LangNQuads(tokenizer, profile, sink) ;
+ parser.parse() ;
+ }
+ IO.close(in) ; // TODO: final {}
}
- IO.close(in) ; // TODO: final {}
- }
- sink.close() ;
+ sink.close() ;
-// dsg.sync() ;
-// dsg.close() ;
-
- BPlusTree bptGSPO = createBPlusTreeIndex(Names.primaryIndexQuads, outputQuads) ;
- BPlusTree bptSPO = createBPlusTreeIndex(Names.primaryIndexTriples, outputTriples) ;
+ spill(outputTriples) ;
+ bptSPO = createBPlusTreeIndex(Names.primaryIndexTriples, outputTriples) ;
+
+ spill(outputQuads) ;
+ bptGSPO = createBPlusTreeIndex(Names.primaryIndexQuads, outputQuads) ;
+ } finally {
+ outputTriples.close() ;
+ outputQuads.close() ;
+ }
// Secondary POS and OSP indexes
for ( String indexName : Names.tripleIndexes ) {
@@ -237,31 +237,33 @@ public class tdbloader2 extends CmdGener
createBPlusTreeIndex(indexName, new ColumnMap(Names.primaryIndexQuads, indexName), bptGSPO) ;
}
}
-
- outputTriples.close() ;
- outputQuads.close() ;
-
- if ( !no_stats ) {
+ if ( ! no_stats ) {
if ( ! location.isMem() ) {
dsg = SetupTDB.buildDataset(location) ;
Stats.write(dsg, ((NodeTableBuilder2)sink).getCollector()) ;
}
}
-
- // ---- Monitor
- print ( monitorTotal ) ;
+ ProgressLogger.print ( cmdLog, monitorTotal ) ;
}
- public static void print ( ProgressLogger monitor ) {
- long time = monitor.finish() ;
+ @Override
+ protected String getSummary()
+ {
+ return getCommandName()+" --loc=DIR FILE ..." ;
+ }
- long total = monitor.getTicks() ;
- float elapsedSecs = time/1000F ;
- float rate = (elapsedSecs!=0) ? total/elapsedSecs : 0 ;
- String str = String.format("Total: %,d tuples : %,.2f seconds : %,.2f tuples/sec [%s]", total, elapsedSecs, rate, nowAsString()) ;
- cmdLog.info(str) ;
+ @Override
+ protected String getCommandName()
+ {
+ return this.getClass().getName() ;
+ }
+
+ private void spill ( DataBag<?> bag ) {
+ if ( bag instanceof MultiThreadedSortedDataBag<?> ) {
+ ((MultiThreadedSortedDataBag<?>)bag).spill() ;
+ }
}
private BPlusTree createBPlusTreeIndex(String indexName, DataBag<Tuple<Long>> tuples) {
@@ -302,15 +304,24 @@ public class tdbloader2 extends CmdGener
return record ;
}
};
- Iterator<Record> iter = Iter.iter(tuples.iterator()).map(transformTuple2Record) ;
- BPlusTree bpt2 = BPlusTreeRewriter.packIntoBPlusTree(iter, bptParams, recordFactory, blkMgrNodes, blkMgrRecords) ;
- bpt2.sync() ;
-
- print ( monitor ) ;
+ BPlusTree bpt2 ;
+ Iterator<Tuple<Long>> it = tuples.iterator() ;
+ Iterator<Record> iter = null ;
+ try {
+ iter = Iter.iter(it).map(transformTuple2Record) ;
+ bpt2 = BPlusTreeRewriter.packIntoBPlusTree(iter, bptParams, recordFactory, blkMgrNodes, blkMgrRecords) ;
+ bpt2.sync() ;
+ } finally {
+ Iter.close(it) ;
+ Iter.close(iter) ;
+ }
+
+ ProgressLogger.print ( cmdLog, monitor ) ;
+
return bpt2 ;
}
-
+
private void createBPlusTreeIndex(String indexName, final ColumnMap colMap, BPlusTree bpt) {
final int size = indexName.length() ;
@@ -326,25 +337,34 @@ public class tdbloader2 extends CmdGener
cmdLog.info("Index: sorting data for " + indexName + " index...") ;
final ProgressLogger monitor = new ProgressLogger(cmdLog, "records to " + indexName, BulkLoader.DataTickPoint,BulkLoader.superTick) ;
monitor.start() ;
-
+
Transform<Record, Tuple<Long>> transformTuple2Tuple = new Transform<Record, Tuple<Long>>() {
- @Override public Tuple<Long> convert(Record record) {
- Long[] ids = new Long[size] ;
- for ( int i = 0 ; i < size ; i++ ) {
- ids[colMap.fetchSlotIdx(i)] = Bytes.getLong(record.getKey(), i*SizeOfLong) ;
- }
- monitor.tick() ;
- return Tuple.create(ids) ;
- }
+ @Override public Tuple<Long> convert(Record record) {
+ Long[] ids = new Long[size] ;
+ for ( int i = 0 ; i < size ; i++ ) {
+ ids[colMap.fetchSlotIdx(i)] = Bytes.getLong(record.getKey(), i*SizeOfLong) ;
+ }
+ monitor.tick() ;
+ return Tuple.create(ids) ;
+ }
};
- outTuples.addAll(Iter.iter(bpt.iterator()).map(transformTuple2Tuple).iterator()) ;
-
- print ( monitor ) ;
-
- createBPlusTreeIndex(indexName, outTuples) ;
-
- outTuples.close() ;
- outTuples = null ;
+
+ // Reads BPlusTree index and sort it for a different index according to ColumnMap
+ try {
+ Iterator<Record> bptIter = bpt.iterator() ;
+ try {
+ outTuples.addAll(Iter.iter(bptIter).map(transformTuple2Tuple).iterator()) ;
+ } finally {
+ Iter.close(bptIter) ;
+ }
+
+ ProgressLogger.print ( cmdLog, monitor ) ;
+
+ createBPlusTreeIndex(indexName, outTuples) ;
+ } finally {
+ outTuples.close() ;
+ outTuples = null ;
+ }
}
private void deleteExistingBPlusTreeIndex(String indexName) {
@@ -352,11 +372,15 @@ public class tdbloader2 extends CmdGener
FileOps.delete(location.absolute(indexName, Names.bptExt2)) ;
}
+
+ // Utility methods for RDF parsing...
public static final NodeToLabel nodeToLabel = NodeToLabel.createBNodeByLabelAsGiven();
+
public static String serialize(Node node) {
+ Prologue prologue = new Prologue(null, IRIResolver.createNoResolve());
StringWriter out = new StringWriter();
- OutputLangUtils.output(out, node, null, nodeToLabel);
+ OutputLangUtils.output(out, node, prologue, nodeToLabel);
return out.toString();
}
@@ -373,6 +397,7 @@ public class tdbloader2 extends CmdGener
}
private static ParserProfile profile = createParserProfile();
+
public static Node parse(String string) {
Tokenizer tokenizer = TokenizerFactory.makeTokenizerString(string) ;
if ( ! tokenizer.hasNext() )
@@ -383,51 +408,7 @@ public class tdbloader2 extends CmdGener
Log.warn(RiotLib.class, "String has more than one token in it: "+string) ;
return n ;
}
-
- @Override
- protected String getSummary()
- {
- return getCommandName()+" --loc=DIR FILE ..." ;
- }
-
- @Override
- protected String getCommandName()
- {
- return this.getClass().getName() ;
- }
- public static DataOutputStream createDataOutputStream(OutputStream out) {
- try {
- if ( no_buffer ) {
- return new DataOutputStream( compression ? new GZIPOutputStream(out) : out ) ;
- } else {
- if ( gzip_outside ) {
- return new DataOutputStream( compression ? new GZIPOutputStream(new BufferedOutputStream(out, buffer_size)) : new BufferedOutputStream(out, buffer_size) ) ;
- } else {
- return new DataOutputStream( compression ? new BufferedOutputStream(new GZIPOutputStream(out, buffer_size)) : new BufferedOutputStream(out, buffer_size) ) ;
- }
- }
-
- } catch (IOException e) {
- throw new AtlasException(e) ;
- }
- }
-
- public static DataInputStream createDataInputStream(InputStream in) {
- try {
- if ( no_buffer ) {
- return new DataInputStream( compression ? new GZIPInputStream(in) : in ) ;
- } else {
- if ( gzip_outside ) {
- return new DataInputStream( compression ? new GZIPInputStream(new BufferedInputStream(in, buffer_size)) : new BufferedInputStream(in, buffer_size) ) ;
- } else {
- return new DataInputStream( compression ? new BufferedInputStream(new GZIPInputStream(in, buffer_size)) : new BufferedInputStream(in, buffer_size) ) ;
- }
- }
- } catch (IOException e) {
- throw new AtlasException(e) ;
- }
- }
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/CustomLabelToNode.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/CustomLabelToNode.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/CustomLabelToNode.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/CustomLabelToNode.java Fri Sep 23 18:15:35 2011
@@ -56,7 +56,7 @@ public class CustomLabelToNode extends L
@Override
public Node create(String label) {
- String strLabel = "mrbnode_" + runId.hashCode() + "_" + filename.hashCode() + "_" + label;
+ String strLabel = "tdbloader2_" + runId.hashCode() + "_" + filename.hashCode() + "_" + label;
log.debug ("create({}) = {}", label, strLabel);
return Node.createAnon(new AnonId(strLabel)) ;
}
Added: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java?rev=1174932&view=auto
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java (added)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java Fri Sep 23 18:15:35 2011
@@ -0,0 +1,73 @@
+package org.apache.jena.tdbloader2;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.openjena.atlas.AtlasException;
+
+/**
+ * Factory for the DataInputStream/DataOutputStream pairs that tdbloader2 uses
+ * for its intermediate (spill) files. It centralises the buffering and GZIP
+ * compression choices that were previously hard-coded static flags in
+ * cmd.tdbloader2 (see the removed createDataOutputStream/createDataInputStream
+ * there in this same commit).
+ *
+ * Configuration is held in static fields set once during command-line parsing
+ * (setBuffered/setUseCompression/setGZIPOutside/setBufferSize). There is no
+ * synchronisation: configure before any streams are created.
+ * NOTE(review): assumes single-threaded configuration — confirm no stream is
+ * created from the loader's worker threads before arg parsing completes.
+ */
+public class DataStreamFactory {
+
+ // Defaults: buffered, compressed, with GZIP wrapped outside the buffer.
+ private static boolean use_buffered_streams = true ;
+ private static boolean use_compression = true ;
+ private static boolean gzip_outside = true ;
+ private static int buffer_size = 8192 ; // bytes
+
+ /** Wraps {@code out} according to the current static configuration. */
+ public static DataOutputStream createDataOutputStream(OutputStream out) {
+ return createDataOutputStream(out, getBuffered(), getGZIPOutside(), getUseCompression(), getBufferSize());
+ }
+
+ /** Wraps {@code in} according to the current static configuration. */
+ public static DataInputStream createDataInputStream(InputStream in) {
+ return createDataInputStream(in, getBuffered(), getGZIPOutside(), getUseCompression(), getBufferSize()) ;
+ }
+
+ // Getters are private (only used by the two wrappers above); setters are the
+ // public configuration surface, called from tdbloader2.processModulesAndArgs().
+ private static boolean getBuffered() { return use_buffered_streams ; }
+ public static void setBuffered(boolean buffered) { use_buffered_streams = buffered ; }
+ private static boolean getUseCompression() { return use_compression ; }
+ public static void setUseCompression(boolean compression) { use_compression = compression ; }
+ private static boolean getGZIPOutside() { return gzip_outside; }
+ public static void setGZIPOutside(boolean outside) { gzip_outside = outside ; }
+ private static int getBufferSize() { return buffer_size ; }
+ public static void setBufferSize(int size) { buffer_size = size ; }
+
+
+ /**
+ * Builds the output stream explicitly from the four knobs (used directly by
+ * tests; the no-arg overload supplies the static configuration).
+ * When {@code buffered} is false, {@code gzip_outside} and {@code buffer_size}
+ * are ignored. When buffered and gzip_outside, the chain is
+ * GZIP(Buffered(out)); otherwise Buffered(GZIP(out)) — here {@code buffer_size}
+ * is also passed to GZIPOutputStream as its internal deflate buffer size.
+ * NOTE(review): parameter names deliberately shadow the static fields above.
+ * IOException from the GZIP constructor is rethrown as AtlasException.
+ */
+ public static DataOutputStream createDataOutputStream(OutputStream out, boolean buffered, boolean gzip_outside, boolean compression, int buffer_size) {
+ try {
+ if ( ! buffered ) {
+ return new DataOutputStream( compression ? new GZIPOutputStream(out) : out ) ;
+ } else {
+ if ( gzip_outside ) {
+ return new DataOutputStream( compression ? new GZIPOutputStream(new BufferedOutputStream(out, buffer_size)) : new BufferedOutputStream(out, buffer_size) ) ;
+ } else {
+ return new DataOutputStream( compression ? new BufferedOutputStream(new GZIPOutputStream(out, buffer_size)) : new BufferedOutputStream(out, buffer_size) ) ;
+ }
+ }
+
+ } catch (IOException e) {
+ throw new AtlasException(e) ;
+ }
+ }
+
+ /**
+ * Input-side mirror of {@link #createDataOutputStream(OutputStream, boolean,
+ * boolean, boolean, int)}: the wrapping order must match how the data was
+ * written or GZIP framing will not line up.
+ * {@code buffer_size} doubles as the GZIPInputStream internal buffer size in
+ * the gzip-inside branch.
+ */
+ public static DataInputStream createDataInputStream(InputStream in, boolean buffered, boolean gzip_outside, boolean compression, int buffer_size) {
+ try {
+ if ( ! buffered ) {
+ return new DataInputStream( compression ? new GZIPInputStream(in) : in ) ;
+ } else {
+ if ( gzip_outside ) {
+ return new DataInputStream( compression ? new GZIPInputStream(new BufferedInputStream(in, buffer_size)) : new BufferedInputStream(in, buffer_size) ) ;
+ } else {
+ return new DataInputStream( compression ? new BufferedInputStream(new GZIPInputStream(in, buffer_size)) : new BufferedInputStream(in, buffer_size) ) ;
+ }
+ }
+ } catch (IOException e) {
+ throw new AtlasException(e) ;
+ }
+ }
+
+}
Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/DataStreamFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java Fri Sep 23 18:15:35 2011
@@ -92,7 +92,7 @@ public class MultiThreadedSortedDataBag<
}
}
- private void spill()
+ public void spill()
{
// Make sure we have something to spill.
if (memory.size() > 0)
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java Fri Sep 23 18:15:35 2011
@@ -24,6 +24,7 @@ import static com.hp.hpl.jena.tdb.sys.Sy
import java.io.File;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
@@ -37,6 +38,7 @@ import org.openjena.atlas.iterator.Trans
import org.openjena.atlas.lib.Bytes;
import org.openjena.atlas.lib.Pair;
import org.openjena.atlas.lib.Sink;
+import org.openjena.atlas.lib.StrUtils;
import org.openjena.atlas.lib.Tuple;
import org.slf4j.Logger;
@@ -54,7 +56,8 @@ import com.hp.hpl.jena.tdb.base.record.R
import com.hp.hpl.jena.tdb.index.bplustree.BPlusTree;
import com.hp.hpl.jena.tdb.index.bplustree.BPlusTreeParams;
import com.hp.hpl.jena.tdb.index.bplustree.BPlusTreeRewriter;
-import com.hp.hpl.jena.tdb.lib.NodeLib;
+import com.hp.hpl.jena.tdb.nodetable.Nodec;
+import com.hp.hpl.jena.tdb.nodetable.NodecSSE;
import com.hp.hpl.jena.tdb.solver.stats.StatsCollectorNodeId;
import com.hp.hpl.jena.tdb.store.DatasetGraphTDB;
import com.hp.hpl.jena.tdb.store.Hash;
@@ -104,36 +107,37 @@ public class NodeTableBuilder2 implement
} catch (NoSuchAlgorithmException e) {
throw new AtlasException(e) ;
}
-
- this.log = monitor.getLogger() ;
+ this.log = monitor.getLogger() ;
}
+ public StatsCollectorNodeId getCollector() { return stats ; }
+
@Override
public void send(Quad quad)
{
- Node s = quad.getSubject() ;
- Node p = quad.getPredicate() ;
- Node o = quad.getObject() ;
- Node g = null ;
- // Union graph?!
- if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
- g = quad.getGraph() ;
-
try {
+ byte[] s = tdbloader2.serialize(quad.getSubject()).getBytes("UTF-8") ;
+ byte[] p = tdbloader2.serialize(quad.getPredicate()).getBytes("UTF-8") ;
+ byte[] o = tdbloader2.serialize(quad.getObject()).getBytes("UTF-8") ;
+ byte[] g = null ;
+ // Union graph?!
+ if ( ! quad.isTriple() && ! quad.isDefaultGraph() )
+ g = tdbloader2.serialize(quad.getGraph()).getBytes("UTF-8") ;
+
digest.reset() ;
- digest.update(s.toString().getBytes("UTF-8")) ; // TODO: should we do something better here?
- digest.update(p.toString().getBytes("UTF-8")) ;
- digest.update(o.toString().getBytes("UTF-8")) ;
+ digest.update(s) ; // TODO: should we do something better here?
+ digest.update(p) ;
+ digest.update(o) ;
if ( g != null )
digest.update(g.toString().getBytes("UTF-8")) ;
String md5 = new String(Hex.encodeHex(digest.digest())) ;
- sdb01.add(new Pair<byte[], byte[]>(tdbloader2.serialize(s).getBytes("UTF-8"), (md5 + "|s").getBytes("UTF-8"))) ;
- sdb01.add(new Pair<byte[], byte[]>(tdbloader2.serialize(p).getBytes("UTF-8"), (md5 + "|p").getBytes("UTF-8"))) ;
- sdb01.add(new Pair<byte[], byte[]>(tdbloader2.serialize(o).getBytes("UTF-8"), (md5 + "|o").getBytes("UTF-8"))) ;
+ sdb01.add(new Pair<byte[], byte[]>(s, (md5 + "|s").getBytes("UTF-8"))) ;
+ sdb01.add(new Pair<byte[], byte[]>(p, (md5 + "|p").getBytes("UTF-8"))) ;
+ sdb01.add(new Pair<byte[], byte[]>(o, (md5 + "|o").getBytes("UTF-8"))) ;
if ( g != null )
- sdb01.add(new Pair<byte[], byte[]>(tdbloader2.serialize(g).getBytes("UTF-8"), (md5 + "|g").getBytes("UTF-8"))) ;
+ sdb01.add(new Pair<byte[], byte[]>(g, (md5 + "|g").getBytes("UTF-8"))) ;
} catch (UnsupportedEncodingException e) {
throw new AtlasException(e) ;
}
@@ -151,6 +155,18 @@ public class NodeTableBuilder2 implement
public void close() {
flush() ;
+ // nodes.dat
+ buildNodesObjectFile() ;
+ generateSortedHashNodeIdDataBag() ;
+ // node2id.dat and node2id.idn
+ buildNodeTableBPTreeIndex() ;
+
+ outputTriples.flush() ;
+ outputQuads.flush() ;
+ objects.sync() ;
+ }
+
+ private void buildNodesObjectFile() {
try {
log.info("Node Table (1/3): building nodes.dat and sorting hash|id ...") ;
ProgressLogger monitor01 = new ProgressLogger(log, "records for node table (1/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
@@ -166,7 +182,7 @@ public class NodeTableBuilder2 implement
curr = leftIn ;
// generate the node id
Node node = tdbloader2.parse(leftIn) ;
- id = NodeLib.encodeStore(node, objects) ;
+ id = encodeStore(node, objects) ;
// add to hash|id
Hash hash = new Hash(SystemTDB.LenNodeHash);
setHash(hash, node);
@@ -181,15 +197,49 @@ public class NodeTableBuilder2 implement
sdb02.add(pair02) ;
monitor01.tick() ;
}
+ ProgressLogger.print ( log, monitor01 ) ;
+ } catch (UnsupportedEncodingException e) {
+ throw new AtlasException(e) ;
+ } finally {
sdb01.close() ;
sdb01 = null ;
- tdbloader2.print ( monitor01 ) ;
-
+ }
+ }
+
+ private static Nodec nodec = new NodecSSE() ;
+ final private static char MarkerChar = '_' ;
+ final private static char[] invalidIRIChars = { MarkerChar , ' ' } ;
+ private long encodeStore (Node node, ObjectFile objects) {
+ int maxSize = nodec.maxSize(node) ;
+ ByteBuffer bb = objects.allocWrite(maxSize) ;
+
+ if ( node.isURI() )
+ {
+ // Pesky spaces etc
+ String x = StrUtils.encodeHex(node.getURI(), MarkerChar, invalidIRIChars) ;
+ if ( x != node.getURI() )
+ node = Node.createURI(x) ;
+ }
+
+ // Node->String
+ String str = tdbloader2.serialize(node) ;
+ // String -> bytes
+ int x = Bytes.toByteBuffer(str, bb) ;
+ bb.position(0) ; // Around the space used
+ bb.limit(x) ; // The space we have used.
+
+ long id = objects.completeWrite(bb) ;
+ return id ;
+ }
+
+
+ private void generateSortedHashNodeIdDataBag() {
+ try {
log.info("Node Table (2/3): generating input data using node ids...") ;
final ProgressLogger monitor02 = new ProgressLogger(log, "records for node table (2/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
monitor02.start() ;
Iterator<Pair<byte[], byte[]>> iter02 = sdb02.iterator() ;
- curr = null ;
+ String curr = null ;
Long s = null ;
Long p = null ;
Long o = null ;
@@ -217,51 +267,51 @@ public class NodeTableBuilder2 implement
}
write (g, s, p, o) ; // ensure we write the last triple|quad
- sdb02.close() ;
- sdb02 = null ;
- tdbloader2.print ( monitor02 ) ;
+ ProgressLogger.print ( log, monitor02 ) ;
} catch (UnsupportedEncodingException e) {
throw new AtlasException(e) ;
+ } finally {
+ sdb02.close() ;
+ sdb02 = null ;
}
-
-
- // Node table B+Tree index (i.e. node2id.dat/idn)
- log.info("Node Table (3/3): building node table B+Tree index (i.e. node2id.dat and node2id.idn files)...") ;
- final ProgressLogger monitor03 = new ProgressLogger(log, "records for node table (3/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
- monitor03.start() ;
- String path = dsg.getLocation().getDirectoryPath() ;
- new File(path, "node2id.dat").delete() ;
- new File(path, "node2id.idn").delete() ;
-
- final RecordFactory recordFactory = new RecordFactory(LenNodeHash, SizeOfNodeId) ;
- Transform<Pair<byte[], byte[]>, Record> transformPair2Record = new Transform<Pair<byte[], byte[]>, Record>() {
- @Override public Record convert(Pair<byte[], byte[]> pair) {
- monitor03.tick() ;
- return recordFactory.create(pair.getLeft(), pair.getRight()) ;
- }
- };
+ }
- int order = BPlusTreeParams.calcOrder(SystemTDB.BlockSize, recordFactory) ;
- BPlusTreeParams bptParams = new BPlusTreeParams(order, recordFactory) ;
- int readCacheSize = 10 ;
- int writeCacheSize = 100 ;
- FileSet destination = new FileSet(dsg.getLocation(), Names.indexNode2Id) ;
- BlockMgr blkMgrNodes = BlockMgrFactory.create(destination, Names.bptExt1, SystemTDB.BlockSize, readCacheSize, writeCacheSize) ;
- BlockMgr blkMgrRecords = BlockMgrFactory.create(destination, Names.bptExt2, SystemTDB.BlockSize, readCacheSize, writeCacheSize) ;
- Iterator<Record> iter2 = Iter.iter(sdb03.iterator()).map(transformPair2Record) ;
- BPlusTree bpt2 = BPlusTreeRewriter.packIntoBPlusTree(iter2, bptParams, recordFactory, blkMgrNodes, blkMgrRecords) ;
- bpt2.sync() ;
- sdb03.close() ;
- sdb03 = null ;
- tdbloader2.print ( monitor03 ) ;
+ private void buildNodeTableBPTreeIndex() {
+ try {
+ // Node table B+Tree index (i.e. node2id.dat/idn)
+ log.info("Node Table (3/3): building node table B+Tree index (i.e. node2id.dat and node2id.idn files)...") ;
+ final ProgressLogger monitor03 = new ProgressLogger(log, "records for node table (3/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
+ monitor03.start() ;
+ String path = dsg.getLocation().getDirectoryPath() ;
+ new File(path, "node2id.dat").delete() ;
+ new File(path, "node2id.idn").delete() ;
+
+ final RecordFactory recordFactory = new RecordFactory(LenNodeHash, SizeOfNodeId) ;
+ Transform<Pair<byte[], byte[]>, Record> transformPair2Record = new Transform<Pair<byte[], byte[]>, Record>() {
+ @Override public Record convert(Pair<byte[], byte[]> pair) {
+ monitor03.tick() ;
+ return recordFactory.create(pair.getLeft(), pair.getRight()) ;
+ }
+ };
- outputTriples.flush() ;
- outputQuads.flush() ;
- objects.sync() ;
+ int order = BPlusTreeParams.calcOrder(SystemTDB.BlockSize, recordFactory) ;
+ BPlusTreeParams bptParams = new BPlusTreeParams(order, recordFactory) ;
+ int readCacheSize = 10 ;
+ int writeCacheSize = 100 ;
+ FileSet destination = new FileSet(dsg.getLocation(), Names.indexNode2Id) ;
+ BlockMgr blkMgrNodes = BlockMgrFactory.create(destination, Names.bptExt1, SystemTDB.BlockSize, readCacheSize, writeCacheSize) ;
+ BlockMgr blkMgrRecords = BlockMgrFactory.create(destination, Names.bptExt2, SystemTDB.BlockSize, readCacheSize, writeCacheSize) ;
+ Iterator<Record> iter2 = Iter.iter(sdb03.iterator()).map(transformPair2Record) ;
+ BPlusTree bpt2 = BPlusTreeRewriter.packIntoBPlusTree(iter2, bptParams, recordFactory, blkMgrNodes, blkMgrRecords) ;
+ bpt2.sync() ;
+
+ ProgressLogger.print ( log, monitor03 ) ;
+ } finally {
+ sdb03.close() ;
+ sdb03 = null ;
+ }
}
-
- public StatsCollectorNodeId getCollector() { return stats ; }
-
+
private void write (Long g, Long s, Long p, Long o) {
// System.out.println ("> ( " + g + ", " + s + ", " + p + ", " + o + " )") ;
if ( g != null ) {
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairInputStream.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairInputStream.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairInputStream.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairInputStream.java Fri Sep 23 18:15:35 2011
@@ -27,15 +27,13 @@ import org.openjena.atlas.AtlasException
import org.openjena.atlas.lib.Closeable;
import org.openjena.atlas.lib.Pair;
-import cmd.tdbloader2;
-
public class PairInputStream implements Iterator<Pair<byte[], byte[]>>, Closeable {
private DataInputStream in ;
private Pair<byte[], byte[]> slot = null ;
public PairInputStream(InputStream in) {
- this.in = tdbloader2.createDataInputStream(in) ;
+ this.in = DataStreamFactory.createDataInputStream(in) ;
slot = readNext() ;
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairOutputStream.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairOutputStream.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairOutputStream.java Fri Sep 23 18:15:35 2011
@@ -26,14 +26,12 @@ import org.openjena.atlas.AtlasException
import org.openjena.atlas.lib.Pair;
import org.openjena.atlas.lib.Sink;
-import cmd.tdbloader2;
-
public class PairOutputStream implements Sink<Pair<byte[], byte[]>> {
private DataOutputStream out ;
public PairOutputStream(OutputStream out) {
- this.out = tdbloader2.createDataOutputStream(out) ;
+ this.out = DataStreamFactory.createDataOutputStream(out) ;
}
@Override
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/ProgressLogger.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/ProgressLogger.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/ProgressLogger.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/ProgressLogger.java Fri Sep 23 18:15:35 2011
@@ -108,11 +108,19 @@ public class ProgressLogger
}
}
-
-
static boolean tickPoint(long counter, long quantum)
{
return counter%quantum == 0 ;
}
+ public static void print ( Logger log, ProgressLogger monitor )
+ {
+ long time = monitor.finish() ;
+ long total = monitor.getTicks() ;
+ float elapsedSecs = time/1000F ;
+ float rate = (elapsedSecs!=0) ? total/elapsedSecs : 0 ;
+ String str = String.format("Total: %,d tuples : %,.2f seconds : %,.2f tuples/sec [%s]", total, elapsedSecs, rate, nowAsString()) ;
+ log.info(str) ;
+ }
+
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleInputStream.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleInputStream.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleInputStream.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleInputStream.java Fri Sep 23 18:15:35 2011
@@ -27,8 +27,6 @@ import org.openjena.atlas.AtlasException
import org.openjena.atlas.lib.Closeable;
import org.openjena.atlas.lib.Tuple;
-import cmd.tdbloader2;
-
public class TupleInputStream implements Iterator<Tuple<Long>>, Closeable {
private DataInputStream in ;
@@ -36,7 +34,7 @@ public class TupleInputStream implements
private Tuple<Long> slot = null ;
public TupleInputStream(InputStream in, int size) {
- this.in = tdbloader2.createDataInputStream(in) ;
+ this.in = DataStreamFactory.createDataInputStream(in) ;
this.size = size ;
slot = readNext() ;
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleOutputStream.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleOutputStream.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TupleOutputStream.java Fri Sep 23 18:15:35 2011
@@ -27,14 +27,12 @@ import org.openjena.atlas.AtlasException
import org.openjena.atlas.lib.Sink;
import org.openjena.atlas.lib.Tuple;
-import cmd.tdbloader2;
-
public class TupleOutputStream implements Sink<Tuple<Long>> {
private DataOutputStream out ;
public TupleOutputStream(OutputStream out) {
- this.out = tdbloader2.createDataOutputStream(out) ;
+ this.out = DataStreamFactory.createDataOutputStream(out) ;
}
@Override
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TestTDBLoader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TestTDBLoader2.java?rev=1174932&r1=1174931&r2=1174932&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TestTDBLoader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TestTDBLoader2.java Fri Sep 23 18:15:35 2011
@@ -98,7 +98,8 @@ public class TestTDBLoader2 {
ArrayList<String> datafiles = new ArrayList<String>();
File files[] = path.listFiles();
for (File file : files) {
- datafiles.add(file.getAbsolutePath());
+ if ( file.isFile() )
+ datafiles.add(file.getAbsolutePath());
}
arguments.addAll(datafiles);
args = arguments.toArray(new String[]{}) ;