You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/26 21:05:25 UTC
svn commit: r1176000 - in /incubator/jena/Scratch/PC/tdbloader2/trunk/src:
main/java/cmd/ main/java/org/apache/jena/tdbloader2/ test/java/cmd/
Author: castagna
Date: Mon Sep 26 19:05:25 2011
New Revision: 1176000
URL: http://svn.apache.org/viewvc?rev=1176000&view=rev
Log:
JENA-117
Added:
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java (with props)
Modified:
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Mon Sep 26 19:05:25 2011
@@ -36,7 +36,10 @@ import org.apache.jena.tdbloader2.Triple
import org.apache.jena.tdbloader2.TupleComparator;
import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.DataBag;
+import org.openjena.atlas.data.SerializationFactory;
+import org.openjena.atlas.data.ThresholdPolicy;
import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.data.ThresholdPolicyMemory;
import org.openjena.atlas.io.IO;
import org.openjena.atlas.iterator.Iter;
import org.openjena.atlas.iterator.Transform;
@@ -102,6 +105,7 @@ public class tdbloader2 extends CmdGener
private static ArgDecl argBufferSize = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
private static ArgDecl argGzipOutside = new ArgDecl(ArgDecl.NoValue, "gzip-outside") ;
private static ArgDecl argSpillSize = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
+ private static ArgDecl argSpillSizeAuto = new ArgDecl(ArgDecl.NoValue, "spill-auto", "spill-size-auto") ;
private static ArgDecl argNoStats = new ArgDecl(ArgDecl.NoValue, "no-stats") ;
private static ArgDecl argNoBuffer = new ArgDecl(ArgDecl.NoValue, "no-buffer") ;
private static ArgDecl argMaxMergeFiles = new ArgDecl(ArgDecl.HasValue, "max-merge-files") ;
@@ -110,11 +114,12 @@ public class tdbloader2 extends CmdGener
private String locationString ;
private List<String> datafiles ;
public static int spill_size = 1000000 ;
+ public static boolean spill_size_auto = false ;
public static boolean no_stats = false ;
- private ThresholdPolicyCount<Tuple<Long>> policy ;
private Comparator<Tuple<Long>> comparator = new TupleComparator();
-
+ private TripleSerializationFactory tripleSerializationFactory = new TripleSerializationFactory() ;
+ private QuadSerializationFactory quadSerializationFactory = new QuadSerializationFactory() ;
public static void main(String...argv)
{
@@ -131,6 +136,7 @@ public class tdbloader2 extends CmdGener
super.add(argBufferSize, "--buffer-size", "The size of buffers for IO in bytes") ;
super.add(argGzipOutside, "--gzip-outside", "GZIP...(Buffered...())") ;
super.add(argSpillSize, "--spill-size", "The size of spillable segments in tuples|records") ;
+ super.add(argSpillSizeAuto, "--spill-size-auto", "Automatically set the size of spillable segments") ;
super.add(argNoStats, "--no-stats", "Do not generate the stats file") ;
super.add(argNoBuffer, "--no-buffer", "Do not use Buffered{Input|Output}Stream") ;
super.add(argMaxMergeFiles, "--max-merge-files", "Specify the maximum number of files to merge at the same time (default: 100)") ;
@@ -156,6 +162,8 @@ public class tdbloader2 extends CmdGener
DataStreamFactory.setBuffered( ! super.hasArg(argNoBuffer) ) ;
if ( super.hasArg(argMaxMergeFiles) )
MultiThreadedSortedDataBag.MAX_SPILL_FILES = Integer.valueOf(super.getValue(argMaxMergeFiles)) ;
+ if ( super.hasArg(argSpillSizeAuto) )
+ spill_size_auto = true ;
datafiles = super.getPositional() ;
@@ -169,7 +177,16 @@ public class tdbloader2 extends CmdGener
cmdError("File does not exist: "+filename) ;
}
- policy = new ThresholdPolicyCount<Tuple<Long>>(spill_size);
+ }
+
+ private ThresholdPolicy<Tuple<Long>> getThresholdPolicy(SerializationFactory<Tuple<Long>> serializationFactory) {
+ if ( spill_size_auto == true ) {
+ long memory = Math.round( Runtime.getRuntime().maxMemory() * 0.065 ) ; // in bytes
+ cmdLog.info("Threshold spill is: " + memory) ;
+ return new ThresholdPolicyMemory<Tuple<Long>>(memory, serializationFactory);
+ } else {
+ return new ThresholdPolicyCount<Tuple<Long>>(spill_size);
+ }
}
@Override
@@ -187,8 +204,8 @@ public class tdbloader2 extends CmdGener
ProgressLogger monitorTotal = new ProgressLogger(cmdLog, "tuples", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
monitorTotal.start() ;
- DataBag<Tuple<Long>> outputTriples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new TripleSerializationFactory(), comparator);
- DataBag<Tuple<Long>> outputQuads = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new QuadSerializationFactory(), comparator);
+ DataBag<Tuple<Long>> outputTriples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(tripleSerializationFactory), new TripleSerializationFactory(), comparator);
+ DataBag<Tuple<Long>> outputQuads = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(quadSerializationFactory), new QuadSerializationFactory(), comparator);
// Node table and input data using node ids (rather than RDF node values)
Sink<Quad> sink = new NodeTableBuilder2(dsg, monitorTotal, outputTriples, outputQuads) ;
@@ -218,10 +235,10 @@ public class tdbloader2 extends CmdGener
}
sink.close() ;
- spill(outputTriples) ;
+ // spill(outputTriples) ;
bptSPO = createBPlusTreeIndex(Names.primaryIndexTriples, outputTriples) ;
- spill(outputQuads) ;
+ // spill(outputQuads) ;
bptGSPO = createBPlusTreeIndex(Names.primaryIndexQuads, outputQuads) ;
} finally {
outputTriples.close() ;
@@ -264,7 +281,7 @@ public class tdbloader2 extends CmdGener
return this.getClass().getName() ;
}
- private void spill ( DataBag<?> bag ) {
+ public static void spill ( DataBag<?> bag ) {
if ( bag instanceof MultiThreadedSortedDataBag<?> ) {
((MultiThreadedSortedDataBag<?>)bag).spill() ;
}
@@ -333,9 +350,9 @@ public class tdbloader2 extends CmdGener
DataBag<Tuple<Long>> outTuples ;
if ( size == 3 ) {
- outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new TripleSerializationFactory(), comparator);
+ outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(tripleSerializationFactory), tripleSerializationFactory, comparator);
} else {
- outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new QuadSerializationFactory(), comparator);
+ outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(quadSerializationFactory), quadSerializationFactory, comparator);
}
cmdLog.info("Index: sorting data for " + indexName + " index...") ;
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java Mon Sep 26 19:05:25 2011
@@ -18,6 +18,9 @@
package org.apache.jena.tdbloader2;
+import static cmd.tdbloader2.spill_size_auto;
+import static cmd.tdbloader2.spill_size;
+// import static cmd.tdbloader2.spill;
import static com.hp.hpl.jena.tdb.lib.NodeLib.setHash;
import static com.hp.hpl.jena.tdb.sys.SystemTDB.LenNodeHash;
import static com.hp.hpl.jena.tdb.sys.SystemTDB.SizeOfNodeId;
@@ -31,7 +34,10 @@ import java.util.Iterator;
import org.apache.commons.codec.binary.Hex;
import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.DataBag;
+import org.openjena.atlas.data.SerializationFactory;
+import org.openjena.atlas.data.ThresholdPolicy;
import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.data.ThresholdPolicyMemory;
import org.openjena.atlas.iterator.Iter;
import org.openjena.atlas.iterator.Transform;
import org.openjena.atlas.lib.Bytes;
@@ -72,10 +78,10 @@ public class NodeTableBuilder2 implement
private ProgressLogger monitor ;
private StatsCollectorNodeId stats ;
- private ThresholdPolicyCount<Pair<byte[], byte[]>> policy = new ThresholdPolicyCount<Pair<byte[], byte[]>>(tdbloader2.spill_size) ;
private DataBag<Pair<byte[], byte[]>> sdb01 ;
private DataBag<Pair<byte[], byte[]>> sdb02 ;
private DataBag<Pair<byte[], byte[]>> sdb03 ;
+ private SerializationFactory<Pair<byte[], byte[]>> serializationFactory = new PairSerializationFactory() ;
private MessageDigest digest ;
@@ -87,6 +93,7 @@ public class NodeTableBuilder2 implement
this.dsg = dsg ;
this.monitor = monitor ;
+ this.log = monitor.getLogger() ;
String filename = new FileSet(dsg.getLocation(), Names.indexId2Node).filename(Names.extNodeData) ;
this.objects = FileFactory.createObjectFileDisk(filename) ;
@@ -95,20 +102,26 @@ public class NodeTableBuilder2 implement
this.outputQuads = outputQuads ;
this.stats = new StatsCollectorNodeId() ;
- this.sdb01 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
- this.sdb02 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
- this.sdb03 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
+ this.sdb01 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
try {
this.digest = MessageDigest.getInstance("MD5") ;
} catch (NoSuchAlgorithmException e) {
throw new AtlasException(e) ;
}
-
- this.log = monitor.getLogger() ;
}
public StatsCollectorNodeId getCollector() { return stats ; }
+
+ private ThresholdPolicy<Pair<byte[], byte[]>> getThresholdPolicy() {
+ if ( spill_size_auto == true ) {
+ long memory = Math.round( Runtime.getRuntime().maxMemory() * 0.065 ) ; // in bytes
+ log.info("Threshold spill is: " + memory) ;
+ return new ThresholdPolicyMemory<Pair<byte[], byte[]>>(memory, serializationFactory);
+ } else {
+ return new ThresholdPolicyCount<Pair<byte[], byte[]>>(spill_size);
+ }
+ }
@Override
public void send(Quad quad)
@@ -164,6 +177,12 @@ public class NodeTableBuilder2 implement
}
private void buildNodesObjectFile() {
+
+ // spill(sdb01) ;
+
+ this.sdb02 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
+ this.sdb03 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
+
try {
log.info("Node Table (1/3): building nodes.dat and sorting hash|id ...") ;
ProgressLogger monitor01 = new ProgressLogger(log, "records for node table (1/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
@@ -200,6 +219,8 @@ public class NodeTableBuilder2 implement
} finally {
sdb01.close() ;
sdb01 = null ;
+// spill (sdb02) ;
+// spill (sdb03) ;
}
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
-import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.SerializationFactory;
import org.openjena.atlas.lib.Pair;
import org.openjena.atlas.lib.Sink;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Sink;
public class PairSerializationFactory implements SerializationFactory<Pair<byte[], byte[]>> {
@Override public Iterator<Pair<byte[], byte[]>> createDeserializer(InputStream in) { return new PairInputStream(in); }
@Override public Sink<Pair<byte[], byte[]>> createSerializer(OutputStream out) { return new PairOutputStream(out); }
- @Override public long getEstimatedMemorySize(Pair<byte[], byte[]> item) { throw new AtlasException("Method not implemented.") ; }
+ @Override public long getEstimatedMemorySize(Pair<byte[], byte[]> item) { return 8 + item.getLeft().length + item.getRight().length ; } // 8 because 4 bytes for the length of each of the pair items
}
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
-import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.SerializationFactory;
import org.openjena.atlas.lib.Sink;
import org.openjena.atlas.lib.Tuple;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Tuple;
public class QuadSerializationFactory implements SerializationFactory<Tuple<Long>> {
@Override public Iterator<Tuple<Long>> createDeserializer(InputStream in) { return new TupleInputStream(in, 4); }
@Override public Sink<Tuple<Long>> createSerializer(OutputStream out) { return new TupleOutputStream(out); }
- @Override public long getEstimatedMemorySize(Tuple<Long> item) { throw new AtlasException("Method not implemented.") ; }
+ @Override public long getEstimatedMemorySize(Tuple<Long> item) { return 32L ; } // 8 * 4 = 32 bytes
}
\ No newline at end of file
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
-import org.openjena.atlas.AtlasException;
import org.openjena.atlas.data.SerializationFactory;
import org.openjena.atlas.lib.Sink;
import org.openjena.atlas.lib.Tuple;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Tuple;
public class TripleSerializationFactory implements SerializationFactory<Tuple<Long>> {
@Override public Iterator<Tuple<Long>> createDeserializer(InputStream in) { return new TupleInputStream(in, 3); }
@Override public Sink<Tuple<Long>> createSerializer(OutputStream out) { return new TupleOutputStream(out); }
- @Override public long getEstimatedMemorySize(Tuple<Long> item) { throw new AtlasException("Method not implemented.") ; }
+ @Override public long getEstimatedMemorySize(Tuple<Long> item) { return 24L ; } // 8 * 3 = 24 bytes
}
\ No newline at end of file
Added: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java?rev=1176000&view=auto
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java (added)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java Mon Sep 26 19:05:25 2011
@@ -0,0 +1,15 @@
+package cmd;
+
+import org.apache.jena.tdbloader2.TestMultiThreadedSortedDataBag;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses( {
+
+ TestMultiThreadedSortedDataBag.class,
+ TestTDBLoader2.class
+
+})
+
+public class TS_TDBLoader2 {}
\ No newline at end of file
Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java
------------------------------------------------------------------------------
svn:mime-type = text/plain