You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2016/07/27 12:06:30 UTC
[01/11] jena git commit: Fixes for JENA-1212
Repository: jena
Updated Branches:
refs/heads/master 1479b428f -> 275fa9c79
Fixes for JENA-1212
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/d32cf8f4
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/d32cf8f4
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/d32cf8f4
Branch: refs/heads/master
Commit: d32cf8f45eab662618920731c065ade53dd951c9
Parents: 9230bbd
Author: Chris Dollin <eh...@googlemail.com>
Authored: Mon Jul 18 15:40:00 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Mon Jul 18 15:40:00 2016 +0100
----------------------------------------------------------------------
.../apache/jena/atlas/data/SortedDataBag.java | 115 +++++++++++++++----
.../sparql/engine/iterator/QueryIterSort.java | 1 +
2 files changed, 92 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/d32cf8f4/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 3deacce..0102935 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -69,19 +69,81 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
protected final ThresholdPolicy<E> policy;
protected final SerializationFactory<E> serializationFactory;
- protected final Comparator<? super E> comparator;
+ protected final CanAbortComparator comparator;
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
+ protected volatile boolean cancelled;
public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
{
this.policy = policy;
this.serializationFactory = serializerFactory;
- this.comparator = comparator;
+ this.comparator = new CanAbortComparator(comparator);
}
+ private final class CanAbortComparator implements Comparator<E>
+ {
+ /**
+ The test for whether the sort has been cancelled is
+ performed every <code>cancelTestFrequency</code> comparisons.
+ This reduces the (presumed) overhead of access to a
+ volatile boolean.
+ */
+ static final int cancelTestFrequency = 10000;
+
+ /**
+ Count of the number of times this comparator has been called.
+ */
+ int count = 0;
+
+ final Comparator<? super E> baseComparator;
+
+ public CanAbortComparator(Comparator<? super E> comparator)
+ {
+ this.baseComparator = comparator;
+ }
+
+ @Override public int compare(E o1, E o2)
+ {
+ count += 1;
+ if (count % cancelTestFrequency == 0)
+ {
+ if (cancelled) throw new AbandonSort();
+ }
+ return baseComparator.compare(o1, o2);
+ }
+
+ /**
+ Sort the array <code>e</code> using this comparator
+ with the additional ability to abort the sort.
+ */
+ public boolean abortableSort(E[] e) {
+ try { Arrays.sort(e, this); }
+ catch (AbandonSort s) { return true; }
+ return false;
+ }
+ }
+
+ /**
+ <code>AbandonSort</code> is the exception thrown from
+ <code>CanAbortComparator</code> to abandon a sort.
+ */
+ public static class AbandonSort extends RuntimeException
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ cancel arranges that further comparisons using the supplied
+ comparator will abandon the sort in progress.
+ */
+ public void cancel()
+ {
+ cancelled = true;
+ }
+
protected void checkClosed()
{
if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
@@ -118,7 +180,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
protected void spill()
{
// Make sure we have something to spill.
@@ -134,25 +196,26 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
throw new AtlasException(e);
}
- // Sort the tuples
- // Collections.sort() will copy to an array, sort, and then copy back. Avoid that
- // extra copy by copying to an array and using Arrays.sort(). Also it lets us use
- // Collection<E> instead of List<E> as the type for the memory object. Unfortunately
- // because of Java's crazy generics we have to do it as an Object array.
- Object[] array = memory.toArray();
- Arrays.sort(array, (Comparator)comparator);
+ // Sort the tuples as an array. The CanAbortComparator will sort that
+ // array using Arrays.sort. The cast to E[] is safe. If the sort is
+ // aborted, don't bother messing around with the serialisation. We'll
+ // never get around to using it anyway.
- Sink<E> serializer = serializationFactory.createSerializer(out);
- try
+ E[] array = (E[]) memory.toArray();
+ if (!comparator.abortableSort(array))
{
- for (Object tuple : array)
- {
- serializer.send((E)tuple);
- }
- }
- finally
- {
- serializer.close();
+ Sink<E> serializer = serializationFactory.createSerializer(out);
+ try
+ {
+ for (Object tuple : array)
+ {
+ serializer.send((E)tuple);
+ }
+ }
+ finally
+ {
+ serializer.close();
+ }
}
spilled = true;
@@ -189,7 +252,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
return iterator(getSpillFiles().size());
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
private Iterator<E> iterator(int size)
{
checkClosed();
@@ -200,9 +263,13 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
if (!finishedAdding && memSize > 1)
{
// Again, some ugliness for speed
- Object[] array = memory.toArray();
- Arrays.sort(array, (Comparator)comparator);
- memory = Arrays.asList((E[])array);
+ E[] array = (E[]) memory.toArray();
+ if (comparator.abortableSort(array))
+ {
+ // if we comment this back in, we lose the timeout message!
+ // return Iter.nullIterator();
+ }
+ memory = Arrays.asList(array);
}
finishedAdding = true;
http://git-wip-us.apache.org/repos/asf/jena/blob/d32cf8f4/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index ecba3a9..7824e3f 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -67,6 +67,7 @@ public class QueryIterSort extends QueryIterPlainWrapper
@Override
public void requestCancel()
{
+ this.db.cancel();
this.embeddedIterator.cancel() ;
super.requestCancel() ;
}
[03/11] jena git commit: Renamed CanAbortComparator to
AbortableComparator.
Posted by an...@apache.org.
Renamed CanAbortComparator to AbortableComparator.
Moved the volatile boolean into AbortableComparator and gave that
class a cancel method, called from SortedDataBag, and made
AbortableComparator a static class so the management of the
comparator is explicit.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/5d09cb51
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/5d09cb51
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/5d09cb51
Branch: refs/heads/master
Commit: 5d09cb51f42635989ce8ead5e032a15bcf672100
Parents: 4493dd3
Author: Chris Dollin <eh...@googlemail.com>
Authored: Tue Jul 19 12:13:03 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Tue Jul 19 12:13:03 2016 +0100
----------------------------------------------------------------------
.../apache/jena/atlas/data/SortedDataBag.java | 24 ++++++++++++++------
1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/5d09cb51/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index f01e9d6..68de3f6 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -69,21 +69,20 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
protected final ThresholdPolicy<E> policy;
protected final SerializationFactory<E> serializationFactory;
- protected final CanAbortComparator comparator;
+ protected final AbortableComparator<E> comparator;
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
- protected volatile boolean cancelled;
public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
{
this.policy = policy;
this.serializationFactory = serializerFactory;
- this.comparator = new CanAbortComparator(comparator);
+ this.comparator = new AbortableComparator<E>(comparator);
}
- private final class CanAbortComparator implements Comparator<E>
+ private static final class AbortableComparator<E> implements Comparator<E>
{
/**
The test for whether the sort has been cancelled is
@@ -98,9 +97,11 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
*/
int count = 0;
+ protected volatile boolean cancelled;
+
final Comparator<? super E> baseComparator;
- public CanAbortComparator(Comparator<? super E> comparator)
+ public AbortableComparator(Comparator<? super E> comparator)
{
this.baseComparator = comparator;
}
@@ -131,11 +132,20 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
}
return false;
}
+
+ /**
+ Arrange that the next on-frequency cancellation test
+ in compare will succeed, aborting the sort.
+ */
+ public void cancel()
+ {
+ cancelled = true;
+ }
}
/**
<code>AbandonSort</code> is the exception thrown from
- <code>CanAbortComparator</code> to abandon a sort.
+ <code>AbortableComparator</code> to abandon a sort.
*/
public static class AbandonSort extends RuntimeException
{
@@ -148,7 +158,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
*/
public void cancel()
{
- cancelled = true;
+ comparator.cancel();
}
protected void checkClosed()
[10/11] jena git commit: remove blank line
Posted by an...@apache.org.
remove blank line
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/222b77d2
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/222b77d2
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/222b77d2
Branch: refs/heads/master
Commit: 222b77d2aa02fe9ea199b18280fd194a1798480d
Parents: cdc69f8
Author: Chris Dollin <eh...@googlemail.com>
Authored: Tue Jul 26 13:56:54 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Tue Jul 26 13:56:54 2016 +0100
----------------------------------------------------------------------
.../jena/sparql/engine/iterator/TestSortedDataBagCancellation.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/222b77d2/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
index 0109ba8..2acf7ae 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
@@ -167,7 +167,6 @@ public class TestSortedDataBagCancellation extends TestCase {
} catch (QueryCancelledException qe) {
assertTrue(qs.db.isCancelled());
return;
-
}
fail("query was not cancelled");
}
[11/11] jena git commit: JENA-1212: Merge commit 'refs/pull/157/head'
of github.com:apache/jena
Posted by an...@apache.org.
JENA-1212: Merge commit 'refs/pull/157/head' of github.com:apache/jena
This closes #157.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/275fa9c7
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/275fa9c7
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/275fa9c7
Branch: refs/heads/master
Commit: 275fa9c798e9a75ae844f2ef31e3cfcf42bcd94f
Parents: 1479b42 222b77d
Author: Andy Seaborne <an...@apache.org>
Authored: Wed Jul 27 13:01:16 2016 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Wed Jul 27 13:01:16 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 71 ++++
.../apache/jena/atlas/data/SortedDataBag.java | 426 +++++++++----------
.../sparql/engine/iterator/QueryIterSort.java | 102 +++--
.../iterator/TestSortedDataBagCancellation.java | 232 ++++++++++
4 files changed, 546 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
[02/11] jena git commit: Layout: adopt usual brace styling for
abortableSort and CanAbortComparator.
Posted by an...@apache.org.
Layout: adopt usual brace styling for abortableSort and CanAbortComparator.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4493dd3b
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4493dd3b
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4493dd3b
Branch: refs/heads/master
Commit: 4493dd3b570223bbcc00cfd4d4e87be2da02a055
Parents: d32cf8f
Author: Chris Dollin <eh...@googlemail.com>
Authored: Tue Jul 19 11:56:50 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Tue Jul 19 11:56:50 2016 +0100
----------------------------------------------------------------------
.../org/apache/jena/atlas/data/SortedDataBag.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/4493dd3b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 0102935..f01e9d6 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -101,9 +101,9 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
final Comparator<? super E> baseComparator;
public CanAbortComparator(Comparator<? super E> comparator)
- {
+ {
this.baseComparator = comparator;
- }
+ }
@Override public int compare(E o1, E o2)
{
@@ -119,9 +119,16 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
Sort the array <code>e</code> using this comparator
with the additional ability to abort the sort.
*/
- public boolean abortableSort(E[] e) {
- try { Arrays.sort(e, this); }
- catch (AbandonSort s) { return true; }
+ public boolean abortableSort(E[] e)
+ {
+ try
+ {
+ Arrays.sort(e, this);
+ }
+ catch (AbandonSort s)
+ {
+ return true;
+ }
return false;
}
}
[08/11] jena git commit: Added @Override in declaration of
SpecialBindingComparator.compare().
Posted by an...@apache.org.
Added @Override in declaration of SpecialBindingComparator.compare().
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/cc76ba72
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/cc76ba72
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/cc76ba72
Branch: refs/heads/master
Commit: cc76ba72dcba387b798bde223095bc683bfd3127
Parents: 512b2a5
Author: Chris Dollin <eh...@googlemail.com>
Authored: Mon Jul 25 09:21:54 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Mon Jul 25 09:21:54 2016 +0100
----------------------------------------------------------------------
.../jena/sparql/engine/iterator/TestSortedDataBagCancellation.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/cc76ba72/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
index c29552e..2220a39 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
@@ -234,6 +234,7 @@ public class TestSortedDataBagCancellation extends TestCase
trapCompare = true;
}
+ @Override
public int compare(Binding x, Binding y)
{
if (trapCompare) throw new RuntimeException("compare() no longer allowed.");
[06/11] jena git commit: Eliminate unnecessary "only test cancelled
every Nth compare" code. Volatile access is cheap enough.
Posted by an...@apache.org.
Eliminate unnecessary "only test cancelled every Nth compare" code.
Volatile access is cheap enough.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/a3f3fdcb
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/a3f3fdcb
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/a3f3fdcb
Branch: refs/heads/master
Commit: a3f3fdcb0e816951bee4dd3c15ac095a44f33adf
Parents: 205671c
Author: Chris Dollin <eh...@googlemail.com>
Authored: Wed Jul 20 13:04:45 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Wed Jul 20 13:04:45 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 21 ++------------------
1 file changed, 2 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/a3f3fdcb/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
index 408bd0e..f4286cf 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
@@ -30,7 +30,7 @@ public final class AbortableComparator<E> implements Comparator<E>
/**
<code>AbandonSort</code> is the exception thrown from
<code>AbortableComparator</code> to abandon a sort.
- */
+ */
public static class AbandonSort extends RuntimeException
{
private static final long serialVersionUID = 1L;
@@ -38,30 +38,13 @@ public final class AbortableComparator<E> implements Comparator<E>
public static enum Finish {COMPLETED, ABORTED}
- /**
- The test for whether the sort has been cancelled is
- performed every <code>cancelTestFrequency</code> comparisons.
- This reduces the (presumed) overhead of access to a
- volatile boolean.
- */
- static final int cancelTestFrequency = 10000;
-
- /**
- Count of the number of times this comparator has been called.
- */
- int count = 0;
-
protected volatile boolean cancelled;
final Comparator<? super E> baseComparator;
@Override public int compare(E o1, E o2)
{
- count += 1;
- if (count % cancelTestFrequency == 0)
- {
- if (cancelled) throw new AbandonSort();
- }
+ if (cancelled) throw new AbandonSort();
return baseComparator.compare(o1, o2);
}
[05/11] jena git commit: Clarify return value from abortableSort.
Posted by an...@apache.org.
Clarify return value from abortableSort.
Rather than a boolean (true => cancelled), return one of
Finish.ABORTED or Finish.COMPLETED as appropriate.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/205671cf
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/205671cf
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/205671cf
Branch: refs/heads/master
Commit: 205671cf292ed0213d6fc4791bd2903a79b9e383
Parents: d4c8962
Author: Chris Dollin <eh...@googlemail.com>
Authored: Tue Jul 19 14:57:18 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Tue Jul 19 14:57:18 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 20 ++--
.../apache/jena/atlas/data/SortedDataBag.java | 10 +-
.../sparql/engine/iterator/QueryIterSort.java | 116 +++++++++----------
3 files changed, 70 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/205671cf/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
index 7be58ec..408bd0e 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
@@ -27,14 +27,16 @@ public final class AbortableComparator<E> implements Comparator<E>
this.baseComparator = comparator;
}
- /**
- <code>AbandonSort</code> is the exception thrown from
- <code>AbortableComparator</code> to abandon a sort.
- */
+ /**
+ <code>AbandonSort</code> is the exception thrown from
+ <code>AbortableComparator</code> to abandon a sort.
+ */
public static class AbandonSort extends RuntimeException
- {
+ {
private static final long serialVersionUID = 1L;
- }
+ }
+
+ public static enum Finish {COMPLETED, ABORTED}
/**
The test for whether the sort has been cancelled is
@@ -67,7 +69,7 @@ public final class AbortableComparator<E> implements Comparator<E>
Sort the array <code>e</code> using this comparator
with the additional ability to abort the sort.
*/
- public boolean abortableSort(E[] e)
+ public Finish abortableSort(E[] e)
{
try
{
@@ -75,9 +77,9 @@ public final class AbortableComparator<E> implements Comparator<E>
}
catch (AbandonSort s)
{
- return true;
+ return Finish.ABORTED;
}
- return false;
+ return Finish.COMPLETED;
}
/**
http://git-wip-us.apache.org/repos/asf/jena/blob/205671cf/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 9ae78e2..b248006 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -32,6 +32,7 @@ import java.util.NoSuchElementException ;
import java.util.PriorityQueue ;
import org.apache.jena.atlas.AtlasException ;
+import org.apache.jena.atlas.data.AbortableComparator.Finish;
import org.apache.jena.atlas.iterator.Iter ;
import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
import org.apache.jena.atlas.lib.Closeable ;
@@ -149,7 +150,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
// never get around to using it anyway.
E[] array = (E[]) memory.toArray();
- if (!comparator.abortableSort(array))
+ if (comparator.abortableSort(array) == Finish.COMPLETED)
{
Sink<E> serializer = serializationFactory.createSerializer(out);
try
@@ -209,13 +210,8 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
// Constructing an iterator from this class is not thread-safe (just like all the the other methods)
if (!finishedAdding && memSize > 1)
{
- // Again, some ugliness for speed
E[] array = (E[]) memory.toArray();
- if (comparator.abortableSort(array))
- {
- // if we comment this back in, we lose the timeout message!
- // return Iter.nullIterator();
- }
+ comparator.abortableSort(array); // don't care if we aborted or not
memory = Arrays.asList(array);
}
http://git-wip-us.apache.org/repos/asf/jena/blob/205671cf/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index 7824e3f..edaaa5c 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -36,73 +36,69 @@ import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding ;
import org.apache.jena.sparql.engine.binding.BindingComparator ;
-/**
- * Sort a query iterator. The sort will happen in-memory unless the size of the
+/**
+ * Sort a query iterator. The sort will happen in-memory unless the size of the
* iterator exceeds a configurable threshold. In that case, a disk sort is used.
*
* @see SortedDataBag
*/
-public class QueryIterSort extends QueryIterPlainWrapper
-{
- private final QueryIterator embeddedIterator; // Keep a record of the underlying source for .cancel.
- final SortedDataBag<Binding> db;
-
- public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context)
- {
- this(qIter, new BindingComparator(conditions, context), context) ;
- }
+public class QueryIterSort extends QueryIterPlainWrapper {
+ private final QueryIterator embeddedIterator ; // Keep a record of the
+ // underlying source for
+ // .cancel.
+ final SortedDataBag<Binding> db ;
- public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator, final ExecutionContext context)
- {
- super(null, context) ;
- this.embeddedIterator = qIter ;
-
- ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext());
- this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator);
-
- this.setIterator(new SortedBindingIterator(qIter));
- }
+ public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context) {
+ this(qIter, new BindingComparator(conditions, context), context) ;
+ }
- @Override
- public void requestCancel()
- {
- this.db.cancel();
- this.embeddedIterator.cancel() ;
- super.requestCancel() ;
- }
+ public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator,
+ final ExecutionContext context) {
+ super(null, context) ;
+ this.embeddedIterator = qIter ;
- private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable
- {
- private final QueryIterator qIter;
-
- public SortedBindingIterator(final QueryIterator qIter)
- {
- this.qIter = qIter;
- }
-
- @Override
- protected Iterator<Binding> initializeIterator()
- {
- try
- {
- db.addAll(qIter);
- return db.iterator();
- }
- // Should we catch other exceptions too? Theoretically the user should be using this
- // iterator in a try/finally block, and thus will call close() themselves.
- catch (QueryCancelledException e)
- {
- close();
- throw e;
- }
- }
+ ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext()) ;
+ this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(),
+ comparator) ;
+
+ this.setIterator(new SortedBindingIterator(qIter)) ;
+ }
+
+ @Override
+ public void requestCancel() {
+ this.db.cancel() ;
+ this.embeddedIterator.cancel() ;
+ super.requestCancel() ;
+ }
+
+ private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable {
+ private final QueryIterator qIter ;
+
+ public SortedBindingIterator(final QueryIterator qIter) {
+ this.qIter = qIter ;
+ }
+
+ @Override
+ protected Iterator<Binding> initializeIterator() {
+ try {
+ db.addAll(qIter) ;
+ return db.iterator() ;
+ }
+ // Should we catch other exceptions too? Theoretically
+ // the user should be using this
+ // iterator in a try/finally block, and thus will call
+ // close() themselves.
+ catch (QueryCancelledException e) {
+ close() ;
+ throw e ;
+ }
+ }
+
+ @Override
+ public void close() {
+ db.close() ;
+ }
+ }
- @Override
- public void close()
- {
- db.close();
- }
- }
-
}
[04/11] jena git commit: Host AbortableComparator in its own file.
Posted by an...@apache.org.
Host AbortableComparator in its own file.
Also include AbandonSort in AbortableComparator (which is the only
place that uses it).
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/d4c89624
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/d4c89624
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/d4c89624
Branch: refs/heads/master
Commit: d4c8962409859f62e0152b081c4067c1c4e7f0f2
Parents: 5d09cb5
Author: Chris Dollin <eh...@googlemail.com>
Authored: Tue Jul 19 14:32:04 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Tue Jul 19 14:32:04 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 91 ++++++++++++++++++++
.../apache/jena/atlas/data/SortedDataBag.java | 70 ---------------
2 files changed, 91 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/d4c89624/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
new file mode 100644
index 0000000..7be58ec
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.atlas.data;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+public final class AbortableComparator<E> implements Comparator<E>
+{
+ public AbortableComparator(Comparator<? super E> comparator)
+ {
+ this.baseComparator = comparator;
+ }
+
+ /**
+ <code>AbandonSort</code> is the exception thrown from
+ <code>AbortableComparator</code> to abandon a sort.
+ */
+ public static class AbandonSort extends RuntimeException
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ The test for whether the sort has been cancelled is
+ performed every <code>cancelTestFrequency</code> comparisons.
+ This reduces the (presumed) overhead of access to a
+ volatile boolean.
+ */
+ static final int cancelTestFrequency = 10000;
+
+ /**
+ Count of the number of times this comparator has been called.
+ */
+ int count = 0;
+
+ protected volatile boolean cancelled;
+
+ final Comparator<? super E> baseComparator;
+
+ @Override public int compare(E o1, E o2)
+ {
+ count += 1;
+ if (count % cancelTestFrequency == 0)
+ {
+ if (cancelled) throw new AbandonSort();
+ }
+ return baseComparator.compare(o1, o2);
+ }
+
+ /**
+ Sort the array <code>e</code> using this comparator
+ with the additional ability to abort the sort.
+ */
+ public boolean abortableSort(E[] e)
+ {
+ try
+ {
+ Arrays.sort(e, this);
+ }
+ catch (AbandonSort s)
+ {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ Arrange that the next on-frequency cancellation test
+ in compare will succeed, aborting the sort.
+ */
+ public void cancel()
+ {
+ cancelled = true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/d4c89624/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 68de3f6..9ae78e2 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -82,76 +82,6 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
this.comparator = new AbortableComparator<E>(comparator);
}
- private static final class AbortableComparator<E> implements Comparator<E>
- {
- /**
- The test for whether the sort has been cancelled is
- performed every <code>cancelTestFrequency</code> comparisons.
- This reduces the (presumed) overhead of access to a
- volatile boolean.
- */
- static final int cancelTestFrequency = 10000;
-
- /**
- Count of the number of times this comparator has been called.
- */
- int count = 0;
-
- protected volatile boolean cancelled;
-
- final Comparator<? super E> baseComparator;
-
- public AbortableComparator(Comparator<? super E> comparator)
- {
- this.baseComparator = comparator;
- }
-
- @Override public int compare(E o1, E o2)
- {
- count += 1;
- if (count % cancelTestFrequency == 0)
- {
- if (cancelled) throw new AbandonSort();
- }
- return baseComparator.compare(o1, o2);
- }
-
- /**
- Sort the array <code>e</code> using this comparator
- with the additional ability to abort the sort.
- */
- public boolean abortableSort(E[] e)
- {
- try
- {
- Arrays.sort(e, this);
- }
- catch (AbandonSort s)
- {
- return true;
- }
- return false;
- }
-
- /**
- Arrange that the next on-frequency cancellation test
- in compare will succeed, aborting the sort.
- */
- public void cancel()
- {
- cancelled = true;
- }
- }
-
- /**
- <code>AbandonSort</code> is the exception thrown from
- <code>AbortableComparator</code> to abandon a sort.
- */
- public static class AbandonSort extends RuntimeException
- {
- private static final long serialVersionUID = 1L;
- }
-
/**
cancel arranges that further comparisons using the supplied
comparator will abandon the sort in progress.
[09/11] jena git commit: Revised layout -- spaces not tabs, indent 4,
opening brace on same line.
Posted by an...@apache.org.
Revised layout -- spaces not tabs, indent 4, opening brace on same line.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/cdc69f8e
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/cdc69f8e
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/cdc69f8e
Branch: refs/heads/master
Commit: cdc69f8ef28541a9d501d175a8b778f23c302907
Parents: cc76ba7
Author: Chris Dollin <eh...@googlemail.com>
Authored: Mon Jul 25 10:39:09 2016 +0100
Committer: Chris Dollin <eh...@googlemail.com>
Committed: Mon Jul 25 10:39:09 2016 +0100
----------------------------------------------------------------------
.../jena/atlas/data/AbortableComparator.java | 99 +++--
.../apache/jena/atlas/data/SortedDataBag.java | 421 ++++++++-----------
.../sparql/engine/iterator/QueryIterSort.java | 125 +++---
.../iterator/TestSortedDataBagCancellation.java | 362 ++++++++--------
4 files changed, 466 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
index f4286cf..124c6be 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/AbortableComparator.java
@@ -20,57 +20,52 @@ package org.apache.jena.atlas.data;
import java.util.Arrays;
import java.util.Comparator;
-public final class AbortableComparator<E> implements Comparator<E>
-{
- public AbortableComparator(Comparator<? super E> comparator)
- {
- this.baseComparator = comparator;
- }
-
- /**
- <code>AbandonSort</code> is the exception thrown from
- <code>AbortableComparator</code> to abandon a sort.
- */
- public static class AbandonSort extends RuntimeException
- {
- private static final long serialVersionUID = 1L;
- }
-
- public static enum Finish {COMPLETED, ABORTED}
-
- protected volatile boolean cancelled;
-
- final Comparator<? super E> baseComparator;
+public final class AbortableComparator<E> implements Comparator<E> {
+ public AbortableComparator(Comparator<? super E> comparator) {
+ this.baseComparator = comparator;
+ }
- @Override public int compare(E o1, E o2)
- {
- if (cancelled) throw new AbandonSort();
- return baseComparator.compare(o1, o2);
- }
-
- /**
- Sort the array <code>e</code> using this comparator
- with the additional ability to abort the sort.
- */
- public Finish abortableSort(E[] e)
- {
- try
- {
- Arrays.sort(e, this);
- }
- catch (AbandonSort s)
- {
- return Finish.ABORTED;
- }
- return Finish.COMPLETED;
- }
-
- /**
- Arrange that the next on-frequency cancellation test
- in compare will succeed, aborting the sort.
- */
- public void cancel()
- {
- cancelled = true;
- }
+ /**
+ * <code>AbandonSort</code> is the exception thrown from
+ * <code>AbortableComparator</code> to abandon a sort.
+ */
+ public static class AbandonSort extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
+
+ public static enum Finish {
+ COMPLETED, ABORTED
+ }
+
+ protected volatile boolean cancelled;
+
+ final Comparator<? super E> baseComparator;
+
+ @Override
+ public int compare(E o1, E o2) {
+ if (cancelled)
+ throw new AbandonSort();
+ return baseComparator.compare(o1, o2);
+ }
+
+ /**
+ * Sort the array <code>e</code> using this comparator with the additional
+ * ability to abort the sort.
+ */
+ public Finish abortableSort(E[] e) {
+ try {
+ Arrays.sort(e, this);
+ } catch (AbandonSort s) {
+ return Finish.ABORTED;
+ }
+ return Finish.COMPLETED;
+ }
+
+ /**
+ * Arrange that the next call to compare will throw AbandonSort,
+ * aborting the sort in progress.
+ */
+ public void cancel() {
+ cancelled = true;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index b7c74ae..26ba388 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -18,163 +18,153 @@
package org.apache.jena.atlas.data;
-import java.io.File ;
-import java.io.FileNotFoundException ;
-import java.io.IOException ;
-import java.io.InputStream ;
-import java.io.OutputStream ;
-import java.util.ArrayList ;
-import java.util.Arrays ;
-import java.util.Comparator ;
-import java.util.Iterator ;
-import java.util.List ;
-import java.util.NoSuchElementException ;
-import java.util.PriorityQueue ;
-
-import org.apache.jena.atlas.AtlasException ;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+import org.apache.jena.atlas.AtlasException;
import org.apache.jena.atlas.data.AbortableComparator.Finish;
-import org.apache.jena.atlas.iterator.Iter ;
-import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.atlas.lib.Sink ;
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorResourceClosing;
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.atlas.lib.Sink;
/**
* <p>
- * This data bag will gather items in memory until a size threshold is passed, at which point it will write
- * out all of the items to disk using the supplied serializer.
+ * This data bag will gather items in memory until a size threshold is passed,
+ * at which point it will write out all of the items to disk using the supplied
+ * serializer.
* </p>
* <p>
- * After adding is finished, call {@link #iterator()} to set up the data bag for reading back items and iterating over them.
- * The iterator will retrieve the items in sorted order using the supplied comparator.
+ * After adding is finished, call {@link #iterator()} to set up the data bag for
+ * reading back items and iterating over them. The iterator will retrieve the
+ * items in sorted order using the supplied comparator.
* </p>
* <p>
- * IMPORTANT: You may not add any more items after this call. You may subsequently call {@link #iterator()} multiple
- * times which will give you a new iterator for each invocation. If you do not consume the entire iterator, you should
- * call {@link Iter#close(Iterator)} to close any FileInputStreams associated with the iterator.
+ * IMPORTANT: You may not add any more items after this call. You may
+ * subsequently call {@link #iterator()} multiple times which will give you a
+ * new iterator for each invocation. If you do not consume the entire iterator,
+ * you should call {@link Iter#close(Iterator)} to close any FileInputStreams
+ * associated with the iterator.
* </p>
* <p>
- * Additionally, make sure to call {@link #close()} when you are finished to free any system resources (preferably in a finally block).
+ * Additionally, make sure to call {@link #close()} when you are finished to
+ * free any system resources (preferably in a finally block).
* </p>
* <p>
- * Implementation Notes: Data is stored in an ArrayList as it comes in. When it is time to spill, that
- * data is sorted and written to disk. An iterator will read in each file and perform a merge-sort as
- * the results are returned.
+ * Implementation Notes: Data is stored in an ArrayList as it comes in. When it
+ * is time to spill, that data is sorted and written to disk. An iterator will
+ * read in each file and perform a merge-sort as the results are returned.
* </p>
*/
-public class SortedDataBag<E> extends AbstractDataBag<E>
-{
+public class SortedDataBag<E> extends AbstractDataBag<E> {
/**
- * The the maximum number of files to merge at the same time. Without this, you can run out of file handles and other bad things.
+ * The maximum number of files to merge at the same time. Without this,
+ * you can run out of file handles and other bad things.
*/
- protected static int MAX_SPILL_FILES = 100 ;
-
+ protected static int MAX_SPILL_FILES = 100;
+
protected final ThresholdPolicy<E> policy;
protected final SerializationFactory<E> serializationFactory;
protected final AbortableComparator<E> comparator;
-
+
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
-
- public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
- {
+
+ public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory,
+ Comparator<? super E> comparator) {
this.policy = policy;
this.serializationFactory = serializerFactory;
this.comparator = new AbortableComparator<E>(comparator);
}
-
+
/**
- cancel arranges that further comparisons using the supplied
- comparator will abandon the sort in progress.
+ * cancel arranges that further comparisons using the supplied comparator
+ * will abandon the sort in progress.
*/
- public void cancel()
- {
- comparator.cancel();
- }
-
- /**
- isCancelled is true iff cancel has been called on this
- bags comparator. (Used in testing.)
- */
- public boolean isCancelled()
- {
- return comparator.cancelled;
- }
-
- protected void checkClosed()
- {
- if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
+ public void cancel() {
+ comparator.cancel();
}
-
+
+ /**
+ * isCancelled is true iff cancel has been called on this bag's comparator.
+ * (Used in testing.)
+ */
+ public boolean isCancelled() {
+ return comparator.cancelled;
+ }
+
+ protected void checkClosed() {
+ if (closed)
+ throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.");
+ }
+
@Override
- public boolean isSorted()
- {
+ public boolean isSorted() {
return true;
}
@Override
- public boolean isDistinct()
- {
+ public boolean isDistinct() {
return false;
}
@Override
- public void add(E item)
- {
+ public void add(E item) {
checkClosed();
if (finishedAdding)
throw new AtlasException("SortedDataBag: Cannot add any more items after the writing phase is complete.");
-
- if (policy.isThresholdExceeded())
- {
+
+ if (policy.isThresholdExceeded()) {
spill();
}
-
- if (memory.add(item))
- {
+
+ if (memory.add(item)) {
policy.increment(item);
size++;
}
}
-
+
@SuppressWarnings({ "unchecked" })
- protected void spill()
- {
+ protected void spill() {
// Make sure we have something to spill.
- if (memory.size() > 0)
- {
+ if (memory.size() > 0) {
OutputStream out;
- try
- {
+ try {
out = getSpillStream();
- }
- catch (IOException e)
- {
+ } catch (IOException e) {
throw new AtlasException(e);
}
-
- // Sort the tuples as an array. The CanAbortComparator will sort that
+
+ // Sort the tuples as an array. The AbortableComparator will sort
+ // that
// array using Arrays.sort. The cast to E[] is safe. If the sort is
- // aborted, don't bother messing around with the serialisation. We'll
+ // aborted, don't bother messing around with the serialisation.
+ // We'll
// never get around to using it anyway.
-
+
E[] array = (E[]) memory.toArray();
- if (comparator.abortableSort(array) == Finish.COMPLETED)
- {
- Sink<E> serializer = serializationFactory.createSerializer(out);
- try
- {
- for (Object tuple : array)
- {
- serializer.send((E)tuple);
- }
- }
- finally
- {
- serializer.close();
- }
+ if (comparator.abortableSort(array) == Finish.COMPLETED) {
+ Sink<E> serializer = serializationFactory.createSerializer(out);
+ try {
+ for (Object tuple : array) {
+ serializer.send((E) tuple);
+ }
+ } finally {
+ serializer.close();
+ }
}
-
+
spilled = true;
policy.reset();
memory.clear();
@@ -182,272 +172,225 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
}
@Override
- public void flush()
- {
+ public void flush() {
spill();
}
-
- protected Iterator<E> getInputIterator(File spillFile) throws FileNotFoundException
- {
+
+ protected Iterator<E> getInputIterator(File spillFile) throws FileNotFoundException {
InputStream in = getInputStream(spillFile);
- Iterator<E> deserializer = serializationFactory.createDeserializer(in) ;
+ Iterator<E> deserializer = serializationFactory.createDeserializer(in);
return new IteratorResourceClosing<>(deserializer, in);
}
/**
- * Returns an iterator over a set of elements of type E. If you do not exhaust
- * the iterator, you should call {@link org.apache.jena.atlas.iterator.Iter#close(Iterator)}
- * to be sure any open file handles are closed.
+ * Returns an iterator over a set of elements of type E. If you do not
+ * exhaust the iterator, you should call
+ * {@link org.apache.jena.atlas.iterator.Iter#close(Iterator)} to be sure
+ * any open file handles are closed.
*
* @return an Iterator
*/
@Override
- public Iterator<E> iterator()
- {
+ public Iterator<E> iterator() {
preMerge();
return iterator(getSpillFiles().size());
}
@SuppressWarnings({ "unchecked" })
- private Iterator<E> iterator(int size)
- {
+ private Iterator<E> iterator(int size) {
checkClosed();
-
+
int memSize = memory.size();
-
- // Constructing an iterator from this class is not thread-safe (just like all the the other methods)
- if (!finishedAdding && memSize > 1)
- {
+
+ // Constructing an iterator from this class is not thread-safe (just
+ // like all the other methods)
+ if (!finishedAdding && memSize > 1) {
E[] array = (E[]) memory.toArray();
comparator.abortableSort(array); // don't care if we aborted or not
memory = Arrays.asList(array);
}
-
+
finishedAdding = true;
-
- if (spilled)
- {
+
+ if (spilled) {
List<Iterator<E>> inputs = new ArrayList<>(size + (memSize > 0 ? 1 : 0));
-
- if (memSize > 0)
- {
+
+ if (memSize > 0) {
inputs.add(memory.iterator());
}
-
- for ( int i = 0; i < size; i++ )
- {
+
+ for (int i = 0; i < size; i++) {
File spillFile = getSpillFiles().get(i);
- try
- {
+ try {
Iterator<E> irc = getInputIterator(spillFile);
inputs.add(irc);
- }
- catch (FileNotFoundException e)
- {
+ } catch (FileNotFoundException e) {
// Close any open streams before we throw an exception
- for (Iterator<E> it : inputs)
- {
+ for (Iterator<E> it : inputs) {
Iter.close(it);
}
-
+
throw new AtlasException("Cannot find one of the spill files", e);
}
}
-
+
SpillSortIterator<E> ssi = new SpillSortIterator<>(inputs, comparator);
registerCloseableIterator(ssi);
-
+
return ssi;
- }
- else
- {
- if (memSize > 0)
- {
+ } else {
+ if (memSize > 0) {
return memory.iterator();
- }
- else
- {
+ } else {
return Iter.nullIterator();
}
}
}
- private void preMerge()
- {
- if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES)
- {
- return ;
+ private void preMerge() {
+ if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) {
+ return;
}
- try
- {
- while (getSpillFiles().size() > MAX_SPILL_FILES)
- {
- Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
- Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
- try
- {
- while (ssi.hasNext())
- {
- sink.send(ssi.next()) ;
+ try {
+ while (getSpillFiles().size() > MAX_SPILL_FILES) {
+ Sink<E> sink = serializationFactory.createSerializer(getSpillStream());
+ Iterator<E> ssi = iterator(MAX_SPILL_FILES);
+ try {
+ while (ssi.hasNext()) {
+ sink.send(ssi.next());
}
- }
- finally
- {
- Iter.close(ssi) ;
- sink.close() ;
+ } finally {
+ Iter.close(ssi);
+ sink.close();
}
- List<File> toRemove = new ArrayList<>(MAX_SPILL_FILES) ;
- for (int i = 0; i < MAX_SPILL_FILES; i++)
- {
- File file = getSpillFiles().get(i) ;
- file.delete() ;
- toRemove.add(file) ;
+ List<File> toRemove = new ArrayList<>(MAX_SPILL_FILES);
+ for (int i = 0; i < MAX_SPILL_FILES; i++) {
+ File file = getSpillFiles().get(i);
+ file.delete();
+ toRemove.add(file);
}
- getSpillFiles().removeAll(toRemove) ;
+ getSpillFiles().removeAll(toRemove);
- memory = new ArrayList<>() ;
+ memory = new ArrayList<>();
}
- }
- catch (IOException e)
- {
- throw new AtlasException(e) ;
+ } catch (IOException e) {
+ throw new AtlasException(e);
}
}
@Override
- public void close()
- {
- if (!closed)
- {
+ public void close() {
+ if (!closed) {
closeIterators();
deleteSpillFiles();
-
+
memory = null;
closed = true;
}
}
-
+
/**
* An iterator that handles getting the next tuple from the bag.
*/
- protected static class SpillSortIterator<T> implements Iterator<T>, Closeable
- {
+ protected static class SpillSortIterator<T> implements Iterator<T>, Closeable {
private final List<Iterator<T>> inputs;
private final Comparator<? super T> comp;
private final PriorityQueue<Item<T>> minHeap;
-
- public SpillSortIterator(List<Iterator<T>> inputs, Comparator<? super T> comp)
- {
+
+ public SpillSortIterator(List<Iterator<T>> inputs, Comparator<? super T> comp) {
this.inputs = inputs;
this.comp = comp;
this.minHeap = new PriorityQueue<>(inputs.size());
-
+
// Prime the heap
- for (int i=0; i<inputs.size(); i++)
- {
+ for (int i = 0; i < inputs.size(); i++) {
replaceItem(i);
}
}
-
- private void replaceItem(int index)
- {
+
+ private void replaceItem(int index) {
Iterator<T> it = inputs.get(index);
- if (it.hasNext())
- {
+ if (it.hasNext()) {
T tuple = it.next();
minHeap.add(new Item<>(index, tuple, comp));
}
}
@Override
- public boolean hasNext()
- {
+ public boolean hasNext() {
return (minHeap.peek() != null);
}
@Override
- public T next()
- {
- if (!hasNext())
- {
+ public T next() {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
-
+
Item<T> curr = minHeap.poll();
// Read replacement item
replaceItem(curr.getIndex());
-
+
return curr.getTuple();
}
@Override
- public void remove()
- {
+ public void remove() {
throw new UnsupportedOperationException("SpillSortIterator.remove");
}
@Override
- public void close()
- {
- for (Iterator<T> it : inputs)
- {
+ public void close() {
+ for (Iterator<T> it : inputs) {
Iter.close(it);
}
}
-
- private final class Item<U> implements Comparable<Item<U>>
- {
+
+ private final class Item<U> implements Comparable<Item<U>> {
private final int index;
private final U tuple;
private final Comparator<? super U> c;
-
- public Item(int index, U tuple, Comparator<? super U> c)
- {
+
+ public Item(int index, U tuple, Comparator<? super U> c) {
this.index = index;
this.tuple = tuple;
this.c = c;
}
-
- public int getIndex()
- {
+
+ public int getIndex() {
return index;
}
-
- public U getTuple()
- {
+
+ public U getTuple() {
return tuple;
}
-
+
@Override
@SuppressWarnings("unchecked")
- public int compareTo(Item<U> o)
- {
- return (null != c) ? c.compare(tuple, o.getTuple()) : ((Comparable<U>)tuple).compareTo(o.getTuple());
+ public int compareTo(Item<U> o) {
+ return (null != c) ? c.compare(tuple, o.getTuple()) : ((Comparable<U>) tuple).compareTo(o.getTuple());
}
-
+
@SuppressWarnings("unchecked")
@Override
- public boolean equals(Object obj)
- {
- if (obj instanceof Item)
- {
- return compareTo((Item<U>)obj) == 0;
+ public boolean equals(Object obj) {
+ if (obj instanceof Item) {
+ return compareTo((Item<U>) obj) == 0;
}
-
+
return false;
}
-
+
@Override
- public int hashCode()
- {
+ public int hashCode() {
return tuple.hashCode();
}
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index edaaa5c..3e565db 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -18,23 +18,23 @@
package org.apache.jena.sparql.engine.iterator;
-import java.util.Comparator ;
-import java.util.Iterator ;
-import java.util.List ;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
-import org.apache.jena.atlas.data.BagFactory ;
-import org.apache.jena.atlas.data.SortedDataBag ;
-import org.apache.jena.atlas.data.ThresholdPolicy ;
-import org.apache.jena.atlas.data.ThresholdPolicyFactory ;
-import org.apache.jena.atlas.iterator.IteratorDelayedInitialization ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.query.QueryCancelledException ;
-import org.apache.jena.query.SortCondition ;
-import org.apache.jena.riot.system.SerializationFactoryFinder ;
-import org.apache.jena.sparql.engine.ExecutionContext ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.binding.Binding ;
-import org.apache.jena.sparql.engine.binding.BindingComparator ;
+import org.apache.jena.atlas.data.BagFactory;
+import org.apache.jena.atlas.data.SortedDataBag;
+import org.apache.jena.atlas.data.ThresholdPolicy;
+import org.apache.jena.atlas.data.ThresholdPolicyFactory;
+import org.apache.jena.atlas.iterator.IteratorDelayedInitialization;
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.SortCondition;
+import org.apache.jena.riot.system.SerializationFactoryFinder;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.QueryIterator;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingComparator;
/**
* Sort a query iterator. The sort will happen in-memory unless the size of the
@@ -44,61 +44,60 @@ import org.apache.jena.sparql.engine.binding.BindingComparator ;
*/
public class QueryIterSort extends QueryIterPlainWrapper {
- private final QueryIterator embeddedIterator ; // Keep a record of the
- // underlying source for
- // .cancel.
- final SortedDataBag<Binding> db ;
+ private final QueryIterator embeddedIterator; // Keep a record of the
+ // underlying source for
+ // .cancel.
+ final SortedDataBag<Binding> db;
- public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context) {
- this(qIter, new BindingComparator(conditions, context), context) ;
- }
+ public QueryIterSort(QueryIterator qIter, List<SortCondition> conditions, ExecutionContext context) {
+ this(qIter, new BindingComparator(conditions, context), context);
+ }
- public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator,
- final ExecutionContext context) {
- super(null, context) ;
- this.embeddedIterator = qIter ;
+ public QueryIterSort(final QueryIterator qIter, final Comparator<Binding> comparator,
+ final ExecutionContext context) {
+ super(null, context);
+ this.embeddedIterator = qIter;
- ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext()) ;
- this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(),
- comparator) ;
+ ThresholdPolicy<Binding> policy = ThresholdPolicyFactory.policyFromContext(context.getContext());
+ this.db = BagFactory.newSortedBag(policy, SerializationFactoryFinder.bindingSerializationFactory(), comparator);
- this.setIterator(new SortedBindingIterator(qIter)) ;
- }
+ this.setIterator(new SortedBindingIterator(qIter));
+ }
- @Override
- public void requestCancel() {
- this.db.cancel() ;
- this.embeddedIterator.cancel() ;
- super.requestCancel() ;
- }
+ @Override
+ public void requestCancel() {
+ this.db.cancel();
+ this.embeddedIterator.cancel();
+ super.requestCancel();
+ }
- private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable {
- private final QueryIterator qIter ;
+ private class SortedBindingIterator extends IteratorDelayedInitialization<Binding> implements Closeable {
+ private final QueryIterator qIter;
- public SortedBindingIterator(final QueryIterator qIter) {
- this.qIter = qIter ;
- }
+ public SortedBindingIterator(final QueryIterator qIter) {
+ this.qIter = qIter;
+ }
- @Override
- protected Iterator<Binding> initializeIterator() {
- try {
- db.addAll(qIter) ;
- return db.iterator() ;
- }
- // Should we catch other exceptions too? Theoretically
- // the user should be using this
- // iterator in a try/finally block, and thus will call
- // close() themselves.
- catch (QueryCancelledException e) {
- close() ;
- throw e ;
- }
- }
+ @Override
+ protected Iterator<Binding> initializeIterator() {
+ try {
+ db.addAll(qIter);
+ return db.iterator();
+ }
+ // Should we catch other exceptions too? Theoretically
+ // the user should be using this
+ // iterator in a try/finally block, and thus will call
+ // close() themselves.
+ catch (QueryCancelledException e) {
+ close();
+ throw e;
+ }
+ }
- @Override
- public void close() {
- db.close() ;
- }
- }
+ @Override
+ public void close() {
+ db.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/jena/blob/cdc69f8e/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
index 2220a39..0109ba8 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
@@ -55,191 +55,179 @@ import junit.framework.TestCase;
test the SortedDataBag correctly?
*/
-public class TestSortedDataBagCancellation extends TestCase
-{
-
- static final BindingMap b1 = BindingFactory.create();
- static final BindingMap b2 = BindingFactory.create();
- static final BindingMap b3 = BindingFactory.create();
- static final BindingMap b4 = BindingFactory.create();
-
- static
- {
- b1.add(Var.alloc("v1"), NodeFactory.createLiteral("alpha"));
- b2.add(Var.alloc("v2"), NodeFactory.createLiteral("beta"));
- b3.add(Var.alloc("v3"), NodeFactory.createLiteral("gamma"));
- b4.add(Var.alloc("v4"), NodeFactory.createLiteral("delta"));
- }
-
- final Context params = new Context();
-
- final OpExecutorFactory factory = new OpExecutorFactory() {
-
- @Override public OpExecutor create(ExecutionContext ec) {
- throw new UnsupportedOperationException();
- }
- };
-
- final Graph activeGraph = new GraphMemPlain();
-
- final DatasetGraph dataset = DatasetGraphFactory.create();
-
- final List<SortCondition> conditions = new ArrayList<SortCondition>();
-
- final ExecutionContext ec = new ExecutionContext
- (
- params,
- activeGraph,
- dataset,
- factory
- );
-
- final BindingComparator base_bc = new BindingComparator(conditions, ec);
- final SpecialBindingComparator bc = new SpecialBindingComparator(base_bc, ec);
-
- QueryIteratorItems baseIter = new QueryIteratorItems();
-
- {
- baseIter.bindings.add(b1);
- baseIter.bindings.add(b2);
- baseIter.bindings.add(b3);
- baseIter.bindings.add(b4);;
- }
-
- QueryIterSort qs = new QueryIterSort(baseIter, bc, ec);
-
- /**
- In this test, the iterator is not cancelled;
- all items should be delivered, and the compare
- count should be monotonic-nondecreasing.
- */
- @Test public void testIteratesToCompletion()
- {
- int count = 0;
- assertEquals(0, count = bc.count);
- Set<Binding> results = new HashSet<Binding>();
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- assertFalse(qs.hasNext());
-
- Set<Binding> expected = new HashSet<Binding>();
- expected.add(b1);
- expected.add(b2);
- expected.add(b3);
- expected.add(b4);
-
- assertEquals(expected, results);
- }
-
- /**
- In this test, the iterator is cancelled after
- the first result is delivered. Any attempt to
- run the comparator should be trapped an exception
- thrown. The iterators should deliver no more values.
- */
- @Test public void testIteratesWithCancellation()
- {
- int count = 0;
- assertEquals(0, count = bc.count);
- Set<Binding> results = new HashSet<Binding>();
-
- assertTrue(qs.hasNext());
- assertTrue(bc.count >= count); count = bc.count;
- results.add(qs.next());
-
- qs.cancel();
- try
- {
- bc.noMoreCalls();
- while (qs.hasNext()) qs.next();
- }
- catch (QueryCancelledException qe)
- {
- assertTrue(qs.db.isCancelled());
- return;
-
- }
- fail("query was not cancelled");
- }
-
- /**
- A QueryIterator that delivers the elements of a list of bindings.
- */
- private static final class QueryIteratorItems extends QueryIteratorBase
- {
- List<Binding> bindings = new ArrayList<Binding>();
- int index = 0;
-
- @Override
- public void output(IndentedWriter out, SerializationContext sCxt) {
- out.write("a QueryIteratorItems");
- }
-
- @Override
- protected boolean hasNextBinding()
- {
- return index < bindings.size();
- }
-
- @Override
- protected Binding moveToNextBinding()
- {
- index += 1;
- return bindings.get(index - 1);
- }
-
- @Override
- protected void closeIterator()
- {
- }
-
- @Override
- protected void requestCancel()
- {
- }
- }
-
- /**
- A BindingComparator that wraps another BindingComparator
- and counts how many times compare() is called.
- */
- static class SpecialBindingComparator extends BindingComparator
- {
- final BindingComparator base;
- int count = 0;
- boolean trapCompare = false;
-
- public SpecialBindingComparator(BindingComparator base, ExecutionContext ec)
- {
- super(base.getConditions(), ec);
- this.base = base;
- }
-
- public void noMoreCalls() {
- trapCompare = true;
- }
-
- @Override
- public int compare(Binding x, Binding y)
- {
- if (trapCompare) throw new RuntimeException("compare() no longer allowed.");
- count += 1;
- return base.compare(x, y);
- }
- }
+public class TestSortedDataBagCancellation extends TestCase {
+
+ static final BindingMap b1 = BindingFactory.create();
+ static final BindingMap b2 = BindingFactory.create();
+ static final BindingMap b3 = BindingFactory.create();
+ static final BindingMap b4 = BindingFactory.create();
+
+ static {
+ b1.add(Var.alloc("v1"), NodeFactory.createLiteral("alpha"));
+ b2.add(Var.alloc("v2"), NodeFactory.createLiteral("beta"));
+ b3.add(Var.alloc("v3"), NodeFactory.createLiteral("gamma"));
+ b4.add(Var.alloc("v4"), NodeFactory.createLiteral("delta"));
+ }
+
+ final Context params = new Context();
+
+ final OpExecutorFactory factory = new OpExecutorFactory() {
+
+ @Override
+ public OpExecutor create(ExecutionContext ec) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ final Graph activeGraph = new GraphMemPlain();
+
+ final DatasetGraph dataset = DatasetGraphFactory.create();
+
+ final List<SortCondition> conditions = new ArrayList<SortCondition>();
+
+ final ExecutionContext ec = new ExecutionContext(params, activeGraph, dataset, factory);
+
+ final BindingComparator base_bc = new BindingComparator(conditions, ec);
+ final SpecialBindingComparator bc = new SpecialBindingComparator(base_bc, ec);
+
+ QueryIteratorItems baseIter = new QueryIteratorItems();
+
+ {
+ baseIter.bindings.add(b1);
+ baseIter.bindings.add(b2);
+ baseIter.bindings.add(b3);
+ baseIter.bindings.add(b4);
+ ;
+ }
+
+ QueryIterSort qs = new QueryIterSort(baseIter, bc, ec);
+
+ /**
+ * In this test, the iterator is not cancelled; all items should be
+ * delivered, and the compare count should be monotonic-nondecreasing.
+ */
+ @Test
+ public void testIteratesToCompletion() {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ assertFalse(qs.hasNext());
+
+ Set<Binding> expected = new HashSet<Binding>();
+ expected.add(b1);
+ expected.add(b2);
+ expected.add(b3);
+ expected.add(b4);
+
+ assertEquals(expected, results);
+ }
+
+ /**
+ * In this test, the iterator is cancelled after the first result is
+ * delivered. Any attempt to run the comparator should be trapped and an
+ * exception thrown. The iterators should deliver no more values.
+ */
+ @Test
+ public void testIteratesWithCancellation() {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count);
+ count = bc.count;
+ results.add(qs.next());
+
+ qs.cancel();
+ try {
+ bc.noMoreCalls();
+ while (qs.hasNext())
+ qs.next();
+ } catch (QueryCancelledException qe) {
+ assertTrue(qs.db.isCancelled());
+ return;
+
+ }
+ fail("query was not cancelled");
+ }
+
+ /**
+ * A QueryIterator that delivers the elements of a list of bindings.
+ */
+ private static final class QueryIteratorItems extends QueryIteratorBase {
+ List<Binding> bindings = new ArrayList<Binding>();
+ int index = 0;
+
+ @Override
+ public void output(IndentedWriter out, SerializationContext sCxt) {
+ out.write("a QueryIteratorItems");
+ }
+
+ @Override
+ protected boolean hasNextBinding() {
+ return index < bindings.size();
+ }
+
+ @Override
+ protected Binding moveToNextBinding() {
+ index += 1;
+ return bindings.get(index - 1);
+ }
+
+ @Override
+ protected void closeIterator() {
+ }
+
+ @Override
+ protected void requestCancel() {
+ }
+ }
+
+ /**
+ * A BindingComparator that wraps another BindingComparator and counts how
+ * many times compare() is called.
+ */
+ static class SpecialBindingComparator extends BindingComparator {
+ final BindingComparator base;
+ int count = 0;
+ boolean trapCompare = false;
+
+ public SpecialBindingComparator(BindingComparator base, ExecutionContext ec) {
+ super(base.getConditions(), ec);
+ this.base = base;
+ }
+
+ public void noMoreCalls() {
+ trapCompare = true;
+ }
+
+ @Override
+ public int compare(Binding x, Binding y) {
+ if (trapCompare)
+ throw new RuntimeException("compare() no longer allowed.");
+ count += 1;
+ return base.compare(x, y);
+ }
+ }
}
\ No newline at end of file
[07/11] jena git commit: Draft for testing cancellable sort()s.
Posted by an...@apache.org.
Draft for testing cancellable sort()s.
The overall idea is to set up a query iterator that delivers
a batch of values; after only a few (== 1) values have
been pulled, the iterator is cancelled. We then show that
no more values can be pulled from the iterator and that the
sort has been abandoned.
This is a draft.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/512b2a5e
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/512b2a5e
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/512b2a5e
Branch: refs/heads/master
Commit: 512b2a5e0e470732b8e3405c124df61d91a7cfab
Parents: a3f3fdc
Author: chris dollin <eh...@gmail.com>
Authored: Fri Jul 22 16:10:07 2016 +0100
Committer: chris dollin <eh...@gmail.com>
Committed: Fri Jul 22 16:10:07 2016 +0100
----------------------------------------------------------------------
.../apache/jena/atlas/data/SortedDataBag.java | 17 +-
.../iterator/TestSortedDataBagCancellation.java | 244 +++++++++++++++++++
2 files changed, 257 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/512b2a5e/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index b248006..b7c74ae 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -83,14 +83,23 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
this.comparator = new AbortableComparator<E>(comparator);
}
- /**
- cancel arranges that further comparisons using the supplied
- comparator will abandon the sort in progress.
- */
+ /**
+ cancel arranges that further comparisons using the supplied
+ comparator will abandon the sort in progress.
+ */
public void cancel()
{
comparator.cancel();
}
+
+ /**
+ isCancelled is true iff cancel has been called on this
+ bag's comparator. (Used in testing.)
+ */
+ public boolean isCancelled()
+ {
+ return comparator.cancelled;
+ }
protected void checkClosed()
{
http://git-wip-us.apache.org/repos/asf/jena/blob/512b2a5e/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
new file mode 100644
index 0000000..c29552e
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/engine/iterator/TestSortedDataBagCancellation.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jena.sparql.engine.iterator;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.jena.atlas.io.IndentedWriter;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.query.QueryCancelledException;
+import org.apache.jena.query.SortCondition;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphFactory;
+import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.ExecutionContext;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingComparator;
+import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.engine.binding.BindingMap;
+import org.apache.jena.sparql.engine.iterator.QueryIterSort;
+import org.apache.jena.sparql.engine.iterator.QueryIteratorBase;
+import org.apache.jena.sparql.engine.main.OpExecutor;
+import org.apache.jena.sparql.engine.main.OpExecutorFactory;
+import org.apache.jena.sparql.graph.GraphMemPlain;
+import org.apache.jena.sparql.serializer.SerializationContext;
+import org.apache.jena.sparql.util.Context;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/*
+ Test that a SortedDataBag used inside a QueryIterSort
+ does indeed cut off when cancelled.
+
+ This is horribly clunky because of the effort of
+ setting up. Maybe we should instead be content to
+ test the SortedDataBag correctly?
+
+*/
+public class TestSortedDataBagCancellation extends TestCase
+{
+
+ static final BindingMap b1 = BindingFactory.create();
+ static final BindingMap b2 = BindingFactory.create();
+ static final BindingMap b3 = BindingFactory.create();
+ static final BindingMap b4 = BindingFactory.create();
+
+ static
+ {
+ b1.add(Var.alloc("v1"), NodeFactory.createLiteral("alpha"));
+ b2.add(Var.alloc("v2"), NodeFactory.createLiteral("beta"));
+ b3.add(Var.alloc("v3"), NodeFactory.createLiteral("gamma"));
+ b4.add(Var.alloc("v4"), NodeFactory.createLiteral("delta"));
+ }
+
+ final Context params = new Context();
+
+ final OpExecutorFactory factory = new OpExecutorFactory() {
+
+ @Override public OpExecutor create(ExecutionContext ec) {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ final Graph activeGraph = new GraphMemPlain();
+
+ final DatasetGraph dataset = DatasetGraphFactory.create();
+
+ final List<SortCondition> conditions = new ArrayList<SortCondition>();
+
+ final ExecutionContext ec = new ExecutionContext
+ (
+ params,
+ activeGraph,
+ dataset,
+ factory
+ );
+
+ final BindingComparator base_bc = new BindingComparator(conditions, ec);
+ final SpecialBindingComparator bc = new SpecialBindingComparator(base_bc, ec);
+
+ QueryIteratorItems baseIter = new QueryIteratorItems();
+
+ {
+ baseIter.bindings.add(b1);
+ baseIter.bindings.add(b2);
+ baseIter.bindings.add(b3);
+ baseIter.bindings.add(b4);;
+ }
+
+ QueryIterSort qs = new QueryIterSort(baseIter, bc, ec);
+
+ /**
+ In this test, the iterator is not cancelled;
+ all items should be delivered, and the compare
+ count should be monotonic-nondecreasing.
+ */
+ @Test public void testIteratesToCompletion()
+ {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count); count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count); count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count); count = bc.count;
+ results.add(qs.next());
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count); count = bc.count;
+ results.add(qs.next());
+
+ assertFalse(qs.hasNext());
+
+ Set<Binding> expected = new HashSet<Binding>();
+ expected.add(b1);
+ expected.add(b2);
+ expected.add(b3);
+ expected.add(b4);
+
+ assertEquals(expected, results);
+ }
+
+ /**
+ In this test, the iterator is cancelled after
+ the first result is delivered. Any attempt to
+ run the comparator should be trapped an exception
+ thrown. The iterators should deliver no more values.
+ */
+ @Test public void testIteratesWithCancellation()
+ {
+ int count = 0;
+ assertEquals(0, count = bc.count);
+ Set<Binding> results = new HashSet<Binding>();
+
+ assertTrue(qs.hasNext());
+ assertTrue(bc.count >= count); count = bc.count;
+ results.add(qs.next());
+
+ qs.cancel();
+ try
+ {
+ bc.noMoreCalls();
+ while (qs.hasNext()) qs.next();
+ }
+ catch (QueryCancelledException qe)
+ {
+ assertTrue(qs.db.isCancelled());
+ return;
+
+ }
+ fail("query was not cancelled");
+ }
+
+ /**
+ A QueryIterator that delivers the elements of a list of bindings.
+ */
+ private static final class QueryIteratorItems extends QueryIteratorBase
+ {
+ List<Binding> bindings = new ArrayList<Binding>();
+ int index = 0;
+
+ @Override
+ public void output(IndentedWriter out, SerializationContext sCxt) {
+ out.write("a QueryIteratorItems");
+ }
+
+ @Override
+ protected boolean hasNextBinding()
+ {
+ return index < bindings.size();
+ }
+
+ @Override
+ protected Binding moveToNextBinding()
+ {
+ index += 1;
+ return bindings.get(index - 1);
+ }
+
+ @Override
+ protected void closeIterator()
+ {
+ }
+
+ @Override
+ protected void requestCancel()
+ {
+ }
+ }
+
+ /**
+ A BindingComparator that wraps another BindingComparator
+ and counts how many times compare() is called.
+ */
+ static class SpecialBindingComparator extends BindingComparator
+ {
+ final BindingComparator base;
+ int count = 0;
+ boolean trapCompare = false;
+
+ public SpecialBindingComparator(BindingComparator base, ExecutionContext ec)
+ {
+ super(base.getConditions(), ec);
+ this.base = base;
+ }
+
+ public void noMoreCalls() {
+ trapCompare = true;
+ }
+
+ public int compare(Binding x, Binding y)
+ {
+ if (trapCompare) throw new RuntimeException("compare() no longer allowed.");
+ count += 1;
+ return base.compare(x, y);
+ }
+ }
+}
\ No newline at end of file