You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/26 12:28:12 UTC

svn commit: r1175775 - in /incubator/jena/Scratch/PC/tdbloader2/trunk: ./ src/main/java/cmd/ src/main/java/org/apache/jena/tdbloader2/ src/test/java/org/ src/test/java/org/apache/ src/test/java/org/apache/jena/ src/test/java/org/apache/jena/tdbloader2/

Author: castagna
Date: Mon Sep 26 10:28:12 2011
New Revision: 1175775

URL: http://svn.apache.org/viewvc?rev=1175775&view=rev
Log:
JENA-117

Added:
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java   (with props)
Modified:
    incubator/jena/Scratch/PC/tdbloader2/trunk/   (props changed)
    incubator/jena/Scratch/PC/tdbloader2/trunk/README
    incubator/jena/Scratch/PC/tdbloader2/trunk/TODO
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
    incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java

Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Sep 26 10:28:12 2011
@@ -1 +1,3 @@
 target
+
+*.log

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/README
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/README?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/README (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/README Mon Sep 26 10:28:12 2011
@@ -33,3 +33,5 @@ For a list of the options:
       --spill-size           The size of spillable segments in tuples|records
       --no-stats             Do not generate the stats file
       --no-buffer            Do not use Buffered{Input|Output}Stream
+      --max-merge-files      Specify the maximum number of files to merge at the same time (default: 100)
+

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/TODO
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/TODO?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/TODO (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/TODO Mon Sep 26 10:28:12 2011
@@ -3,9 +3,5 @@ TODO
 
  - Support N3, TURTLE, RDF/XML, etc... not only N-Triples | N-Quads.
 
- - A better SpillSortIterator when there are many files. (Also, make sure
-   to avoid many open files...). See the preMerge() method here:
-   https://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
-
  - Use ThresholdPolicyMemory instead of ThresholdPolicyCount (this needs
    to bites estimates).

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Mon Sep 26 10:28:12 2011
@@ -97,13 +97,14 @@ public class tdbloader2 extends CmdGener
 
     private static String runId = String.valueOf(System.currentTimeMillis()) ; // a unique identifier for this run, it's used for blank node labels
 
-    private static ArgDecl argLocation     = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
-    private static ArgDecl argCompression  = new ArgDecl(ArgDecl.NoValue,  "comp", "compression") ;
-    private static ArgDecl argBufferSize   = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
-    private static ArgDecl argGzipOutside  = new ArgDecl(ArgDecl.NoValue,  "gzip-outside") ;
-    private static ArgDecl argSpillSize    = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
-    private static ArgDecl argNoStats      = new ArgDecl(ArgDecl.NoValue,  "no-stats") ;
-    private static ArgDecl argNoBuffer     = new ArgDecl(ArgDecl.NoValue,  "no-buffer") ;
+    private static ArgDecl argLocation      = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
+    private static ArgDecl argCompression   = new ArgDecl(ArgDecl.NoValue,  "comp", "compression") ;
+    private static ArgDecl argBufferSize    = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
+    private static ArgDecl argGzipOutside   = new ArgDecl(ArgDecl.NoValue,  "gzip-outside") ;
+    private static ArgDecl argSpillSize     = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
+    private static ArgDecl argNoStats       = new ArgDecl(ArgDecl.NoValue,  "no-stats") ;
+    private static ArgDecl argNoBuffer      = new ArgDecl(ArgDecl.NoValue,  "no-buffer") ;
+    private static ArgDecl argMaxMergeFiles = new ArgDecl(ArgDecl.HasValue, "max-merge-files") ;
 
     private Location location ;
     private String locationString ;
@@ -125,13 +126,14 @@ public class tdbloader2 extends CmdGener
     public tdbloader2(String...argv)
     {
         super(argv) ;
-        super.add(argLocation,    "--loc",          "Location") ;
-        super.add(argCompression, "--compression",  "Use compression for intermediate files") ;
-        super.add(argBufferSize,  "--buffer-size",  "The size of buffers for IO in bytes") ;
-        super.add(argGzipOutside, "--gzip-outside", "GZIP...(Buffered...())") ;
-        super.add(argSpillSize,   "--spill-size",   "The size of spillable segments in tuples|records") ;
-        super.add(argNoStats,     "--no-stats",     "Do not generate the stats file") ;
-        super.add(argNoBuffer,    "--no-buffer",    "Do not use Buffered{Input|Output}Stream") ;
+        super.add(argLocation,      "--loc",               "Location") ;
+        super.add(argCompression,   "--compression",       "Use compression for intermediate files") ;
+        super.add(argBufferSize,    "--buffer-size",       "The size of buffers for IO in bytes") ;
+        super.add(argGzipOutside,   "--gzip-outside",      "GZIP...(Buffered...())") ;
+        super.add(argSpillSize,     "--spill-size",        "The size of spillable segments in tuples|records") ;
+        super.add(argNoStats,       "--no-stats",          "Do not generate the stats file") ;
+        super.add(argNoBuffer,      "--no-buffer",         "Do not use Buffered{Input|Output}Stream") ;
+        super.add(argMaxMergeFiles, "--max-merge-files",   "Specify the maximum number of files to merge at the same time (default: 100)") ;
     }
         
     @Override
@@ -152,6 +154,8 @@ public class tdbloader2 extends CmdGener
         if ( super.hasArg(argBufferSize) ) 
             DataStreamFactory.setBufferSize( Integer.valueOf(super.getValue(argBufferSize)) ) ;
         DataStreamFactory.setBuffered( ! super.hasArg(argNoBuffer) ) ;
+        if ( super.hasArg(argMaxMergeFiles) )
+            MultiThreadedSortedDataBag.MAX_SPILL_FILES = Integer.valueOf(super.getValue(argMaxMergeFiles)) ;
         
         datafiles  = super.getPositional() ;
 

Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java Mon Sep 26 10:28:12 2011
@@ -61,6 +61,8 @@ public class MultiThreadedSortedDataBag<
     protected boolean finishedAdding = false;
     protected boolean spilled = false;
     protected boolean closed = false;
+
+    public static int MAX_SPILL_FILES = 100 ;
     
     
     public MultiThreadedSortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
@@ -72,7 +74,7 @@ public class MultiThreadedSortedDataBag<
         // this will prevent to have more than once spiller running and one queued up
         this.pool.setRejectedExecutionHandler(this.block) ;
     }
-    
+   
     protected void checkClosed()
     {
         if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
@@ -87,6 +89,10 @@ public class MultiThreadedSortedDataBag<
     {
         return false;
     }
+    
+    protected List<File> getSpillFiles() {
+        return spillFiles ;
+    }
 
     public void add(E item)
     {
@@ -185,9 +191,16 @@ public class MultiThreadedSortedDataBag<
      * 
      * @return an Iterator
      */
-    @SuppressWarnings({ "unchecked" })
     public Iterator<E> iterator()
     {
+        preMerge();
+        
+        return iterator(spillFiles.size());
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private Iterator<E> iterator(int size)
+    {
         checkClosed();
         
         int memSize = memory.size();
@@ -212,18 +225,18 @@ public class MultiThreadedSortedDataBag<
                 } catch (InterruptedException e) {
                     throw new AtlasException(e) ;
                 }
-
             }
             
-            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(spillFiles.size() + (memSize > 0 ? 1 : 0));
+            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
                         
             if (memSize > 0)
             {
                 inputs.add(memory.iterator());
             }
             
-            for (File spillFile : spillFiles)
+            for ( int i = 0; i < size; i++ )
             {
+                File spillFile = spillFiles.get(i);
                 try
                 {
                     InputStream in = new BufferedInputStream(new FileInputStream(spillFile));
@@ -262,6 +275,39 @@ public class MultiThreadedSortedDataBag<
         }
     }
     
+    private void preMerge() {
+        if (spillFiles == null || spillFiles.size() <= MAX_SPILL_FILES) { return; }
+
+        try {
+            while ( spillFiles.size() > MAX_SPILL_FILES ) {
+                Sink<E> sink = serializationFactory.createSerializer(getSpillFile()) ;
+                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
+                try {
+                    while ( ssi.hasNext() ) {
+                        sink.send( ssi.next() );
+                    }
+                } finally {
+                    Iter.close(ssi) ;
+                    sink.close() ;
+                }
+                
+                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
+                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
+                    File file = spillFiles.get(i) ;
+                    file.delete() ;
+                    toRemove.add(file) ;
+                }
+
+                spillFiles.removeAll(toRemove) ;
+                
+                memory = new ArrayList<E>() ;
+            }            
+        } catch (IOException e) {
+            throw new AtlasException(e) ;
+        }
+
+    }    
+    
     public void close()
     {
         if (!closed)
@@ -385,7 +431,7 @@ public class MultiThreadedSortedDataBag<
                 return tuple.hashCode();
             }
         }
-        
+
     }
 
 }

