You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/26 12:28:12 UTC
svn commit: r1175775 - in /incubator/jena/Scratch/PC/tdbloader2/trunk: ./
src/main/java/cmd/ src/main/java/org/apache/jena/tdbloader2/
src/test/java/org/ src/test/java/org/apache/ src/test/java/org/apache/jena/
src/test/java/org/apache/jena/tdbloader2/
Author: castagna
Date: Mon Sep 26 10:28:12 2011
New Revision: 1175775
URL: http://svn.apache.org/viewvc?rev=1175775&view=rev
Log:
JENA-117
Added:
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/
incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java (with props)
Modified:
incubator/jena/Scratch/PC/tdbloader2/trunk/ (props changed)
incubator/jena/Scratch/PC/tdbloader2/trunk/README
incubator/jena/Scratch/PC/tdbloader2/trunk/TODO
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Sep 26 10:28:12 2011
@@ -1 +1,3 @@
target
+
+*.log
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/README
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/README?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/README (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/README Mon Sep 26 10:28:12 2011
@@ -33,3 +33,5 @@ For a list of the options:
--spill-size The size of spillable segments in tuples|records
--no-stats Do not generate the stats file
--no-buffer Do not use Buffered{Input|Output}Stream
+ --max-merge-files Specify the maximum number of files to merge at the same time (default: 100)
+
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/TODO
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/TODO?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/TODO (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/TODO Mon Sep 26 10:28:12 2011
@@ -3,9 +3,5 @@ TODO
- Support N3, TURTLE, RDF/XML, etc... not only N-Triples | N-Quads.
- - A better SpillSortIterator when there are many files. (Also, make sure
- to avoid many open files...). See the preMerge() method here:
- https://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
-
- Use ThresholdPolicyMemory instead of ThresholdPolicyCount (this needs
to bites estimates).
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/cmd/tdbloader2.java Mon Sep 26 10:28:12 2011
@@ -97,13 +97,14 @@ public class tdbloader2 extends CmdGener
private static String runId = String.valueOf(System.currentTimeMillis()) ; // a unique identifier for this run, it's used for blank node labels
- private static ArgDecl argLocation = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
- private static ArgDecl argCompression = new ArgDecl(ArgDecl.NoValue, "comp", "compression") ;
- private static ArgDecl argBufferSize = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
- private static ArgDecl argGzipOutside = new ArgDecl(ArgDecl.NoValue, "gzip-outside") ;
- private static ArgDecl argSpillSize = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
- private static ArgDecl argNoStats = new ArgDecl(ArgDecl.NoValue, "no-stats") ;
- private static ArgDecl argNoBuffer = new ArgDecl(ArgDecl.NoValue, "no-buffer") ;
+ private static ArgDecl argLocation = new ArgDecl(ArgDecl.HasValue, "loc", "location") ;
+ private static ArgDecl argCompression = new ArgDecl(ArgDecl.NoValue, "comp", "compression") ;
+ private static ArgDecl argBufferSize = new ArgDecl(ArgDecl.HasValue, "buf", "buffer-size") ;
+ private static ArgDecl argGzipOutside = new ArgDecl(ArgDecl.NoValue, "gzip-outside") ;
+ private static ArgDecl argSpillSize = new ArgDecl(ArgDecl.HasValue, "spill", "spill-size") ;
+ private static ArgDecl argNoStats = new ArgDecl(ArgDecl.NoValue, "no-stats") ;
+ private static ArgDecl argNoBuffer = new ArgDecl(ArgDecl.NoValue, "no-buffer") ;
+ // --max-merge-files <n>: how many spill files MultiThreadedSortedDataBag merges at the same time (default 100)
+ private static ArgDecl argMaxMergeFiles = new ArgDecl(ArgDecl.HasValue, "max-merge-files") ;
private Location location ;
private String locationString ;
@@ -125,13 +126,14 @@ public class tdbloader2 extends CmdGener
public tdbloader2(String...argv)
{
super(argv) ;
- super.add(argLocation, "--loc", "Location") ;
- super.add(argCompression, "--compression", "Use compression for intermediate files") ;
- super.add(argBufferSize, "--buffer-size", "The size of buffers for IO in bytes") ;
- super.add(argGzipOutside, "--gzip-outside", "GZIP...(Buffered...())") ;
- super.add(argSpillSize, "--spill-size", "The size of spillable segments in tuples|records") ;
- super.add(argNoStats, "--no-stats", "Do not generate the stats file") ;
- super.add(argNoBuffer, "--no-buffer", "Do not use Buffered{Input|Output}Stream") ;
+ super.add(argLocation, "--loc", "Location") ;
+ super.add(argCompression, "--compression", "Use compression for intermediate files") ;
+ super.add(argBufferSize, "--buffer-size", "The size of buffers for IO in bytes") ;
+ super.add(argGzipOutside, "--gzip-outside", "GZIP...(Buffered...())") ;
+ super.add(argSpillSize, "--spill-size", "The size of spillable segments in tuples|records") ;
+ super.add(argNoStats, "--no-stats", "Do not generate the stats file") ;
+ super.add(argNoBuffer, "--no-buffer", "Do not use Buffered{Input|Output}Stream") ;
+ // JENA-117: the value is copied into MultiThreadedSortedDataBag.MAX_SPILL_FILES when the arguments are processed
+ super.add(argMaxMergeFiles, "--max-merge-files", "Specify the maximum number of files to merge at the same time (default: 100)") ;
}
@Override
@@ -152,6 +154,8 @@ public class tdbloader2 extends CmdGener
if ( super.hasArg(argBufferSize) )
DataStreamFactory.setBufferSize( Integer.valueOf(super.getValue(argBufferSize)) ) ;
DataStreamFactory.setBuffered( ! super.hasArg(argNoBuffer) ) ;
+ if ( super.hasArg(argMaxMergeFiles) )
+ MultiThreadedSortedDataBag.MAX_SPILL_FILES = Integer.valueOf(super.getValue(argMaxMergeFiles)) ;
datafiles = super.getPositional() ;
Modified: incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java?rev=1175775&r1=1175774&r2=1175775&view=diff
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java (original)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/main/java/org/apache/jena/tdbloader2/MultiThreadedSortedDataBag.java Mon Sep 26 10:28:12 2011
@@ -61,6 +61,8 @@ public class MultiThreadedSortedDataBag<
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
+
+ /**
+  * Maximum number of spill files that iterator() merges at the same time; when
+  * there are more, they are first pre-merged into fewer, larger spill files.
+  * Default 100; the tdbloader2 command overrides it from --max-merge-files.
+  * <p>
+  * Must be at least 2: each pre-merge pass replaces MAX_SPILL_FILES files with a
+  * single merged file, so smaller values can never reduce the number of files.
+  * NOTE(review): this is mutable static state shared by every bag in the JVM.
+  */
+ public static int MAX_SPILL_FILES = 100 ;
public MultiThreadedSortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
@@ -72,7 +74,7 @@ public class MultiThreadedSortedDataBag<
// this will prevent to have more than once spiller running and one queued up
this.pool.setRejectedExecutionHandler(this.block) ;
}
-
+
protected void checkClosed()
{
if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
@@ -87,6 +89,10 @@ public class MultiThreadedSortedDataBag<
{
return false;
}
+
+ /**
+  * Returns the list of spill files backing this bag. This is the live internal
+  * list, not a copy, so callers should treat it as read-only.
+  *
+  * @return the bag's spill file list
+  */
+ protected List<File> getSpillFiles()
+ {
+     return this.spillFiles ;
+ }
public void add(E item)
{
@@ -185,9 +191,16 @@ public class MultiThreadedSortedDataBag<
*
* @return an Iterator
*/
- @SuppressWarnings({ "unchecked" })
public Iterator<E> iterator()
{
+ // Fail fast with the "closed" error before preMerge() creates, reads or deletes
+ // spill files; otherwise a closed bag could leak a freshly created temp file.
+ checkClosed();
+ // Merge down to at most MAX_SPILL_FILES spill files so the final merge opens a
+ // bounded number of files at once.
+ preMerge();
+
+ return iterator(spillFiles.size());
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private Iterator<E> iterator(int size)
+ {
checkClosed();
int memSize = memory.size();
@@ -212,18 +225,18 @@ public class MultiThreadedSortedDataBag<
} catch (InterruptedException e) {
throw new AtlasException(e) ;
}
-
}
- List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(spillFiles.size() + (memSize > 0 ? 1 : 0));
+ List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
if (memSize > 0)
{
inputs.add(memory.iterator());
}
- for (File spillFile : spillFiles)
+ for ( int i = 0; i < size; i++ )
{
+ File spillFile = spillFiles.get(i);
try
{
InputStream in = new BufferedInputStream(new FileInputStream(spillFile));
@@ -262,6 +275,39 @@ public class MultiThreadedSortedDataBag<
}
}
+ /**
+  * Repeatedly merges the first MAX_SPILL_FILES entries of spillFiles (plus, on the
+  * first pass, any items still held in memory) into one new spill file, until no
+  * more than MAX_SPILL_FILES spill files remain. This bounds the number of files
+  * the final merge in iterator() opens at the same time.
+  *
+  * @throws AtlasException if MAX_SPILL_FILES is below 2 (such a limit can never
+  *         shrink the list, so the merge loop would not terminate), or wrapping
+  *         an IOException raised while creating the merged spill file
+  */
+ private void preMerge() {
+     // Read the global limit once so a concurrent change cannot alter it mid-merge.
+     int maxFiles = MAX_SPILL_FILES ;
+     if (spillFiles == null || spillFiles.size() <= maxFiles) { return; }
+     if (maxFiles < 2) {
+         // Each pass replaces maxFiles files with one, so 0 or 1 never makes progress.
+         throw new AtlasException("MAX_SPILL_FILES must be at least 2, but was " + maxFiles) ;
+     }
+
+     try {
+         while ( spillFiles.size() > maxFiles ) {
+             // NOTE(review): relies on getSpillFile() appending the new file to spillFiles,
+             // so it survives the removal of the first maxFiles entries below.
+             Sink<E> sink = serializationFactory.createSerializer(getSpillFile()) ;
+             try {
+                 // Opened inside the try so the sink is closed even if this call fails.
+                 Iterator<E> ssi = iterator(maxFiles) ;
+                 try {
+                     while ( ssi.hasNext() ) {
+                         sink.send( ssi.next() ) ;
+                     }
+                 } finally {
+                     Iter.close(ssi) ;
+                 }
+             } finally {
+                 sink.close() ;
+             }
+
+             // The first maxFiles spill files are now contained in the merged file:
+             // delete them (best effort, the result is ignored as before) and drop
+             // them from the list in one step via the range view.
+             List<File> merged = spillFiles.subList(0, maxFiles) ;
+             for ( File file : merged ) {
+                 file.delete() ;
+             }
+             merged.clear() ;
+
+             // iterator(maxFiles) also wrote the in-memory items into the merged file;
+             // discard them so they are not returned a second time.
+             memory = new ArrayList<E>() ;
+         }
+     } catch (IOException e) {
+         throw new AtlasException(e) ;
+     }
+ }
+
public void close()
{
if (!closed)
@@ -385,7 +431,7 @@ public class MultiThreadedSortedDataBag<
return tuple.hashCode();
}
}
-
+
}
}
Added: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java?rev=1175775&view=auto
==============================================================================
--- incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java (added)
+++ incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java Mon Sep 26 10:28:12 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdbloader2;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.openjena.atlas.data.ThresholdPolicyCount;
+import org.openjena.atlas.iterator.Iter;
+import org.openjena.riot.SerializationFactoryFinder;
+
+import com.hp.hpl.jena.query.Query;
+import com.hp.hpl.jena.query.SortCondition;
+import com.hp.hpl.jena.sparql.core.Var;
+import com.hp.hpl.jena.sparql.engine.binding.Binding;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator;
+import com.hp.hpl.jena.sparql.engine.binding.BindingFactory;
+import com.hp.hpl.jena.sparql.expr.ExprVar;
+import com.hp.hpl.jena.sparql.util.NodeFactory;
+
+/**
+ * Tests for MultiThreadedSortedDataBag: sorted output, pre-merging of spill files
+ * down to MAX_SPILL_FILES, and deletion of temporary files on close().
+ */
+public class TestMultiThreadedSortedDataBag extends TestCase
+{
+    private List<Binding> unsorted;
+    private BindingComparator comparator ;
+    private MultiThreadedSortedDataBag<Binding> db ;
+    // Global MultiThreadedSortedDataBag.MAX_SPILL_FILES before setUp() lowered it; restored in tearDown()
+    private int savedMaxSpillFiles ;
+
+    private int N = 500 ;
+    private int MAX_SPILL_FILES = 10 ;
+    private int THRESHOLD = 5 ;
+
+    @Before @Override public void setUp()
+    {
+        Random random = new Random();
+        unsorted = new ArrayList<Binding>();
+        Var var = Var.alloc("x");
+        for (int i = 0; i < N; i++)
+        {
+            unsorted.add(BindingFactory.binding(var, NodeFactory.intToNode(random.nextInt())));
+        }
+
+        List<SortCondition> conditions = new ArrayList<SortCondition>();
+        conditions.add(new SortCondition(new ExprVar("x"), Query.ORDER_ASCENDING));
+        comparator = new BindingComparator(conditions);
+
+        db = new MultiThreadedSortedDataBag<Binding>( new ThresholdPolicyCount<Binding>(THRESHOLD), SerializationFactoryFinder.bindingSerializationFactory(), comparator);
+        // MAX_SPILL_FILES is static: remember the previous value so tearDown() can restore it.
+        // A limit far below the roughly N / THRESHOLD spill files forces several pre-merge passes.
+        savedMaxSpillFiles = MultiThreadedSortedDataBag.MAX_SPILL_FILES ;
+        MultiThreadedSortedDataBag.MAX_SPILL_FILES = MAX_SPILL_FILES ;
+    }
+
+    @After @Override public void tearDown()
+    {
+        // Do not leak the lowered limit into other tests running in the same JVM.
+        MultiThreadedSortedDataBag.MAX_SPILL_FILES = savedMaxSpillFiles ;
+    }
+
+    @Test public void testSorting()
+    {
+        List<Binding> sorted = new ArrayList<Binding>();
+        try
+        {
+            db.addAll(unsorted);
+            Iterator<Binding> iter = db.iterator();
+            while (iter.hasNext())
+            {
+                sorted.add(iter.next());
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+
+        Collections.sort(unsorted, comparator);
+        assertEquals(unsorted, sorted);
+    }
+
+    @Test public void testIteratorMergesSpillFilesDownToLimit()
+    {
+        try
+        {
+            db.addAll(unsorted);
+            Iterator<Binding> iter = db.iterator();
+            // iterator() pre-merges so that at most MAX_SPILL_FILES files are merged at once
+            assertTrue(db.getSpillFiles().size() <= MAX_SPILL_FILES);
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+    }
+
+    @Test public void testTemporaryFilesAreCleanedUpAfterCompletion()
+    {
+        try
+        {
+            db.addAll(unsorted);
+
+            // NOTE(review): assumes every spill has been registered by the time addAll()
+            // returns and that the last THRESHOLD items are still in memory - confirm,
+            // since spilling runs on a background pool.
+            assertEquals(N / THRESHOLD - 1, countExistingSpillFiles());
+
+            Iterator<Binding> iter = db.iterator();
+            while (iter.hasNext())
+            {
+                iter.next();
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+
+        assertEquals(0, countExistingSpillFiles());
+    }
+
+    /** Counts the entries of db.getSpillFiles() that still exist on disk. */
+    private int countExistingSpillFiles()
+    {
+        int count = 0;
+        for (File file : db.getSpillFiles())
+        {
+            if (file.exists())
+            {
+                count++;
+            }
+        }
+        return count;
+    }
+
+}
Propchange: incubator/jena/Scratch/PC/tdbloader2/trunk/src/test/java/org/apache/jena/tdbloader2/TestMultiThreadedSortedDataBag.java
------------------------------------------------------------------------------
svn:mime-type = text/plain