You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by ca...@apache.org on 2011/09/09 18:03:49 UTC

svn commit: r1167265 - in /incubator/jena/Jena2/ARQ/trunk: src-test/com/hp/hpl/jena/sparql/ src-test/com/hp/hpl/jena/sparql/engine/ src-test/com/hp/hpl/jena/sparql/engine/iterator/ src-test/org/openjena/atlas/data/ src/com/hp/hpl/jena/sparql/engine/ite...

Author: castagna
Date: Fri Sep  9 16:03:48 2011
New Revision: 1167265

URL: http://svn.apache.org/viewvc?rev=1167265&view=rev
Log:
JENA-44

QueryIterSort now uses a SortedDataBag so that it can, when necessary, spill to disk. The threshold that decides when to spill to disk is configured via an ARQ symbol (i.e. spillOnDiskSortingThreshold); by default, spilling is turned off. The threshold can be changed at the query level, at the dataset level, or globally.

Thank you, Sam, for the initial patch and the help in testing this; Stephen, for the DataBags; and Andy, for all the feedback and help on this.

Similarly, we can commit and close JENA-45 now.

Added:
    incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java   (with props)
    incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/
    incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java   (with props)
    incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java   (with props)
    incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java   (with props)
Modified:
    incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/ARQTestSuite.java
    incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/TestSortedDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/com/hp/hpl/jena/sparql/engine/iterator/QueryIterSort.java
    incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/BagFactory.java

Modified: incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/ARQTestSuite.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/ARQTestSuite.java?rev=1167265&r1=1167264&r2=1167265&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/ARQTestSuite.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/ARQTestSuite.java Fri Sep  9 16:03:48 2011
@@ -16,6 +16,7 @@ import com.hp.hpl.jena.query.ARQ ;
 import com.hp.hpl.jena.sparql.algebra.TC_Algebra ;
 import com.hp.hpl.jena.sparql.api.TS_API ;
 import com.hp.hpl.jena.sparql.engine.binding.TestBindingStreams ;
+import com.hp.hpl.jena.sparql.engine.TS_Engine;
 import com.hp.hpl.jena.sparql.engine.main.QueryEngineMain ;
 import com.hp.hpl.jena.sparql.engine.ref.QueryEngineRef ;
 import com.hp.hpl.jena.sparql.expr.E_Function ;
@@ -110,6 +111,7 @@ public class ARQTestSuite extends TestSu
         ts.addTest(new JUnit4TestAdapter(TS_Graph.class)) ;
         ts.addTest(new JUnit4TestAdapter(TS_Lang.class)) ;
         ts.addTest(new JUnit4TestAdapter(TS_Solver.class)) ;
+        ts.addTest(new JUnit4TestAdapter(TS_Engine.class)) ; 
         
         return ts ;
     }

Added: incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java?rev=1167265&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java Fri Sep  9 16:03:48 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.engine;
+
+import org.junit.runner.RunWith ;
+import org.junit.runners.Suite ;
+
+import com.hp.hpl.jena.sparql.engine.iterator.TestQueryIterSort ;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses( {
+      TestQueryIterSort.class
+})
+
+public class TS_Engine {}
\ No newline at end of file

Propchange: incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/TS_Engine.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java?rev=1167265&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java Fri Sep  9 16:03:48 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.sparql.engine.iterator;
+
+import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.assertNotNull ;
+import static org.junit.Assert.assertTrue ;
+
+import java.util.ArrayList ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.Random ;
+
+import org.junit.Before ;
+import org.junit.Test ;
+import org.openjena.atlas.data.DataBagExaminer ;
+import org.openjena.atlas.io.IndentedWriter ;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
+import com.hp.hpl.jena.graph.Graph ;
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.query.Query ;
+import com.hp.hpl.jena.query.QueryCancelledException ;
+import com.hp.hpl.jena.query.SortCondition ;
+import com.hp.hpl.jena.shared.PrefixMapping ;
+import com.hp.hpl.jena.sparql.ARQNotImplemented ;
+import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.core.Var ;
+import com.hp.hpl.jena.sparql.engine.ExecutionContext ;
+import com.hp.hpl.jena.sparql.engine.QueryIterator ;
+import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingFactory ;
+import com.hp.hpl.jena.sparql.engine.main.OpExecutorFactory ;
+import com.hp.hpl.jena.sparql.expr.ExprVar ;
+import com.hp.hpl.jena.sparql.serializer.SerializationContext ;
+import com.hp.hpl.jena.sparql.util.Context ;
+
+public class TestQueryIterSort {
+
+    private static final String LETTERS = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM" ;
+    private Random random ;
+    private List<Binding> unsorted ;
+    private BindingComparator comparator ;
+    private CallbackIterator iterator ;
+
+    @Before
+    public void setup() 
+    {
+        random = new Random();
+        Var[] vars = new Var[]{
+            Var.alloc("1"), Var.alloc("2"), Var.alloc("3"),
+            Var.alloc("4"), Var.alloc("5"), Var.alloc("6"),
+            Var.alloc("7"), Var.alloc("8"), Var.alloc("9"), Var.alloc("0")
+        };
+        unsorted = new ArrayList<Binding>();
+        for(int i = 0; i < 500; i++){
+            unsorted.add(randomBinding(vars));
+        }
+        
+        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
+        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
+        comparator = new BindingComparator(conditions);
+        
+        iterator = new CallbackIterator(unsorted.iterator(), 25, null);
+        iterator.setCallback(new Callback() {
+            //@Override 
+            public void call() { throw new QueryCancelledException() ; }
+        });
+    }
+    
+    @Test
+    public void testNoSpill()
+    {
+        iterator.setCallback(new Callback() { public void call() { /* do nothing */ } });
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        try
+        {
+            assertEquals(0, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+            qIter.hasNext() ;
+            assertEquals(500, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+        }
+        finally
+        {
+            qIter.close() ;
+        }
+    }
+    
+    @Test
+    public void testCleanAfterClose()
+    {
+        iterator.setCallback(new Callback() { public void call() { /* do nothing */ } });
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        context.set(QueryIterSort.spillOnDiskSortingThreshold, 10L) ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        try
+        {
+            assertEquals(0, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+            qIter.hasNext() ;
+            assertEquals(500, iterator.getReturnedElementCount()) ;
+            assertEquals(49, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+        }
+        finally
+        {
+            qIter.close() ;
+        }
+        
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+    }
+    
+    @Test
+    public void testCleanAfterExhaustion()
+    {
+        iterator.setCallback(new Callback() { public void call() { /* do nothing */ } });
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        context.set(QueryIterSort.spillOnDiskSortingThreshold, 10L) ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        
+        // Usually qIter should be in a try/finally block, but we are testing the case that the user forgot to do that.
+        // As a failsafe, QueryIteratorBase should close it when the iterator is exhausted.
+        assertEquals(0, iterator.getReturnedElementCount()) ;
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+        qIter.hasNext() ;
+        assertEquals(500, iterator.getReturnedElementCount()) ;
+        assertEquals(49, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+        while (qIter.hasNext())
+        {
+            qIter.next();
+        }
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+    }
+    
+    @Test(expected=QueryCancelledException.class)
+    public void testCancelInterruptsInitialisation() 
+    {
+
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        context.set(QueryIterSort.spillOnDiskSortingThreshold, 10L) ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        try 
+        {
+            assertEquals(0, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+            qIter.cancel() ;
+            qIter.hasNext() ;  // throws a QueryCancelledException
+        } 
+        finally 
+        {
+            assertTrue(iterator.isCanceled()) ;
+            assertEquals(0, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db));
+            qIter.close() ;
+        }
+        
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+    }
+    
+    @Test(expected=QueryCancelledException.class)
+    public void testCancelInterruptsExternalSortAfterStartingIteration() 
+    {
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        context.set(QueryIterSort.spillOnDiskSortingThreshold, 10L) ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        try
+        {
+            assertEquals(0, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+            qIter.hasNext() ;  // throws a QueryCancelledException
+        }
+        catch ( QueryCancelledException e )
+        {
+            // expected
+            assertEquals(26, iterator.getReturnedElementCount()) ;
+            // This is zero because QueryIteratorBase will call close() before throwing the QueryCancelledException.
+            // It does this as a failsafe in case the user doesn't close the QueryIterator themselves.
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+            throw e ;
+        }
+        finally
+        {
+            qIter.close() ;
+        }
+        
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+    }
+    
+    @Test(expected=QueryCancelledException.class)
+    public void testCancelInterruptsExternalSortAtStartOfIteration() 
+    {
+        iterator = new CallbackIterator(unsorted.iterator(), 25, null);
+        iterator.setCallback(new Callback() { public void call() { /* do nothing */ } });
+        assertEquals(0, iterator.getReturnedElementCount());
+        Context context = new Context() ;
+        context.set(QueryIterSort.spillOnDiskSortingThreshold, 10L) ;
+        ExecutionContext executionContext = new ExecutionContext(context, (Graph)null, (DatasetGraph)null, (OpExecutorFactory)null) ;
+        QueryIterSort qIter = new QueryIterSort(iterator, comparator, executionContext) ;
+        try 
+        {
+            assertTrue(qIter.hasNext()) ;
+            assertEquals(49, DataBagExaminer.countTemporaryFiles(qIter.db));
+            assertNotNull(qIter.next()) ;
+            assertTrue(qIter.hasNext()) ;
+            qIter.cancel() ;
+            qIter.hasNext() ;  // throws a QueryCancelledException
+        }
+        finally 
+        {
+            //assertTrue(iterator.isCanceled()) ;
+            assertEquals(500, iterator.getReturnedElementCount()) ;
+            assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db));
+            qIter.close() ;
+        }
+        
+        assertEquals(0, DataBagExaminer.countTemporaryFiles(qIter.db)) ;
+    }
+
+    private Binding randomBinding(Var[] vars)
+    {
+        Binding binding = BindingFactory.create();
+        binding.add(vars[0], Node.createAnon());
+        binding.add(vars[1], Node.createURI(randomURI()));
+        binding.add(vars[2], Node.createURI(randomURI()));
+        binding.add(vars[3], Node.createLiteral(randomString(20)));
+        binding.add(vars[4], Node.createAnon());
+        binding.add(vars[5], Node.createURI(randomURI()));
+        binding.add(vars[6], Node.createURI(randomURI()));
+        binding.add(vars[7], Node.createLiteral(randomString(5)));
+        binding.add(vars[8], Node.createLiteral("" + random.nextInt(), null, XSDDatatype.XSDinteger));
+        binding.add(vars[9], Node.createAnon());
+        return binding;
+    }
+    
+    private String randomURI() 
+    {
+        return String.format("http://%s.example.com/%s", randomString(10), randomString(10));
+    }
+    
+    private String randomString(int length)
+    {
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < length; i++){
+            builder.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
+        }
+        return builder.toString();
+    }
+    
+
+    private class CallbackIterator implements QueryIterator
+    {
+        int elementsReturned = 0 ;
+        Callback callback ;
+        int trigger ;
+        Iterator<Binding> delegate ;
+        boolean canceled = false ;
+        
+        public CallbackIterator(Iterator<Binding> delegate, int trigger, Callback callback)
+        {
+            this.delegate = delegate ;
+            this.callback = callback ;
+            this.trigger = trigger ;
+        }
+        
+        public void setCallback(Callback callback) 
+        {
+            this.callback = callback ;
+        }
+        
+        //@Override
+        public boolean hasNext() 
+        {
+            return delegate.hasNext() ;
+        }
+
+        //@Override
+        public Binding next() 
+        {
+            if (elementsReturned++ >= trigger)
+            {
+                callback.call() ;
+            }
+            return delegate.next() ;
+        }
+
+        //@Override
+        public void remove()
+        {
+            delegate.remove() ;
+        }
+        
+        public int getReturnedElementCount()
+        {
+            return elementsReturned ;
+        }
+
+        public boolean isCanceled() {
+            return canceled ;
+        }
+
+        //@Override
+        public Binding nextBinding() 
+        {
+            if (elementsReturned++ >= trigger) callback.call() ;
+            return delegate.next() ;
+        }
+
+        //@Override
+        @SuppressWarnings("deprecation") public void abort() { throw new ARQNotImplemented() ; }
+        
+        //@Override
+        public void cancel() { canceled = true ; }
+        
+        //@Override
+        public void close() { throw new ARQNotImplemented() ; }
+        
+        //@Override
+        public void output(IndentedWriter out, SerializationContext sCxt) { throw new ARQNotImplemented() ; }
+        
+        //@Override
+        public String toString(PrefixMapping pmap) { throw new ARQNotImplemented() ; }
+        
+        //@Override
+        public void output(IndentedWriter out) { throw new ARQNotImplemented() ; }
+
+    }
+
+    private interface Callback
+    {
+        public void call() ;
+    }
+
+}
+

Propchange: incubator/jena/Jena2/ARQ/trunk/src-test/com/hp/hpl/jena/sparql/engine/iterator/TestQueryIterSort.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java?rev=1167265&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java Fri Sep  9 16:03:48 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.atlas.data;
+
+import java.io.File ;
+
+/**
+ * This class exists so that we can examine the internals of a DataBag during testing (which we can do here since it is in the same package).
+ */
+public class DataBagExaminer
+{
+    public static int countTemporaryFiles(AbstractDataBag<?> bag)
+    {
+        int count = 0;
+        for (File tempFile : bag.spillFiles)
+        {
+            if (tempFile.exists())
+            {
+                count++;
+            }
+        }
+        return count;
+    }
+}
+

Propchange: incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/DataBagExaminer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/TestSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/TestSortedDataBag.java?rev=1167265&r1=1167264&r2=1167265&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/TestSortedDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src-test/org/openjena/atlas/data/TestSortedDataBag.java Fri Sep  9 16:03:48 2011
@@ -145,133 +145,6 @@ public class TestSortedDataBag extends T
         assertEquals(0, count);
     }
     
-    // TODO Port the tests below to use the new DataBags
-    
-//    @Test
-//    public void testCancelInterruptsInitialisation() throws Exception
-//    {
-//        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
-//        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
-//        final BindingComparator comparator = new BindingComparator(conditions);
-//        
-//        final File tmpDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//        tmpDir.deleteOnExit();
-//        tmpDir.mkdirs();
-//        assertEquals(0, tmpDir.listFiles().length);
-//        
-//        SortedDataBag<Binding> db = new SortedDataBag<Binding>(
-//                new ThresholdPolicyCount<Binding>(10),
-//                SerializationFactoryFinder.bindingSerializationFactory(),
-//                comparator);
-//        
-//        
-//        //final ExternalBindingSort sort = new ExternalBindingSort();
-//        final CallbackIterator<Binding> iterator = new CallbackIterator<Binding>(unsorted.iterator(), 5, null);
-//        
-//        final QueryIterSort sort = new QueryIterSort(iterator, comparator, (ExecutionContext)null);
-//        
-//        iterator.setCallback(
-//            new Callback(){
-//                public void call() {
-//                    sort.cancel();
-//                }
-//        });
-//        
-//        assertEquals(0, iterator.getReturnedElementCount());
-//        int batchSize = 10;
-//        QueryIterator qIter = sort.sort(iterator, comparator, batchSize, 1, tmpDir);
-//        assertEquals(batchSize, iterator.getReturnedElementCount());
-//        assertTrue(qIter instanceof QueryIterNullIterator);
-//    }
-    
-//    @Test    
-//    public void testCancelInterruptsExternalSortAtStartOfIteration()
-//    {
-//        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
-//        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
-//        conditions.add(new SortCondition(new ExprVar("1"), Query.ORDER_ASCENDING));
-//        conditions.add(new SortCondition(new ExprVar("0"), Query.ORDER_DESCENDING));
-//        BindingComparator comparator = new BindingComparator(conditions);
-//        
-//        ExternalBindingSort sort = new ExternalBindingSort();
-//        List<Binding> sorted = new ArrayList<Binding>();
-//        QueryIterator iter = sort.sort(unsorted.iterator(), comparator, 
-//                                        10, 1, new File(System.getProperty("java.io.tmpdir")));
-//        try{
-//            iter.cancel();
-//        }catch(Exception e){
-//            fail("Unexpected exception");
-//        }
-//        getNextAndExpectException(iter);
-//    }
-//
-//
-//    @Test
-//    public void testCancelInterruptsExternalSortAfterStartingIteration()
-//    {
-//        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
-//        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
-//        conditions.add(new SortCondition(new ExprVar("1"), Query.ORDER_ASCENDING));
-//        conditions.add(new SortCondition(new ExprVar("0"), Query.ORDER_DESCENDING));
-//        BindingComparator comparator = new BindingComparator(conditions);
-//        
-//        ExternalBindingSort sort = new ExternalBindingSort();
-//        List<Binding> sorted = new ArrayList<Binding>();
-//        QueryIterator iter = sort.sort(unsorted.iterator(), comparator, 
-//                                        10, 1, new File(System.getProperty("java.io.tmpdir")));
-//        try{
-//            assertNotNull(iter.next());
-//            assertNotNull(iter.next());
-//            assertNotNull(iter.next());
-//            assertNotNull(iter.next());
-//            assertNotNull(iter.next());
-//            iter.cancel();
-//        }catch(Exception e){
-//            fail("Unexpected exception");
-//        }
-//        getNextAndExpectException(iter);
-//    }
-//        
-//    @Test public void testTemporaryFilesAreCleanedUpAfterCancellationDuringInitialisation()
-//    {   
-//        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
-//        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
-//        final BindingComparator comparator = new BindingComparator(conditions);
-//        final File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
-//                                                    UUID.randomUUID().toString());
-//        tmpDir.deleteOnExit();
-//        tmpDir.mkdirs();
-//        assertEquals(0, tmpDir.listFiles().length);
-//        
-//        final ExternalBindingSort sort = new ExternalBindingSort();
-//        final CallbackIterator<Binding> iterator = new CallbackIterator<Binding>(unsorted.iterator(), 30, 
-//            new Callback(){
-//                public void call() {
-//                    sort.cancel();
-//                }
-//        }); 
-//        QueryIterator qIter = sort.sort(iterator, comparator, 10, 1, tmpDir);
-//        assertEquals(0, tmpDir.listFiles().length);
-//    }
-//
-//    @Test public void testTemporaryFilesAreCleanedUpAfterCancellationDuringIteration()
-//    {   
-//        List<SortCondition> conditions = new ArrayList<SortCondition>(); 
-//        conditions.add(new SortCondition(new ExprVar("8"), Query.ORDER_ASCENDING));
-//        final BindingComparator comparator = new BindingComparator(conditions);
-//        final File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
-//                                                    UUID.randomUUID().toString());
-//        tmpDir.deleteOnExit();
-//        tmpDir.mkdirs();
-//        assertEquals(0, tmpDir.listFiles().length);
-//        
-//        ExternalBindingSort sort = new ExternalBindingSort();
-//        QueryIterator qIter = sort.sort(unsorted.iterator(), comparator, 10, 1, tmpDir);
-//        assertTrue(tmpDir.listFiles().length > 0);
-//        qIter.cancel();
-//        assertEquals(0, tmpDir.listFiles().length);
-//    }
-    
     private Binding randomBinding(Var[] vars)
     {
         Binding binding = BindingFactory.create();
@@ -313,44 +186,5 @@ public class TestSortedDataBag extends T
             fail("Unexpected exception");
         }
     }
-    
-    private class CallbackIterator<T> implements Iterator<T>{
-        int elementsReturned = 0;
-        Callback callback;
-        int trigger;
-        Iterator<T> delegate;
-        
-        public CallbackIterator(Iterator<T> delegate, int trigger, Callback callback){
-            this.delegate = delegate;
-            this.callback = callback;
-            this.trigger = trigger;
-        }
-        
-        public void setCallback(Callback callback) {
-            this.callback = callback;
-        }
-        
-        public boolean hasNext() {
-            return delegate.hasNext();
-        }
 
-        public T next() {
-            if (elementsReturned++ >= trigger){
-                callback.call();
-            }
-            return delegate.next();
-        }
-
-        public void remove() {
-            delegate.remove();
-        }
-        
-        public int getReturnedElementCount(){
-            return elementsReturned;
-        } 
-    }
-    
-    private interface Callback{
-        public void call();
-    }
 }

Modified: incubator/jena/Jena2/ARQ/trunk/src/com/hp/hpl/jena/sparql/engine/iterator/QueryIterSort.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/com/hp/hpl/jena/sparql/engine/iterator/QueryIterSort.java?rev=1167265&r1=1167264&r2=1167265&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/com/hp/hpl/jena/sparql/engine/iterator/QueryIterSort.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/com/hp/hpl/jena/sparql/engine/iterator/QueryIterSort.java Fri Sep  9 16:03:48 2011
@@ -8,71 +8,101 @@
 
 package com.hp.hpl.jena.sparql.engine.iterator;
 
-import java.util.ArrayList ;
-import java.util.Arrays ;
 import java.util.Comparator ;
 import java.util.Iterator ;
 import java.util.List ;
 
+import org.openjena.atlas.data.BagFactory ;
+import org.openjena.atlas.data.SortedDataBag ;
+import org.openjena.atlas.data.ThresholdPolicy ;
+import org.openjena.atlas.data.ThresholdPolicyCount ;
+import org.openjena.atlas.data.ThresholdPolicyNever ;
 import org.openjena.atlas.iterator.IteratorDelayedInitialization ;
+import org.openjena.atlas.lib.Closeable ;
+import org.openjena.riot.SerializationFactoryFinder ;
 
+import com.hp.hpl.jena.query.QueryCancelledException ;
 import com.hp.hpl.jena.query.SortCondition ;
+import com.hp.hpl.jena.sparql.ARQConstants ;
 import com.hp.hpl.jena.sparql.engine.ExecutionContext ;
 import com.hp.hpl.jena.sparql.engine.QueryIterator ;
 import com.hp.hpl.jena.sparql.engine.binding.Binding ;
 import com.hp.hpl.jena.sparql.engine.binding.BindingComparator ;
+import com.hp.hpl.jena.sparql.util.Symbol ;
 
 /** 
- * Sort a query iterator.  Uses an in-memory sort, so limiting the size of
- * iterators that can be handled.
+ * Sort a query iterator.  The sort will happen in-memory unless the size of the
+ * iterator exceeds a configurable threshold. In that case, a disk sort is used.
+ * 
+ * @see SortedDataBag
  */
-// See JENA-44
 
 public class QueryIterSort extends QueryIterPlainWrapper
 {
+    public static final Symbol spillOnDiskSortingThreshold = ARQConstants.allocSymbol("spillOnDiskSortingThreshold") ;
+    private static final long defaultSpillOnDiskSortingThreshold = -1 ; // off by default
+    
 	private final QueryIterator embeddedIterator;      // Keep a record of the underlying source for .cancel.
+	final SortedDataBag<Binding> db;
 	
     public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context)
     {
         this(qIter, new BindingComparator(conditions, context), context) ;
     }
 
-    public QueryIterSort(QueryIterator qIter, Comparator<Binding> comparator, ExecutionContext context)
+    public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator, final ExecutionContext context)
     {
         super(null, context) ;
-        this.embeddedIterator = qIter;
-        this.setIterator(sort(qIter, comparator));
+        this.embeddedIterator = qIter ;
+        
+        long threshold = (Long)context.getContext().get(spillOnDiskSortingThreshold, defaultSpillOnDiskSortingThreshold) ;
+        ThresholdPolicy<Binding> policy = (threshold >= 0) ? new ThresholdPolicyCount<Binding>(threshold) : new ThresholdPolicyNever<Binding>() ;
+        this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator);
+        
+        this.setIterator(new SortedBindingIterator(qIter));
     }
 
     @Override
     public void requestCancel()
     {
-        this.embeddedIterator.cancel();
+        this.embeddedIterator.cancel() ;
         super.requestCancel() ;
     }
 
-    private Iterator<Binding> sort(final QueryIterator qIter, final Comparator<Binding> comparator)
+    private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable
     {
-        return new IteratorDelayedInitialization<Binding>() {
-            @Override
-            protected Iterator<Binding> initializeIterator()
+        private final QueryIterator qIter;
+        
+        public SortedBindingIterator(final QueryIterator qIter)
+        {
+            this.qIter = qIter;
+        }
+        
+        @Override
+        protected Iterator<Binding> initializeIterator()
+        {
+            try
             {
-                // Be careful about duplicates.
-                // Used to use a TreeSet but, well, that's a set.
-                List<Binding> x = new ArrayList<Binding>() ;
-                for ( ; qIter.hasNext() ; )
-                {
-                    Binding b = qIter.next() ;
-                    x.add(b) ;
-                }
-                Binding[] y = x.toArray(new Binding[]{}) ;
-                x = null ;      // Drop the List now - might be big.  Unlikely to really make a real difference.  But we can try.
-                Arrays.sort(y, comparator) ;
-                x = Arrays.asList(y) ;
-                return x.iterator() ;
-        	}
-		};
+                db.addAll(qIter);
+            }
+            // Should we catch other exceptions too?  Theoretically the user should be using this
+            // iterator in a try/finally block, and thus will call close() themselves. 
+            catch (QueryCancelledException e)
+            {
+                close();
+                throw e;
+            }
+            
+            return db.iterator();
+        }
+
+        //@Override
+        public void close()
+        {
+            db.close();
+        }
     }
+    
 }
 
 /*

Modified: incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/BagFactory.java?rev=1167265&r1=1167264&r2=1167265&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/BagFactory.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/BagFactory.java Fri Sep  9 16:03:48 2011
@@ -93,6 +93,14 @@ public class BagFactory
     public static <T> SortedDataBag<T> newSortedBag(SerializationFactory<T> serializerFactory, Comparator<T> comparator)
     {
         ThresholdPolicy<T> policy = newDefaultPolicy();
+        return newSortedBag(policy, serializerFactory, comparator);
+    }
+    
+    /**
+     * Get a sorted data bag that uses the supplied threshold policy.
+     */
+    public static <T> SortedDataBag<T> newSortedBag(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory, Comparator<T> comparator)
+    {
         return new SortedDataBag<T>(policy, serializerFactory, comparator);
     }
     
@@ -110,6 +118,15 @@ public class BagFactory
     public static <T> DistinctDataBag<T> newDistinctBag(SerializationFactory<T> serializerFactory, Comparator<T> comparator)
     {
         ThresholdPolicy<T> policy = newDefaultPolicy();
+        return newDistinctBag(policy, serializerFactory, comparator);
+    }
+
+    /**
+     * Get a distinct data bag that uses the supplied threshold policy.
+     */
+    public static <T> DistinctDataBag<T> newDistinctBag(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory, Comparator<T> comparator)
+    {
         return new DistinctDataBag<T>(policy, serializerFactory, comparator);
     }
+
 }

Added: incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java?rev=1167265&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java Fri Sep  9 16:03:48 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.atlas.data;
+
+/**
+ * A threshold policy that is never exceeded.
+ */
+public class ThresholdPolicyNever<T> implements ThresholdPolicy<T>
+{
+    public ThresholdPolicyNever()
+    {
+        // Nothing to initialize: this policy has no fields
+    }
+
+    /* (non-Javadoc)
+     * @see org.openjena.atlas.data.ThresholdPolicy#increment(java.lang.Object)
+     */
+    public void increment(T item)
+    {
+        // Items are ignored: isThresholdExceeded() always returns false
+    }
+
+    /* (non-Javadoc)
+     * @see org.openjena.atlas.data.ThresholdPolicy#isThresholdExceeded()
+     */
+    public boolean isThresholdExceeded()
+    {
+        return false ;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.openjena.atlas.data.ThresholdPolicy#reset()
+     */
+    public void reset()
+    {
+        // No state to reset
+    }
+}

Propchange: incubator/jena/Jena2/ARQ/trunk/src/org/openjena/atlas/data/ThresholdPolicyNever.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain