You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/07/27 12:06:38 UTC
[09/11] jena git commit: Revised layout -- spaces not tabs, indent 4,
opening brace on same line.
Revised layout -- spaces not tabs, indent 4, opening brace on same line.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/cdc69f8e
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/cdc69f8e
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/cdc69f8e
Branch: refs/heads/master
Commit: cdc69f8ef28541a9d501d175a8b778f23c302907
Parents: cc76ba7
Author: Chris Dollin <eh...@googlemail.com>
Authored: Mon Jul 25 10:39:09 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Mon Jul 25 10:39:09 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 99 +++--
.../apache/jena/atlas/data/SortedDataBag.java | 421 ++++++++-----------
.../sparql/engine/iterator/QueryIterSort.java | 125 +++---
.../iterator/TestSortedDataBagCancellation.java | 362 ++++++++--------
4 files changed, 466 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
index f4286cf..124c6be 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
@@ -20,57 +20,52 @@ package org.apache.jena.atlas.data;
import java.util.Arrays;
import java.util.Comparator;
-public final class AbortableComparator<E> implements Comparator<E>
-{
- public AbortableComparator(Comparator<? super E> comparator)
- {
- this.baseComparator = comparator;
- }
-
- /**
- <code>AbandonSort</code> is the exception thrown from
- <code>AbortableComparator</code> to abandon a sort.
- */
- public static class AbandonSort extends RuntimeException
- {
- private static final long serialVersionUID = 1L;
- }
-
- public static enum Finish {COMPLETED, ABORTED}
-
- protected volatile boolean cancelled;
-
- final Comparator<? super E> baseComparator;
+public final class AbortableComparator<E> implements Comparator<E> {
+ public AbortableComparator(Comparator<? super E> comparator) {
+ this.baseComparator = comparator;
+ }
- @Override public int compare(E o1, E o2)
- {
- if (cancelled) throw new AbandonSort();
- return baseComparator.compare(o1, o2);
- }
-
- /**
- Sort the array <code>e</code> using this comparator
- with the additional ability to abort the sort.
- */
- public Finish abortableSort(E[] e)
- {
- try
- {
- Arrays.sort(e, this);
- }
- catch (AbandonSort s)
- {
- return Finish.ABORTED;
- }
- return Finish.COMPLETED;
- }
-
- /**
- Arrange that the next on-frequency cancellation test
- in compare will succeed, aborting the sort.
- */
- public void cancel()
- {
- cancelled = true;
- }
+ /**
+ * <code>AbandonSort</code> is the exception thrown from
+ * <code>AbortableComparator</code> to abandon a sort.
+ */
+ public static class AbandonSort extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static enum Finish {
+ COMPLETED, ABORTED
+ }
+
+ protected volatile boolean cancelled;
+
+ final Comparator<? super E> baseComparator;
+
+ @Override
+ public int compare(E o1, E o2) {
+ if (cancelled)
+ throw new AbandonSort();
+ return baseComparator.compare(o1, o2);
+ }
+
+ /**
+ * Sort the array <code>e</code> using this comparator with the additional
+ * ability to abort the sort.
+ */
+ public Finish abortableSort(E[] e) {
+ try {
+ Arrays.sort(e, this);
+ } catch (AbandonSort s) {
+ return Finish.ABORTED;
+ }
+ return Finish.COMPLETED;
+ }
+
+ /**
+ * Arrange that the next on-frequency cancellation test in compare will
+ * succeed, aborting the sort.
+ */
+ public void cancel() {
+ cancelled = true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index b7c74ae..26ba388 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -18,163 +18,153 @@
package org.apache.jena.atlas.data;
-import java.io.File ;
-import java.io.FileNotFoundException ;
-import java.io.IOException ;
-import java.io.InputStream ;
-import java.io.OutputStream ;
-import java.util.ArrayList ;
-import java.util.Arrays ;
-import java.util.Comparator ;
-import java.util.Iterator ;
-import java.util.List ;
-import java.util.NoSuchElementException ;
-import java.util.PriorityQueue ;
-
-import org.apache.jena.atlas.AtlasException ;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+import org.apache.jena.atlas.AtlasException;
import org.apache.jena.atlas.data.AbortableComparator.Finish;
-import org.apache.jena.atlas.iterator.Iter ;
-import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.atlas.lib.Sink ;
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorResourceClosing;
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.atlas.lib.Sink;
/**
* <p>
- * This data bag will gather items in memory until a size threshold is passed, at which point it will write
- * out all of the items to disk using the supplied serializer.
+ * This data bag will gather items in memory until a size threshold is passed,
+ * at which point it will write out all of the items to disk using the supplied
+ * serializer.
* </p>
* <p>
- * After adding is finished, call {@link #iterator()} to set up the data bag for reading back items and iterating over them.
- * The iterator will retrieve the items in sorted order using the supplied comparator.
+ * After adding is finished, call {@link #iterator()} to set up the data bag for
+ * reading back items and iterating over them. The iterator will retrieve the
+ * items in sorted order using the supplied comparator.
* </p>
* <p>
- * IMPORTANT: You may not add any more items after this call. You may subsequently call {@link #iterator()} multiple
- * times which will give you a new iterator for each invocation. If you do not consume the entire iterator, you should
- * call {@link Iter#close(Iterator)} to close any FileInputStreams associated with the iterator.
+ * IMPORTANT: You may not add any more items after this call. You may
+ * subsequently call {@link #iterator()} multiple times which will give you a
+ * new iterator for each invocation. If you do not consume the entire iterator,
+ * you should call {@link Iter#close(Iterator)} to close any FileInputStreams
+ * associated with the iterator.
* </p>
* <p>
- * Additionally, make sure to call {@link #close()} when you are finished to free any system resources (preferably in a finally block).
+ * Additionally, make sure to call {@link #close()} when you are finished to
+ * free any system resources (preferably in a finally block).
* </p>
* <p>
- * Implementation Notes: Data is stored in an ArrayList as it comes in. When it is time to spill, that
- * data is sorted and written to disk. An iterator will read in each file and perform a merge-sort as
- * the results are returned.
+ * Implementation Notes: Data is stored in an ArrayList as it comes in. When it
+ * is time to spill, that data is sorted and written to disk. An iterator will
+ * read in each file and perform a merge-sort as the results are returned.
* </p>
*/
-public class SortedDataBag<E> extends AbstractDataBag<E>
-{
+public class SortedDataBag<E> extends AbstractDataBag<E> {
/**
- * The the maximum number of files to merge at the same time. Without this, you can run out of file handles and other bad things.
+ * The the maximum number of files to merge at the same time. Without this,
+ * you can run out of file handles and other bad things.
*/
- protected static int MAX_SPILL_FILES = 100 ;
-
+ protected static int MAX_SPILL_FILES = 100;
+
protected final ThresholdPolicy<E> policy;
protected final SerializationFactory<E> serializationFactory;
protected final AbortableComparator<E> comparator;
-
+
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
-
- public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
- {
+
+ public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory,
+ Comparator<? super E> comparator) {
this.policy = policy;
this.serializationFactory = serializerFactory;
this.comparator = new AbortableComparator<E>(comparator);
}
-
+
/**
- cancel arranges that further comparisons using the supplied
- comparator will abandon the sort in progress.
+ * cancel arranges that further comparisons using the supplied comparator
+ * will abandon the sort in progress.
*/
- public void cancel()
- {
- comparator.cancel();
- }
-
- /**
- isCancelled is true iff cancel has been called on this
- bags comparator. (Used in testing.)
- */
- public boolean isCancelled()
- {
- return comparator.cancelled;
- }
-
- protected void checkClosed()
- {
- if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
+ public void cancel() {
+ comparator.cancel();
}
-
+
+ /**
+ * isCancelled is true iff cancel has been called on this bags comparator.
+ * (Used in testing.)
+ */
+ public boolean isCancelled() {
+ return comparator.cancelled;
+ }
+
+ protected void checkClosed() {
+ if (closed)
+ throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.");
+ }
+
@Override
- public boolean isSorted()
- {
+ public boolean isSorted() {
return true;
}
@Override
- public boolean isDistinct()
- {
+ public boolean isDistinct() {
return false;
}
@Override
- public void add(E item)
- {
+ public void add(E item) {
checkClosed();
if (finishedAdding)
throw new AtlasException("SortedDataBag: Cannot add any more items after the writing phase is complete.");
-
- if (policy.isThresholdExceeded())
- {
+
+ if (policy.isThresholdExceeded()) {
spill();
}
-
- if (memory.add(item))
- {
+
+ if (memory.add(item)) {
policy.increment(item);
size++;
}
}
-
+
@SuppressWarnings({ "unchecked" })
- protected void spill()
- {
+ protected void spill() {
// Make sure we have something to spill.
- if (memory.size() > 0)
- {
+ if (memory.size() > 0) {
OutputStream out;
- try
- {
+ try {
out = getSpillStream();
- }
- catch (IOException e)
- {
+ } catch (IOException e) {
throw new AtlasException(e);
}
-
- // Sort the tuples as an array. The CanAbortComparator will sort that
+
+ // Sort the tuples as an array. The CanAbortComparator will sort
+ // that
// array using Arrays.sort. The cast to E[] is safe. If the sort is
- // aborted, don't bother messing around with the serialisation. We'll
+ // aborted, don't bother messing around with the serialisation.
+ // We'll
// never get around to using it anyway.
-
+
E[] array = (E[]) memory.toArray();
- if (comparator.abortableSort(array) == Finish.COMPLETED)
- {
- Sink<E> serializer = serializationFactory.createSerializer(out);
- try
- {
- for (Object tuple : array)
- {
- serializer.send((E)tuple);
- }
- }
- finally
- {
- serializer.close();
- }
+ if (comparator.abortableSort(array) == Finish.COMPLETED) {
+ Sink<E> serializer = serializationFactory.createSerializer(out);
+ try {
+ for (Object tuple : array) {
+ serializer.send((E) tuple);
+ }
+ } finally {
+ serializer.close();
+ }
}
-
+
spilled = true;
policy.reset();
memory.clear();
@@ -182,272 +172,225 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
}
@Override
- public void flush()
- {
+ public void flush() {
spill();
}
-
- protected Iterator<E> getInputIterator(File spillFile) throws FileNotFoundException
- {
+
+ protected Iterator<E> getInputIterator(File spillFile) throws FileNotFoundException {
InputStream in = getInputStream(spillFile);
- Iterator<E> deserializer = serializationFactory.createDeserializer(in) ;
+ Iterator<E> deserializer = serializationFactory.createDeserializer(in);
return new IteratorResourceClosing<>(deserializer, in);
}
/**
- * Returns an iterator over a set of elements of type E. If you do not exhaust
- * the iterator, you should call {@link org.apache.jena.atlas.iterator.Iter#close(Iterator)}
- * to be sure any open file handles are closed.
+ * Returns an iterator over a set of elements of type E. If you do not
+ * exhaust the iterator, you should call
+ * {@link org.apache.jena.atlas.iterator.Iter#close(Iterator)} to be sure
+ * any open file handles are closed.
*
* @return an Iterator
*/
@Override
- public Iterator<E> iterator()
- {
+ public Iterator<E> iterator() {
preMerge();
return iterator(getSpillFiles().size());
}
@SuppressWarnings({ "unchecked" })
- private Iterator<E> iterator(int size)
- {
+ private Iterator<E> iterator(int size) {
checkClosed();
-
+
int memSize = memory.size();
-
- // Constructing an iterator from this class is not thread-safe (just like all the the other methods)
- if (!finishedAdding && memSize > 1)
- {
+
+ // Constructing an iterator from this class is not thread-safe (just
+ // like all the the other methods)
+ if (!finishedAdding && memSize > 1) {
E[] array = (E[]) memory.toArray();
comparator.abortableSort(array); // don't care if we aborted or not
memory = Arrays.asList(array);
}
-
+
finishedAdding = true;
-
- if (spilled)
- {
+
+ if (spilled) {
List<Iterator<E>> inputs = new ArrayList<>(size + (memSize > 0 ? 1 : 0));
-
- if (memSize > 0)
- {
+
+ if (memSize > 0) {
inputs.add(memory.iterator());
}
-
- for ( int i = 0; i < size; i++ )
- {
+
+ for (int i = 0; i < size; i++) {
File spillFile = getSpillFiles().get(i);
- try
- {
+ try {
Iterator<E> irc = getInputIterator(spillFile);
inputs.add(irc);
- }
- catch (FileNotFoundException e)
- {
+ } catch (FileNotFoundException e) {
// Close any open streams before we throw an exception
- for (Iterator<E> it : inputs)
- {
+ for (Iterator<E> it : inputs) {
Iter.close(it);
}
-
+
throw new AtlasException("Cannot find one of the spill files", e);
}
}
-
+
SpillSortIterator<E> ssi = new SpillSortIterator<>(inputs, comparator);
registerCloseableIterator(ssi);
-
+
return ssi;
- }
- else
- {
- if (memSize > 0)
- {
+ } else {
+ if (memSize > 0) {
return memory.iterator();
- }
- else
- {
+ } else {
return Iter.nullIterator();
}
}
}
- private void preMerge()
- {
- if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES)
- {
- return ;
+ private void preMerge() {
+ if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) {
+ return;
}
- try
- {
- while (getSpillFiles().size() > MAX_SPILL_FILES)
- {
- Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
- Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
- try
- {
- while (ssi.hasNext())
- {
- sink.send(ssi.next()) ;
+ try {
+ while (getSpillFiles().size() > MAX_SPILL_FILES) {
+ Sink<E> sink = serializationFactory.createSerializer(getSpillStream());
+ Iterator<E> ssi = iterator(MAX_SPILL_FILES);
+ try {
+ while (ssi.hasNext()) {
+ sink.send(ssi.next());
}
- }
- finally
- {
- Iter.close(ssi) ;
- sink.close() ;
+ } finally {
+ Iter.close(ssi);
+ sink.close();
}
- List<File> toRemove = new ArrayList<>(MAX_SPILL_FILES) ;
- for (int i = 0; i < MAX_SPILL_FILES; i++)
- {
- File file = getSpillFiles().get(i) ;
- file.delete() ;
- toRemove.add(file) ;
+ List<File> toRemove = new ArrayList<>(MAX_SPILL_FILES);
+ for (int i = 0; i < MAX_SPILL_FILES; i++) {
+ File file = getSpillFiles().get(i);
+ file.delete();
+ toRemove.add(file);
}
- getSpillFiles().removeAll(toRemove) ;
+ getSpillFiles().removeAll(toRemove);
- memory = new ArrayList<>() ;
+ memory = new ArrayList<>();
}
- }
- catch (IOException e)
- {
- throw new AtlasException(e) ;
+ } catch (IOException e) {
+ throw new AtlasException(e);
}
}
@Override
- public void close()
- {
- if (!closed)
- {
+ public void close() {
+ if (!closed) {
closeIterators();
deleteSpillFiles();
-
+
memory = null;
closed = true;
}
}
-
+
/**
* An iterator that handles getting the next tuple from the bag.
*/
- protected static class SpillSortIterator<T> implements Iterator<T>, Closeable
- {
+ protected static class SpillSortIterator<T> implements Iterator<T>, Closeable {
private final List<Iterator<T>> inputs;
private final Comparator<? super T> comp;
private final PriorityQueue<Item<T>> minHeap;
-
- public SpillSortIterator(List<Iterator<T>> inputs, Comparator<? super T> comp)
- {
+
+ public SpillSortIterator(List<Iterator<T>> inputs, Comparator<? super T> comp) {
this.inputs = inputs;
this.comp = comp;
this.minHeap = new PriorityQueue<>(inputs.size());
-
+
// Prime the heap
- for (int i=0; i<inputs.size(); i++)
- {
+ for (int i = 0; i < inputs.size(); i++) {
replaceItem(i);
}
}
-
- private void replaceItem(int index)
- {
+
+ private void replaceItem(int index) {
Iterator<T> it = inputs.get(index);
- if (it.hasNext())
- {
+ if (it.hasNext()) {
T tuple = it.next();
minHeap.add(new Item<>(index, tuple, comp));
}
}
@Override
- public boolean hasNext()
- {
+ public boolean hasNext() {
return (minHeap.peek() != null);
}
@Override
- public T next()
- {
- if (!hasNext())
- {
+ public T next() {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
-
+
Item<T> curr = minHeap.poll();
// Read replacement item
replaceItem(curr.getIndex());
-
+
return curr.getTuple();
}
@Override
- public void remove()
- {
+ public void remove() {
throw new UnsupportedOperationException("SpillSortIterator.remove");
}
@Override
- public void close()
- {
- for (Iterator<T> it : inputs)
- {
+ public void close() {
+ for (Iterator<T> it : inputs) {
Iter.close(it);
}
}
-
- private final class Item<U> implements Comparable<Item<U>>
- {
+
+ private final class Item<U> implements Comparable<Item<U>> {
private final int index;
private final U tuple;
private final Comparator<? super U> c;
-
- public Item(int index, U tuple, Comparator<? super U> c)
- {
+
+ public Item(int index, U tuple, Comparator<? super U> c) {
this.index = index;
this.tuple = tuple;
this.c = c;
}
-
- public int getIndex()
- {
+
+ public int getIndex() {
return index;
}
-
- public U getTuple()
- {
+
+ public U getTuple() {
return tuple;
}
-
+
@Override
@SuppressWarnings("unchecked")
- public int compareTo(Item<U> o)
- {
- return (null != c) ? c.compare(tuple, o.getTuple()) : ((Comparable<U>)tuple).compareTo(o.getTuple());
+ public int compareTo(Item<U> o) {
+ return (null != c) ? c.compare(tuple, o.getTuple()) : ((Comparable<U>) tuple).compareTo(o.getTuple());
}
-
+
@SuppressWarnings("unchecked")
@Override
- public boolean equals(Object obj)
- {
- if (obj instanceof Item)
- {
- return compareTo((Item<U>)obj) == 0;
+ public boolean equals(Object obj) {
+ if (obj instanceof Item) {
+ return compareTo((Item<U>) obj) == 0;
}
-
+
return false;
}
-
+
@Override
- public int hashCode()
- {
+ public int hashCode() {
return tuple.hashCode();
}
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index edaaa5c..3e565db 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -18,23 +18,23 @@
package org.apache.jena.sparql.engine.iterator;
-import java.util.Comparator ;
-import java.util.Iterator ;
-import java.util.List ;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
-import org.apache.jena.atlas.data.BagFactory ;
-import org.apache.jena.atlas.data.SortedDataBag ;
-import org.apache.jena.atlas.data.ThresholdPolicy ;
-import org.apache.jena.atlas.data.ThresholdPolicyFactory ;
-import org.apache.jena.atlas.iterator.IteratorDelayedInitialization ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.query.QueryCancelledException ;
-import org.apache.jena.query.SortCondition ;
-import org.apache.jena.riot.system.SerializationFactoryFinder ;
-import org.apache.jena.sparql.engine.ExecutionContext ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.binding.Binding ;
-import org.apache.jena.sparql.engine.binding.BindingComparator ;
+import org.apache.jena.atlas.data.BagFactory;
+import org.apache.jena.atlas.data.SortedDataBag;
+import org.apache.jena.atlas.data.ThresholdPolicy;
+import org.apache.jena.atlas.data.ThresholdPolicyFactory;
+import org.apache.jena.atlas.iterator.IteratorDelayedInitialization;
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.SortCondition;
+import org.apache.jena.riot.system.SerializationFactoryFinder;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingComparator;
/**
* Sort a query iterator. The sort will happen in-memory unless the size of the
@@ -44,61 +44,60 @@ import org.apache.jena.sparql.engine.binding.BindingComparator ;
*/
public class QueryIterSort extends QueryIterPlainWrapper {
- private final QueryIterator embeddedIterator ; // Keep a record of the
- // underlying source for
- // .cancel.
- final SortedDataBag<Binding> db ;
+ private final QueryIterator embeddedIterator; // Keep a record of the
+ // underlying source for
+ // .cancel.
+ final SortedDataBag<Binding> db;
- public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context) {
- this(qIter, new BindingComparator(conditions, context), context) ;
- }
+ public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context) {
+ this(qIter, new BindingComparator(conditions, context), context);
+ }
- public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator,
- final ExecutionContext context) {
- super(null, context) ;
- this.embeddedIterator = qIter ;
+ public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator,
+ final ExecutionContext context) {
+ super(null, context);
+ this.embeddedIterator = qIter;
- ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext()) ;
- this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(),
- comparator) ;
+ ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext());
+ this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator);
- this.setIterator(new SortedBindingIterator(qIter)) ;
- }
+ this.setIterator(new SortedBindingIterator(qIter));
+ }
- @Override
- public void requestCancel() {
- this.db.cancel() ;
- this.embeddedIterator.cancel() ;
- super.requestCancel() ;
- }
+ @Override
+ public void requestCancel() {
+ this.db.cancel();
+ this.embeddedIterator.cancel();
+ super.requestCancel();
+ }
- private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable {
- private final QueryIterator qIter ;
+ private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable {
+ private final QueryIterator qIter;
- public SortedBindingIterator(final QueryIterator qIter) {
- this.qIter = qIter ;
- }
+ public SortedBindingIterator(final QueryIterator qIter) {
+ this.qIter = qIter;
+ }
- @Override
- protected Iterator<Binding> initializeIterator() {
- try {
- db.addAll(qIter) ;
- return db.iterator() ;
- }
- // Should we catch other exceptions too? Theoretically
- // the user should be using this
- // iterator in a try/finally block, and thus will call
- // close() themselves.
- catch (QueryCancelledException e) {
- close() ;
- throw e ;
- }
- }
+ @Override
+ protected Iterator<Binding> initializeIterator() {
+ try {
+ db.addAll(qIter);
+ return db.iterator();
+ }
+ // Should we catch other exceptions too? Theoretically
+ // the user should be using this
+ // iterator in a try/finally block, and thus will call
+ // close() themselves.
+ catch (QueryCancelledException e) {
+ close();
+ throw e;
+ }
+ }
- @Override
- public void close() {
- db.close() ;
- }
- }
+ @Override
+ public void close() {
+ db.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
index 2220a39..0109ba8 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
@@ -55,191 +55,179 @@ import junit.framework.TestCase;
test the SortedDataBag correctly?
*/
-public class TestSortedDataBagCancellation extends TestCase
-{
-
- static final BindingMap b1 = BindingFactory.create();
- static final BindingMap b2 = BindingFactory.create();
- static final BindingMap b3 = BindingFactory.create();
- static final BindingMap b4 = BindingFactory.create();
-
- static
- {
- b1.add(Var.alloc("v1"), NodeFactory.createLiteral("alpha"));
- b2.add(Var.alloc("v2"), NodeFactory.createLiteral("beta"));
- b3.add(Var.alloc("v3"), NodeFactory.createLiteral("gamma"));
- b4.add(Var.alloc("v4"), NodeFactory.createLiteral("delta"));
- }
-
- final Context params = new Context();
-
- final OpExecutorFactory factory = new OpExecutorFactory() {
-
- @Override public OpExecutor create(ExecutionContext ec) {
- throw new UnsupportedOperationException();
- }
- };
-
- final Graph activeGraph = new GraphMemPlain();
-
- final DatasetGraph dataset = DatasetGraphFactory.create();
-
- final List<SortCondition> conditions = new ArrayList<SortCondition>();
-
- final ExecutionContext ec = new ExecutionContext
- (
- params,
- activeGraph,
- dataset,
- factory
- );
-
- final BindingComparator base_bc = new BindingComparator(conditions, ec);
- final SpecialBindingComparator bc = new SpecialBindingComparator(base_bc, ec);
-
- QueryIteratorItems baseIter = new QueryIteratorItems();
-
- {
- baseIter.bindings.add(b1);
- baseIter.bindings.add(b2);
- baseIter.bindings.add(b3);
- baseIter.bindings.add(b4);;
- }
-
- QueryIterSort qs = new QueryIterSort(baseIter, bc, ec);
-
- /**
- In this test, the iterator is not cancelled;
- all items should be delivered, and the compare
- count should be monotonic-nondecreasing.
- */
- @Test public void testIteratesToCompletion()
- {
- int count = 0;
- assertEquals(0, count = bc.count);
- Set<Binding> results = new HashSet<Binding>();
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertFalse(qs.hasNext());
-
- Set<Binding> expected = new HashSet<Binding>();
- expected.add(b1);
- expected.add(b2);
- expected.add(b3);
- expected.add(b4);
-
- assertEquals(expected, results);
- }
-
- /**
- In this test, the iterator is cancelled after
- the first result is delivered. Any attempt to
- run the comparator should be trapped an exception
- thrown. The iterators should deliver no more values.
- */
- @Test public void testIteratesWithCancellation()
- {
- int count = 0;
- assertEquals(0, count = bc.count);
- Set<Binding> results = new HashSet<Binding>();
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- qs.cancel();
- try
- {
- bc.noMoreCalls();
- while (qs.hasNext()) qs.next();
- }
- catch (QueryCancelledException qe)
- {
- assertTrue(qs.db.isCancelled());
- return;
-
- }
- fail("query was not cancelled");
- }
-
- /**
- A QueryIterator that delivers the elements of a list of bindings.
- */
- private static final class QueryIteratorItems extends QueryIteratorBase
- {
- List<Binding> bindings = new ArrayList<Binding>();
- int index = 0;
-
- @Override
- public void output(IndentedWriter out, SerializationContext sCxt) {
- out.write("a QueryIteratorItems");
- }
-
- @Override
- protected boolean hasNextBinding()
- {
- return index < bindings.size();
- }
-
- @Override
- protected Binding moveToNextBinding()
- {
- index += 1;
- return bindings.get(index - 1);
- }
-
- @Override
- protected void closeIterator()
- {
- }
-
- @Override
- protected void requestCancel()
- {
- }
- }
-
- /**
- A BindingComparator that wraps another BindingComparator
- and counts how many times compare() is called.
- */
- static class SpecialBindingComparator extends BindingComparator
- {
- final BindingComparator base;
- int count = 0;
- boolean trapCompare = false;
-
- public SpecialBindingComparator(BindingComparator base, ExecutionContext ec)
- {
- super(base.getConditions(), ec);
- this.base = base;
- }
-
- public void noMoreCalls() {
- trapCompare = true;
- }
-
- @Override
- public int compare(Binding x, Binding y)
- {
- if (trapCompare) throw new RuntimeException("compare() no longer allowed.");
- count += 1;
- return base.compare(x, y);
- }
- }
+public class TestSortedDataBagCancellation extends TestCase {
+
+ static final BindingMap b1 = BindingFactory.create();
+ static final BindingMap b2 = BindingFactory.create();
+ static final BindingMap b3 = BindingFactory.create();
+ static final BindingMap b4 = BindingFactory.create();
+
+ static {
+ b1.add(Var.alloc("v1"), NodeFactory.createLiteral("alpha"));
+ b2.add(Var.alloc("v2"), NodeFactory.createLiteral("beta"));
+ b3.add(Var.alloc("v3"), NodeFactory.createLiteral("gamma"));
+ b4.add(Var.alloc("v4"), NodeFactory.createLiteral("delta"));
+ }
+
+ final Context params = new Context();
+
+ final OpExecutorFactory factory = new OpExecutorFactory() {
+
+ @Override
+ public OpExecutor create(ExecutionContext ec) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ final Graph activeGraph = new GraphMemPlain();
+
+ final DatasetGraph dataset = DatasetGraphFactory.create();
+
+ final List<SortCondition> conditions = new ArrayList<SortCondition>();
+
+ final ExecutionContext ec = new ExecutionContext(params, activeGraph, dataset, factory);
+
+ final BindingComparator base_bc = new BindingComparator(conditions, ec);
+ final SpecialBindingComparator bc = new SpecialBindingComparator(base_bc, ec);
+
+ QueryIteratorItems baseIter = new QueryIteratorItems();
+
+ {
+ baseIter.bindings.add(b1);
+ baseIter.bindings.add(b2);
+ baseIter.bindings.add(b3);
+ baseIter.bindings.add(b4);
+ ;
+ }
+
+ QueryIterSort qs = new QueryIterSort(baseIter, bc, ec);
+
+ /**
+ * In this test, the iterator is not cancelled; all items should be
+ * delivered, and the compare count should be monotonic-nondecreasing.
+ */
+ @Test
+ public void testIteratesToCompletion() {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertFalse(qs.hasNext());
+
+ Set<Binding> expected = new HashSet<Binding>();
+ expected.add(b1);
+ expected.add(b2);
+ expected.add(b3);
+ expected.add(b4);
+
+ assertEquals(expected, results);
+ }
+
+ /**
+ * In this test, the iterator is cancelled after the first result is
+ * delivered. Any attempt to run the comparator should be trapped an
+ * exception thrown. The iterators should deliver no more values.
+ */
+ @Test
+ public void testIteratesWithCancellation() {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ qs.cancel();
+ try {
+ bc.noMoreCalls();
+ while (qs.hasNext())
+ qs.next();
+ } catch (QueryCancelledException qe) {
+ assertTrue(qs.db.isCancelled());
+ return;
+
+ }
+ fail("query was not cancelled");
+ }
+
+ /**
+ * A QueryIterator that delivers the elements of a list of bindings.
+ */
+ private static final class QueryIteratorItems extends QueryIteratorBase {
+ List<Binding> bindings = new ArrayList<Binding>();
+ int index = 0;
+
+ @Override
+ public void output(IndentedWriter out, SerializationContext sCxt) {
+ out.write("a QueryIteratorItems");
+ }
+
+ @Override
+ protected boolean hasNextBinding() {
+ return index < bindings.size();
+ }
+
+ @Override
+ protected Binding moveToNextBinding() {
+ index += 1;
+ return bindings.get(index - 1);
+ }
+
+ @Override
+ protected void closeIterator() {
+ }
+
+ @Override
+ protected void requestCancel() {
+ }
+ }
+
+ /**
+ * A BindingComparator that wraps another BindingComparator and counts how
+ * many times compare() is called.
+ */
+ static class SpecialBindingComparator extends BindingComparator {
+ final BindingComparator base;
+ int count = 0;
+ boolean trapCompare = false;
+
+ public SpecialBindingComparator(BindingComparator base, ExecutionContext ec) {
+ super(base.getConditions(), ec);
+ this.base = base;
+ }
+
+ public void noMoreCalls() {
+ trapCompare = true;
+ }
+
+ @Override
+ public int compare(Binding x, Binding y) {
+ if (trapCompare)
+ throw new RuntimeException("compare() no longer allowed.");
+ count += 1;
+ return base.compare(x, y);
+ }
+ }
}
\ No newline at end of file