You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@jena.apache.org by sa...@apache.org on 2011/11/24 01:13:12 UTC

svn commit: r1205673 - in /incubator/jena/Jena2/ARQ/trunk/src/main/java: com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java org/openjena/atlas/data/BagFactory.java org/openjena/atlas/data/DistinctDataNet.java

Author: sallen
Date: Thu Nov 24 00:13:11 2011
New Revision: 1205673

URL: http://svn.apache.org/viewvc?rev=1205673&view=rev
Log:
JENA-119 (Eliminate memory bounds during query execution) - made QueryIterDistinct non-memory bound.

Modified:
    incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java Thu Nov 24 00:13:11 2011
@@ -16,44 +16,125 @@
  * limitations under the License.
  */
 
-package com.hp.hpl.jena.sparql.engine.iterator;
+package com.hp.hpl.jena.sparql.engine.iterator ;
 
-import java.util.HashSet ;
-import java.util.Set ;
+import java.util.ArrayList ;
+import java.util.Comparator ;
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+
+import org.openjena.atlas.data.BagFactory ;
+import org.openjena.atlas.data.DistinctDataNet ;
+import org.openjena.atlas.data.ThresholdPolicy ;
+import org.openjena.atlas.data.ThresholdPolicyFactory ;
+import org.openjena.atlas.iterator.Iter ;
+import org.openjena.riot.SerializationFactoryFinder ;
 
+import com.hp.hpl.jena.query.SortCondition ;
 import com.hp.hpl.jena.sparql.engine.ExecutionContext ;
 import com.hp.hpl.jena.sparql.engine.QueryIterator ;
 import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator ;
+import com.hp.hpl.jena.util.iterator.NiceIterator ;
 
-/** A QueryIterator that surpresses items already seen. 
- * Like com.hp.hpl.jena.util.iterators.UniqueExtendedIterator
- * except this one works on QueryIterators (and hence ClosableIterators) */
-
-public class QueryIterDistinct extends QueryIterDistinctReduced
+/**
+ * A QueryIterator that suppresses items already seen. This will stream results
+ * until the spill to disk threshold is passed. At that point, it will not
+ * return any results until the input iterator has been exhausted.
+ * 
+ * @see DistinctDataNet
+ */
+public class QueryIterDistinct extends QueryIter
 {
-    private Set<Binding> seen = new HashSet<Binding>() ;
+    private final QueryIterator inputIterator ;
+    final DistinctDataNet<Binding> db ;
+
+    boolean initialized = false ;
+    boolean finished = false ;
+    Binding slot ;
+    Iterator<Binding> dbIter ;
+
+    public QueryIterDistinct(QueryIterator qIter, ExecutionContext context)
+    {
+        super(context) ;
+        this.inputIterator = qIter ;
+
+        ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext()) ;
+        Comparator<Binding> comparator = new BindingComparator(new ArrayList<SortCondition>(), context) ;
+        this.db = BagFactory.newDistinctNet(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator) ;
+    }
+
+    @Override
+    public void requestCancel()
+    {
+        inputIterator.cancel() ;
+    }
     
-    public QueryIterDistinct(QueryIterator iter, ExecutionContext context)
+    private void init()
+    {
+        if ( !initialized )
+        {
+            fill() ;
+            initialized = true ;
+        }
+    }
+
+    private void fill()
     {
-        super(iter, context)  ;
+        while ( inputIterator.hasNext() )
+        {
+            slot = inputIterator.next() ;
+            if ( db.netAdd(slot) )
+            {
+                return ;
+            }
+        }
+        if ( null == dbIter )
+        {
+            dbIter = db.netIterator() ;
+        }
+        while ( dbIter.hasNext() )
+        {
+            slot = dbIter.next() ;
+            return ;
+        }
+        close() ;
     }
 
     @Override
-    protected void closeSubIterator()
+    protected boolean hasNextBinding()
     {
-        seen = null ;
-        super.closeSubIterator() ;
+        init() ;
+        return !finished ;
     }
 
     @Override
-    protected boolean isDuplicate(Binding binding)
+    protected Binding moveToNextBinding()
     {
-        return seen.contains(binding) ;
+        if ( finished )
+            throw new NoSuchElementException() ;
+        init() ;
+        Binding toReturn = slot ;
+        fill() ;
+        return toReturn ;
     }
 
     @Override
-    protected void remember(Binding binding)
+    protected void closeIterator()
     {
-        seen.add(binding) ;
+        if ( inputIterator != null )
+        {
+            NiceIterator.close(inputIterator) ;
+            // In case we wrapped, for example, another QueryIterator.
+            Iter.close(inputIterator) ;
+        }
+        if ( dbIter != null )
+        {
+            Iter.close(dbIter) ;
+        }
+        finished = true ;
+        slot = null ;
+        dbIter = null ;
+        db.close() ;
     }
 }

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java Thu Nov 24 00:13:11 2011
@@ -65,4 +65,20 @@ public class BagFactory
     {
         return new DistinctDataBag<T>(policy, serializerFactory, comparator);
     }
+    
+    /**
+     * Get a distinct data net.
+     */
+    public static <T extends Comparable<? super T>> DistinctDataNet<T> newDistinctNet(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory)
+    {
+        return newDistinctNet(policy, serializerFactory, null);
+    }
+
+    /**
+     * Get a distinct data net.
+     */
+    public static <T> DistinctDataNet<T> newDistinctNet(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory, Comparator<T> comparator)
+    {
+        return new DistinctDataNet<T>(policy, serializerFactory, comparator);
+    }
 }

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java Thu Nov 24 00:13:11 2011
@@ -77,8 +77,11 @@ public class DistinctDataNet<E> extends 
     protected void deleteSpillFiles()
     {
         super.deleteSpillFiles();
-        FileOps.delete(firstSpillFile, false);
-        firstSpillFile = null;
+        if (null != firstSpillFile)
+        {
+            FileOps.delete(firstSpillFile, false);
+            firstSpillFile = null;
+        }
     }
     
     // Used by the .iterator() method