You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/26 21:05:25 UTC

svn commit: r1176000 - in /incubator/jena/Scratch/PC/tdbloader2/trunk/src: main/java/cmd/ main/java/org/apache/jena/tdbloader2/ test/java/cmd/

Author: castagna
Date: Mon Sep 26 19:05:25 2011
New Revision: 1176000

URL: http://svn.apache.org/viewvc?rev=1176000&view=rev
Log:
JENA-117

Added:
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java   (with props)
Modified:
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Mon Sep 26 19:05:25 2011
@@ -36,7 +36,10 @@ import org.apache.jena.tdbloader2.Triple
 import org.apache.jena.tdbloader2.TupleComparator;
 import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.DataBag;
+import org.openjena.atlas.data.SerializationFactory;
+import org.openjena.atlas.data.ThresholdPolicy;
 import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.data.ThresholdPolicyMemory;
 import org.openjena.atlas.io.IO;
 import org.openjena.atlas.iterator.Iter;
 import org.openjena.atlas.iterator.Transform;
@@ -102,6 +105,7 @@ public class tdbloader2 extends CmdGener
     private static ArgDecl argBufferSize    = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
     private static ArgDecl argGzipOutside   = new ArgDecl(ArgDecl.NoValue,  "gzip-outside") ;
     private static ArgDecl argSpillSize     = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
+    private static ArgDecl argSpillSizeAuto = new ArgDecl(ArgDecl.NoValue,  "spill-auto", "spill-size-auto") ;
     private static ArgDecl argNoStats       = new ArgDecl(ArgDecl.NoValue,  "no-stats") ;
     private static ArgDecl argNoBuffer      = new ArgDecl(ArgDecl.NoValue,  "no-buffer") ;
     private static ArgDecl argMaxMergeFiles = new ArgDecl(ArgDecl.HasValue, "max-merge-files") ;
@@ -110,11 +114,12 @@ public class tdbloader2 extends CmdGener
     private String locationString ;
     private List<String> datafiles ;
     public static int spill_size = 1000000 ;
+    public static boolean spill_size_auto = false ;
     public static boolean no_stats = false ;
     
-    private ThresholdPolicyCount<Tuple<Long>> policy ;
     private Comparator<Tuple<Long>> comparator = new TupleComparator();
-    
+    private TripleSerializationFactory tripleSerializationFactory = new TripleSerializationFactory() ;
+    private QuadSerializationFactory quadSerializationFactory = new QuadSerializationFactory() ;
     
     public static void main(String...argv)
     {
@@ -131,6 +136,7 @@ public class tdbloader2 extends CmdGener
         super.add(argBufferSize,    "--buffer-size",       "The size of buffers for IO in bytes") ;
         super.add(argGzipOutside,   "--gzip-outside",      "GZIP...(Buffered...())") ;
         super.add(argSpillSize,     "--spill-size",        "The size of spillable segments in tuples|records") ;
+        super.add(argSpillSizeAuto, "--spill-size-auto",   "Automatically set the size of spillable segments") ;
         super.add(argNoStats,       "--no-stats",          "Do not generate the stats file") ;
         super.add(argNoBuffer,      "--no-buffer",         "Do not use Buffered{Input|Output}Stream") ;
         super.add(argMaxMergeFiles, "--max-merge-files",   "Specify the maximum number of files to merge at the same time (default: 100)") ;
@@ -156,6 +162,8 @@ public class tdbloader2 extends CmdGener
         DataStreamFactory.setBuffered( ! super.hasArg(argNoBuffer) ) ;
         if ( super.hasArg(argMaxMergeFiles) )
             MultiThreadedSortedDataBag.MAX_SPILL_FILES = Integer.valueOf(super.getValue(argMaxMergeFiles)) ;
+        if ( super.hasArg(argSpillSizeAuto) ) 
+            spill_size_auto = true ;
         
         datafiles  = super.getPositional() ;
 
@@ -169,7 +177,16 @@ public class tdbloader2 extends CmdGener
                 cmdError("File does not exist: "+filename) ;
         }
 
-        policy = new ThresholdPolicyCount<Tuple<Long>>(spill_size);
+    }
+    
+    private ThresholdPolicy<Tuple<Long>> getThresholdPolicy(SerializationFactory<Tuple<Long>> serializationFactory) {
+        if ( spill_size_auto == true ) {
+            long memory = Math.round( Runtime.getRuntime().maxMemory() * 0.065 ) ; // in bytes
+            cmdLog.info("Threshold spill is: " + memory) ;
+            return new ThresholdPolicyMemory<Tuple<Long>>(memory, serializationFactory);
+        } else {
+            return new ThresholdPolicyCount<Tuple<Long>>(spill_size);            
+        }
     }
     
     @Override
@@ -187,8 +204,8 @@ public class tdbloader2 extends CmdGener
         ProgressLogger monitorTotal = new ProgressLogger(cmdLog, "tuples", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
         monitorTotal.start() ;
 
-        DataBag<Tuple<Long>> outputTriples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new TripleSerializationFactory(), comparator);
-        DataBag<Tuple<Long>> outputQuads = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new QuadSerializationFactory(), comparator);
+        DataBag<Tuple<Long>> outputTriples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(tripleSerializationFactory), new TripleSerializationFactory(), comparator);
+        DataBag<Tuple<Long>> outputQuads = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(quadSerializationFactory), new QuadSerializationFactory(), comparator);
 
         // Node table and input data using node ids (rather than RDF node values)
         Sink<Quad> sink = new NodeTableBuilder2(dsg, monitorTotal, outputTriples, outputQuads) ; 
@@ -218,10 +235,10 @@ public class tdbloader2 extends CmdGener
             }
             sink.close() ;
 
-            spill(outputTriples) ;
+            // spill(outputTriples) ;
             bptSPO = createBPlusTreeIndex(Names.primaryIndexTriples, outputTriples) ;
 
-            spill(outputQuads) ;
+            // spill(outputQuads) ;
             bptGSPO = createBPlusTreeIndex(Names.primaryIndexQuads, outputQuads) ;
         } finally {
             outputTriples.close() ;
@@ -264,7 +281,7 @@ public class tdbloader2 extends CmdGener
         return this.getClass().getName() ;
     }
     
-    private void spill ( DataBag<?> bag ) {
+    public static void spill ( DataBag<?> bag ) {
         if ( bag instanceof MultiThreadedSortedDataBag<?> ) {
             ((MultiThreadedSortedDataBag<?>)bag).spill() ;
         }
@@ -333,9 +350,9 @@ public class tdbloader2 extends CmdGener
     	
     	DataBag<Tuple<Long>> outTuples ;
     	if ( size == 3 ) {
-    		outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new TripleSerializationFactory(), comparator);
+    		outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(tripleSerializationFactory), tripleSerializationFactory, comparator);
     	} else {
-    		outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(policy, new QuadSerializationFactory(), comparator);
+    		outTuples = new MultiThreadedSortedDataBag<Tuple<Long>>(getThresholdPolicy(quadSerializationFactory), quadSerializationFactory, comparator);
     	}
     	
         cmdLog.info("Index: sorting data for " + indexName + " index...") ;

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/NodeTableBuilder2.java Mon Sep 26 19:05:25 2011
@@ -18,6 +18,9 @@
 
 package org.apache.jena.tdbloader2;
 
+import static cmd.tdbloader2.spill_size_auto;
+import static cmd.tdbloader2.spill_size;
+// import static cmd.tdbloader2.spill;
 import static com.hp.hpl.jena.tdb.lib.NodeLib.setHash;
 import static com.hp.hpl.jena.tdb.sys.SystemTDB.LenNodeHash;
 import static com.hp.hpl.jena.tdb.sys.SystemTDB.SizeOfNodeId;
@@ -31,7 +34,10 @@ import java.util.Iterator;
 import org.apache.commons.codec.binary.Hex;
 import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.DataBag;
+import org.openjena.atlas.data.SerializationFactory;
+import org.openjena.atlas.data.ThresholdPolicy;
 import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.data.ThresholdPolicyMemory;
 import org.openjena.atlas.iterator.Iter;
 import org.openjena.atlas.iterator.Transform;
 import org.openjena.atlas.lib.Bytes;
@@ -72,10 +78,10 @@ public class NodeTableBuilder2 implement
     private ProgressLogger monitor ;
     private StatsCollectorNodeId stats ;
     
-    private ThresholdPolicyCount<Pair<byte[], byte[]>> policy = new ThresholdPolicyCount<Pair<byte[], byte[]>>(tdbloader2.spill_size) ;
     private DataBag<Pair<byte[], byte[]>> sdb01 ;
     private DataBag<Pair<byte[], byte[]>> sdb02 ;
     private DataBag<Pair<byte[], byte[]>> sdb03 ;
+    private SerializationFactory<Pair<byte[], byte[]>> serializationFactory = new PairSerializationFactory() ;
     
     private MessageDigest digest ;
     
@@ -87,6 +93,7 @@ public class NodeTableBuilder2 implement
 
         this.dsg = dsg ;
         this.monitor = monitor ;
+        this.log = monitor.getLogger() ;
 
         String filename = new FileSet(dsg.getLocation(), Names.indexId2Node).filename(Names.extNodeData) ;
         this.objects = FileFactory.createObjectFileDisk(filename) ; 
@@ -95,20 +102,26 @@ public class NodeTableBuilder2 implement
         this.outputQuads = outputQuads ; 
         this.stats = new StatsCollectorNodeId() ;
         
-        this.sdb01 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
-        this.sdb02 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
-        this.sdb03 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(policy, new PairSerializationFactory(), new PairComparator());
+        this.sdb01 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
         
         try {
             this.digest = MessageDigest.getInstance("MD5") ;               
         } catch (NoSuchAlgorithmException e) {
             throw new AtlasException(e) ;
         }
-
-        this.log = monitor.getLogger() ;
     }
     
     public StatsCollectorNodeId getCollector() { return stats ; }
+
+    private ThresholdPolicy<Pair<byte[], byte[]>> getThresholdPolicy() {
+        if ( spill_size_auto == true ) {
+            long memory = Math.round( Runtime.getRuntime().maxMemory() * 0.065 ) ; // in bytes
+            log.info("Threshold spill is: " + memory) ;
+            return new ThresholdPolicyMemory<Pair<byte[], byte[]>>(memory, serializationFactory);
+        } else {
+            return new ThresholdPolicyCount<Pair<byte[], byte[]>>(spill_size);            
+        }
+    }
     
     @Override
     public void send(Quad quad)
@@ -164,6 +177,12 @@ public class NodeTableBuilder2 implement
     }
     
     private void buildNodesObjectFile() {
+        
+        // spill(sdb01) ;
+        
+        this.sdb02 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
+        this.sdb03 = new MultiThreadedSortedDataBag<Pair<byte[], byte[]>>(getThresholdPolicy(), serializationFactory, new PairComparator());
+        
         try {
             log.info("Node Table (1/3): building nodes.dat and sorting hash|id ...") ;
             ProgressLogger monitor01 = new ProgressLogger(log, "records for node table (1/3) phase", BulkLoader.DataTickPoint,BulkLoader.superTick) ;
@@ -200,6 +219,8 @@ public class NodeTableBuilder2 implement
         } finally {
             sdb01.close() ;
             sdb01 = null ;
+//            spill (sdb02) ;
+//            spill (sdb03) ;
         }
     }
     

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/PairSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
 
-import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.SerializationFactory;
 import org.openjena.atlas.lib.Pair;
 import org.openjena.atlas.lib.Sink;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Sink;
 public class PairSerializationFactory implements SerializationFactory<Pair<byte[], byte[]>> {
     @Override public Iterator<Pair<byte[], byte[]>> createDeserializer(InputStream in) { return new PairInputStream(in); }
     @Override public Sink<Pair<byte[], byte[]>> createSerializer(OutputStream out) { return new PairOutputStream(out); }
-    @Override public long getEstimatedMemorySize(Pair<byte[], byte[]> item) { throw new AtlasException("Method not implemented.") ; }
+    @Override public long getEstimatedMemorySize(Pair<byte[], byte[]> item) { return 8 + item.getLeft().length + item.getRight().length ; } // 8 because 4 bytes for the length of each of the pair items
 }

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/QuadSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
 
-import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.SerializationFactory;
 import org.openjena.atlas.lib.Sink;
 import org.openjena.atlas.lib.Tuple;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Tuple;
 public class QuadSerializationFactory implements SerializationFactory<Tuple<Long>> {
     @Override public Iterator<Tuple<Long>> createDeserializer(InputStream in) { return new TupleInputStream(in, 4); }
     @Override public Sink<Tuple<Long>> createSerializer(OutputStream out) { return new TupleOutputStream(out); }
-    @Override public long getEstimatedMemorySize(Tuple<Long> item) { throw new AtlasException("Method not implemented.") ; }
+    @Override public long getEstimatedMemorySize(Tuple<Long> item) { return 32L ; } // 8 * 4 = 32 bytes
 }
\ No newline at end of file

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java?rev=1176000&r1=1175999&r2=1176000&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/TripleSerializationFactory.java Mon Sep 26 19:05:25 2011
@@ -22,7 +22,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
 
-import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.SerializationFactory;
 import org.openjena.atlas.lib.Sink;
 import org.openjena.atlas.lib.Tuple;
@@ -30,5 +29,5 @@ import org.openjena.atlas.lib.Tuple;
 public class TripleSerializationFactory implements SerializationFactory<Tuple<Long>> {
     @Override public Iterator<Tuple<Long>> createDeserializer(InputStream in) { return new TupleInputStream(in, 3); }
     @Override public Sink<Tuple<Long>> createSerializer(OutputStream out) { return new TupleOutputStream(out); }
-    @Override public long getEstimatedMemorySize(Tuple<Long> item) { throw new AtlasException("Method not implemented.") ; }
+    @Override public long getEstimatedMemorySize(Tuple<Long> item) { return 24L ; } // 8 * 3 = 24 bytes
 }
\ No newline at end of file

Added: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java?rev=1176000&view=auto
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java (added)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java Mon Sep 26 19:05:25 2011
@@ -0,0 +1,15 @@
+package cmd;
+
+import org.apache.jena.tdbloader2.TestMultiThreadedSortedDataBag;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses( {
+    
+    TestMultiThreadedSortedDataBag.class, 
+    TestTDBLoader2.class
+    
+})
+
+public class TS_TDBLoader2 {}
\ No newline at end of file

Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/cmd/TS_TDBLoader2.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain