You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@jena.apache.org by Paolo Castagna <ca...@googlemail.com> on 2012/03/09 15:47:39 UTC
Re: svn commit: r1298851 - /incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
Stephen, please have a look at this, give it a spin and let me know if we can
close JENA-157.
I'd like to find a better way to test this; perhaps MAX_SPILL_FILES should be
protected, so we can change it and add a proper unit test.
What do you think?
Paolo
castagna@apache.org wrote:
> Author: castagna
> Date: Fri Mar 9 14:38:53 2012
> New Revision: 1298851
>
> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
> Log:
> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most 100 files at a time. If more than 100 files need to be merged, the merge is done in multiple rounds.
>
> Modified:
> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>
> Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
> URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
> ==============================================================================
> --- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
> +++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar 9 14:38:53 2012
> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
> */
> public class SortedDataBag<E> extends AbstractDataBag<E>
> {
> + private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
> +
> protected final ThresholdPolicy<E> policy;
> protected final SerializationFactory<E> serializationFactory;
> protected final Comparator<? super E> comparator;
> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
> * @return an Iterator
> */
> @Override
> + public Iterator<E> iterator()
> + {
> + preMerge();
> +
> + return iterator(getSpillFiles().size());
> + }
> +
> @SuppressWarnings({ "unchecked", "rawtypes" })
> - public Iterator<E> iterator()
> + private Iterator<E> iterator(int size)
> {
> checkClosed();
>
> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>
> if (spilled)
> {
> - List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
> + List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
>
> if (memSize > 0)
> {
> inputs.add(memory.iterator());
> }
>
> - for (File spillFile : getSpillFiles())
> + for ( int i = 0; i < size; i++ )
> {
> + File spillFile = getSpillFiles().get(i);
> try
> {
> Iterator<E> irc = getInputIterator(spillFile);
> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
> }
> }
>
> + private void preMerge() {
> + if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
> +
> + try {
> + while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
> + Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
> + Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
> + try {
> + while ( ssi.hasNext() ) {
> + sink.send( ssi.next() );
> + }
> + } finally {
> + Iter.close(ssi) ;
> + sink.close() ;
> + }
> +
> + List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
> + for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
> + File file = getSpillFiles().get(i) ;
> + file.delete() ;
> + toRemove.add(file) ;
> + }
> +
> + getSpillFiles().removeAll(toRemove) ;
> +
> + memory = new ArrayList<E>() ;
> + }
> + } catch (IOException e) {
> + throw new AtlasException(e) ;
> + }
> + }
> +
> @Override
> public void close()
> {
>
>
Re: svn commit: r1298851 - /incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
Posted by Stephen Allen <sa...@apache.org>.
Hi Paolo,
Looks good. I went ahead and changed MAX_SPILL_FILES to protected and
wrote a few unit tests. I'll close the JIRA as well.
-Stephen
On Fri, Mar 9, 2012 at 6:47 AM, Paolo Castagna
<ca...@googlemail.com> wrote:
> Stephen, please have a look at this, give it a spin and let me know if we can
> close JENA-157.
>
> I'd like to find a better way to test this; perhaps MAX_SPILL_FILES should be
> protected, so we can change it and add a proper unit test.
>
> What do you think?
>
> Paolo
>
> castagna@apache.org wrote:
>> Author: castagna
>> Date: Fri Mar 9 14:38:53 2012
>> New Revision: 1298851
>>
>> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
>> Log:
>> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most 100 files at a time. If more than 100 files need to be merged, the merge is done in multiple rounds.
>>
>> Modified:
>> incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>>
>> Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>> URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
>> ==============================================================================
>> --- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
>> +++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar 9 14:38:53 2012
>> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
>> */
>> public class SortedDataBag<E> extends AbstractDataBag<E>
>> {
>> + private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
>> +
>> protected final ThresholdPolicy<E> policy;
>> protected final SerializationFactory<E> serializationFactory;
>> protected final Comparator<? super E> comparator;
>> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
>> * @return an Iterator
>> */
>> @Override
>> + public Iterator<E> iterator()
>> + {
>> + preMerge();
>> +
>> + return iterator(getSpillFiles().size());
>> + }
>> +
>> @SuppressWarnings({ "unchecked", "rawtypes" })
>> - public Iterator<E> iterator()
>> + private Iterator<E> iterator(int size)
>> {
>> checkClosed();
>>
>> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>>
>> if (spilled)
>> {
>> - List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
>> + List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
>>
>> if (memSize > 0)
>> {
>> inputs.add(memory.iterator());
>> }
>>
>> - for (File spillFile : getSpillFiles())
>> + for ( int i = 0; i < size; i++ )
>> {
>> + File spillFile = getSpillFiles().get(i);
>> try
>> {
>> Iterator<E> irc = getInputIterator(spillFile);
>> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
>> }
>> }
>>
>> + private void preMerge() {
>> + if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
>> +
>> + try {
>> + while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
>> + Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
>> + Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
>> + try {
>> + while ( ssi.hasNext() ) {
>> + sink.send( ssi.next() );
>> + }
>> + } finally {
>> + Iter.close(ssi) ;
>> + sink.close() ;
>> + }
>> +
>> + List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
>> + for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
>> + File file = getSpillFiles().get(i) ;
>> + file.delete() ;
>> + toRemove.add(file) ;
>> + }
>> +
>> + getSpillFiles().removeAll(toRemove) ;
>> +
>> + memory = new ArrayList<E>() ;
>> + }
>> + } catch (IOException e) {
>> + throw new AtlasException(e) ;
>> + }
>> + }
>> +
>> @Override
>> public void close()
>> {
>>
>>
>