You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2011/11/24 01:13:12 UTC
svn commit: r1205673 - in /incubator/jena/Jena2/ARQ/trunk/src/main/java:
com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java
org/openjena/atlas/data/BagFactory.java
org/openjena/atlas/data/DistinctDataNet.java
Author: sallen
Date: Thu Nov 24 00:13:11 2011
New Revision: 1205673
URL: http://svn.apache.org/viewvc?rev=1205673&view=rev
Log:
JENA-119 (Eliminate memory bounds during query execution) - made QueryIterDistinct non-memory-bound.
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java
Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/iterator/QueryIterDistinct.java Thu Nov 24 00:13:11 2011
@@ -16,44 +16,125 @@
* limitations under the License.
*/
-package com.hp.hpl.jena.sparql.engine.iterator;
+package com.hp.hpl.jena.sparql.engine.iterator ;
-import java.util.HashSet ;
-import java.util.Set ;
+import java.util.ArrayList ;
+import java.util.Comparator ;
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+
+import org.openjena.atlas.data.BagFactory ;
+import org.openjena.atlas.data.DistinctDataNet ;
+import org.openjena.atlas.data.ThresholdPolicy ;
+import org.openjena.atlas.data.ThresholdPolicyFactory ;
+import org.openjena.atlas.iterator.Iter ;
+import org.openjena.riot.SerializationFactoryFinder ;
+import com.hp.hpl.jena.query.SortCondition ;
import com.hp.hpl.jena.sparql.engine.ExecutionContext ;
import com.hp.hpl.jena.sparql.engine.QueryIterator ;
import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator ;
+import com.hp.hpl.jena.util.iterator.NiceIterator ;
-/** A QueryIterator that surpresses items already seen.
- * Like com.hp.hpl.jena.util.iterators.UniqueExtendedIterator
- * except this one works on QueryIterators (and hence ClosableIterators) */
-
-public class QueryIterDistinct extends QueryIterDistinctReduced
+/**
+ * A QueryIterator that suppresses items already seen. This will stream results
+ * until the spill-to-disk threshold is passed. At that point, it will not
+ * return any results until the input iterator has been exhausted.
+ *
+ * @see DistinctDataNet
+ */
+public class QueryIterDistinct extends QueryIter
{
- private Set<Binding> seen = new HashSet<Binding>() ;
+ private final QueryIterator inputIterator ;
+ final DistinctDataNet<Binding> db ;
+
+ boolean initialized = false ;
+ boolean finished = false ;
+ Binding slot ;
+ Iterator<Binding> dbIter ;
+
+ public QueryIterDistinct(QueryIterator qIter, ExecutionContext context)
+ {
+ super(context) ;
+ this.inputIterator = qIter ;
+
+ ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext()) ;
+ Comparator<Binding> comparator = new BindingComparator(new ArrayList<SortCondition>(), context) ;
+ this.db = BagFactory.newDistinctNet(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator) ;
+ }
+
+ @Override
+ public void requestCancel()
+ {
+ inputIterator.cancel() ;
+ }
- public QueryIterDistinct(QueryIterator iter, ExecutionContext context)
+ private void init()
+ {
+ if ( !initialized )
+ {
+ fill() ;
+ initialized = true ;
+ }
+ }
+
+ private void fill()
{
- super(iter, context) ;
+ while ( inputIterator.hasNext() )
+ {
+ slot = inputIterator.next() ;
+ if ( db.netAdd(slot) )
+ {
+ return ;
+ }
+ }
+ if ( null == dbIter )
+ {
+ dbIter = db.netIterator() ;
+ }
+ while ( dbIter.hasNext() )
+ {
+ slot = dbIter.next() ;
+ return ;
+ }
+ close() ;
}
@Override
- protected void closeSubIterator()
+ protected boolean hasNextBinding()
{
- seen = null ;
- super.closeSubIterator() ;
+ init() ;
+ return !finished ;
}
@Override
- protected boolean isDuplicate(Binding binding)
+ protected Binding moveToNextBinding()
{
- return seen.contains(binding) ;
+ if ( finished )
+ throw new NoSuchElementException() ;
+ init() ;
+ Binding toReturn = slot ;
+ fill() ;
+ return toReturn ;
}
@Override
- protected void remember(Binding binding)
+ protected void closeIterator()
{
- seen.add(binding) ;
+ if ( inputIterator != null )
+ {
+ NiceIterator.close(inputIterator) ;
+ // In case we wrapped, for example, another QueryIterator.
+ Iter.close(inputIterator) ;
+ }
+ if ( dbIter != null )
+ {
+ Iter.close(dbIter) ;
+ }
+ finished = true ;
+ slot = null ;
+ dbIter = null ;
+ db.close() ;
}
}
Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/BagFactory.java Thu Nov 24 00:13:11 2011
@@ -65,4 +65,20 @@ public class BagFactory
{
return new DistinctDataBag<T>(policy, serializerFactory, comparator);
}
+
+ /**
+ * Get a distinct data net.
+ */
+ public static <T extends Comparable<? super T>> DistinctDataNet<T> newDistinctNet(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory)
+ {
+ return newDistinctNet(policy, serializerFactory, null);
+ }
+
+ /**
+ * Get a distinct data net.
+ */
+ public static <T> DistinctDataNet<T> newDistinctNet(ThresholdPolicy<T> policy, SerializationFactory<T> serializerFactory, Comparator<T> comparator)
+ {
+ return new DistinctDataNet<T>(policy, serializerFactory, comparator);
+ }
}
Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java?rev=1205673&r1=1205672&r2=1205673&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java Thu Nov 24 00:13:11 2011
@@ -77,8 +77,11 @@ public class DistinctDataNet<E> extends
protected void deleteSpillFiles()
{
super.deleteSpillFiles();
- FileOps.delete(firstSpillFile, false);
- firstSpillFile = null;
+ if (null != firstSpillFile)
+ {
+ FileOps.delete(firstSpillFile, false);
+ firstSpillFile = null;
+ }
}
// Used by the .iterator() method