Added: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java?rev=1175775&view=auto
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java (added)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java Mon Sep 26 10:28:12 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdbloader2;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.iterator.Iter;
+import org.openjena.riot.SerializationFactoryFinder;
+
+import com.hp.hpl.jena.query.Query;
+import com.hp.hpl.jena.query.SortCondition;
+import com.hp.hpl.jena.sparql.core.Var;
+import com.hp.hpl.jena.sparql.engine.binding.Binding;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator;
+import com.hp.hpl.jena.sparql.engine.binding.BindingFactory;
+import com.hp.hpl.jena.sparql.expr.ExprVar;
+import com.hp.hpl.jena.sparql.util.NodeFactory;
+
+public class TestMultiThreadedSortedDataBag extends TestCase
+{
+    private List<Binding> unsorted;
+    private BindingComparator comparator ;
+    private MultiThreadedSortedDataBag<Binding> db ;
+    
+    private int N = 500 ;
+    private int MAX_SPILL_FILES = 10 ;
+    private int THRESHOLD = 5 ; 
+
+    @Before @Override public void setUp() 
+    {
+        Random random = new Random();
+        unsorted = new ArrayList<Binding>();
+        Var var = Var.alloc("x");
+        for(int i = 0; i < N; i++){
+            unsorted.add(BindingFactory.binding(var, NodeFactory.intToNode(random.nextInt())));
+        }
+        
+        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
+        conditions.add(new SortCondition(new ExprVar("x"), Query.ORDER_ASCENDING));
+        comparator = new BindingComparator(conditions);
+        
+        db = new MultiThreadedSortedDataBag<Binding>( new ThresholdPolicyCount<Binding>(THRESHOLD), SerializationFactoryFinder.bindingSerializationFactory(), comparator);
+        MultiThreadedSortedDataBag.MAX_SPILL_FILES = MAX_SPILL_FILES ;
+    }
+
+    @Test public void testSorting() 
+    {
+        List<Binding> sorted = new ArrayList<Binding>();
+        try
+        {
+            db.addAll(unsorted);
+            Iterator<Binding> iter = db.iterator(); 
+            while (iter.hasNext())
+            {
+                sorted.add(iter.next());
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+        
+        Collections.sort(unsorted, comparator);
+        assertEquals(unsorted, sorted);
+    }
+    
+    @Test public void testTemporaryFilesAreCleanedUpAfterCompletion()
+    {
+        try
+        {
+            db.addAll(unsorted);
+            
+            int count = 0;
+            for (File file : db.getSpillFiles())
+            {
+                if (file.exists())
+                {
+                    count++;
+                }
+            }
+            assertEquals(N / THRESHOLD - 1, count);
+            
+            Iterator<Binding> iter = db.iterator();
+            while (iter.hasNext())
+            {
+                iter.next();
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+        
+        int count = 0;
+        for (File file : db.getSpillFiles())
+        {
+            if (file.exists())
+            {
+                count++;
+            }
+        }
+        assertEquals(0, count);
+    }
+    
+}

Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain