You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/23 23:01:17 UTC

svn commit: r1175018 - in /incubator/jena/Scratch/PC/tdbloader2/trunk: README pom.xml src/main/java/cmd/tdbloader2.java src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java

Author: castagna
Date: Fri Sep 23 21:01:17 2011
New Revision: 1175018

URL: http://svn.apache.org/viewvc?rev=1175018&view=rev
Log:
JENA-117

Modified:
    incubator/jena/Scratch/PC/tdbloader2/trunk/README
    incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/README
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/README?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/README (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/README Fri Sep 23 21:01:17 2011
@@ -2,4 +2,34 @@ tdbloader2
 ----------
 
 This is an experimental (pure Java) version of tdbloader2.
-See also: https://issues.apache.org/jira/browse/JENA-117
\ No newline at end of file
+See also: https://issues.apache.org/jira/browse/JENA-117
+
+If you want to try it, here is how you can check out and compile it:
+
+  cd /tmp
+  svn co https://svn.apache.org/repos/asf/incubator/jena/Scratch/PC/tdbloader2/trunk/ tdbloader2
+  cd /tmp/tdbloader2
+  mvn package
+
+This is how you can run it:
+
+  java -cp target/tdbloader2-0.1-incubating-SNAPSHOT-jar-with-dependencies.jar -server -d64 -Xmx6144M cmd.tdbloader2 --no-stats --compression --spill-size 1500000 --loc /tmp/tdb /opt/datasets/raw/openlibrary-2011-06-02.nt.gz
+
+For a list of the options:
+
+  java -cp target/tdbloader2-0.1-incubating-SNAPSHOT-jar-with-dependencies.jar cmd.tdbloader2 -h
+
+  cmd.tdbloader2 --loc=DIR FILE ...
+  General
+      -v   --verbose         Verbose
+      -q   --quiet           Run with minimal output
+      --debug                Output information for debugging
+      --help
+      --version              Version information
+      --loc                  Location
+      --compression          Use compression for intermediate files
+      --buffer-size          The size of buffers for IO in bytes
+      --gzip-outside         GZIP...(Buffered...())
+      --spill-size           The size of spillable segments in tuples|records
+      --no-stats             Do not generate the stats file
+      --no-buffer            Do not use Buffered{Input|Output}Stream

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/pom.xml Fri Sep 23 21:01:17 2011
@@ -105,40 +105,22 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>2.1.2</version>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2.1</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
         <executions>
           <execution>
-            <id>attach-sources</id>
+            <id>make-assembly</id>
             <phase>package</phase>
             <goals>
-              <goal>jar-no-fork</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>attach-javadocs</id>
-            <goals>
-              <goal>jar</goal>
+              <goal>single</goal>
             </goals>
           </execution>
         </executions>
-        <configuration>
-          <version>true</version>
-          <show>public</show>
-          <quiet>true</quiet>
-          <encoding>${project.build.sourceEncoding}</encoding>
-          <windowtitle>${project.name} ${project.version}</windowtitle>
-          <doctitle>${project.name} ${project.version}</doctitle>
-          <!-- Exclude the implementation -->
-          <includePackageNames>org.apache.jena.larq</includePackageNames>
-        </configuration>
       </plugin>
 
       <plugin>

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Fri Sep 23 21:01:17 2011
@@ -376,26 +376,24 @@ public class tdbloader2 extends CmdGener
     // Utility methods for RDF parsing...
 
     public static final NodeToLabel nodeToLabel = NodeToLabel.createBNodeByLabelAsGiven();
+    private static final Prologue prologue = new Prologue(null, IRIResolver.createNoResolve()); 
 
     public static String serialize(Node node) {
-        Prologue prologue = new Prologue(null, IRIResolver.createNoResolve()); 
         StringWriter out = new StringWriter();
         OutputLangUtils.output(out, node, prologue, nodeToLabel);
         return out.toString();
     }
     
     private static ParserProfile createParserProfile(String runId, String filename) {
-        Prologue prologue = new Prologue(null, IRIResolver.createNoResolve()); 
         LabelToNode labelMapping = new CustomLabelToNode(runId, filename);
         return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
     }
 
     private static ParserProfile createParserProfile() {
-        Prologue prologue = new Prologue(null, IRIResolver.createNoResolve()); 
         LabelToNode labelMapping = LabelToNode.createUseLabelAsGiven() ;
         return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, labelMapping);
     }
-    
+
     private static ParserProfile profile = createParserProfile();
 
     public static Node parse(String string) {

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java?rev=1175018&r1=1175017&r2=1175018&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java Fri Sep 23 21:01:17 2011
@@ -32,6 +32,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.openjena.atlas.AtlasException;
 import org.openjena.atlas.data.AbstractDataBag;
@@ -48,10 +50,14 @@ public class MultiThreadedSortedDataBag<
     private final SerializationFactory<E> serializationFactory;
     private final Comparator<? super E> comparator;
     
+    private ExecutorService pool = Executors.newSingleThreadExecutor();
+    private boolean multithreaded = true ;
+    
     protected boolean finishedAdding = false;
     protected boolean spilled = false;
     protected boolean closed = false;
     
+    
     public MultiThreadedSortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
     {
         this.policy = policy;
@@ -115,7 +121,11 @@ public class MultiThreadedSortedDataBag<
             Object[] array = memory.toArray();
             Sink<E> serializer = serializationFactory.createSerializer(out);
 
-            new Spiller (array, serializer).start();
+            if ( multithreaded ) {
+                pool.execute( new Spiller (array, serializer) ) ;
+            } else {
+                spill(array, serializer) ;
+            }
 
             spilled = true;
             policy.reset();
@@ -123,7 +133,7 @@ public class MultiThreadedSortedDataBag<
         }
     }
 
-    class Spiller extends Thread {
+    class Spiller implements Runnable {
         
         private Object[] array ;
         private Sink<E> serializer ;
@@ -133,23 +143,26 @@ public class MultiThreadedSortedDataBag<
             this.serializer = serializer ;
         }
         
-        @SuppressWarnings("unchecked")
         @Override
         public void run() {
-            Arrays.sort(array, (Comparator)comparator);
-            try
-            {
-                for (Object tuple : array)
-                {
-                    serializer.send((E)tuple);
-                }
-            }
-            finally
+            spill(array, serializer) ;
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private final void spill (Object[] array, Sink<E> serializer) {
+        Arrays.sort(array, (Comparator)comparator);
+        try
+        {
+            for (Object tuple : array)
             {
-                serializer.close();
+                serializer.send((E)tuple);
             }
         }
-        
+        finally
+        {
+            serializer.close();
+        }
     }
     
     public void flush()
@@ -184,6 +197,8 @@ public class MultiThreadedSortedDataBag<
         
         if (spilled)
         {
+            if ( ( multithreaded ) && ( ! pool.isShutdown() ) ) pool.shutdown() ;
+            
             List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(spillFiles.size() + (memSize > 0 ? 1 : 0));
                         
             if (memSize > 0)