You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@jena.apache.org by Paolo Castagna <ca...@googlemail.com> on 2012/03/09 15:47:39 UTC

Re: svn commit: r1298851 - /incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java

Stephen, please have a look at this, give it a spin, and let me know whether we can
close JENA-157.

I'd like to find a better way to test this; perhaps MAX_SPILL_FILES should be
protected so that we can change it and add a proper unit test.

What do you think?

Paolo

castagna@apache.org wrote:
> Author: castagna
> Date: Fri Mar  9 14:38:53 2012
> New Revision: 1298851
> 
> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
> Log:
> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most 100 files at a time. If more than 100 files need to be merged, it is done in multiple rounds.
> 
> Modified:
>     incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
> 
> Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
> URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
> ==============================================================================
> --- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
> +++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar  9 14:38:53 2012
> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
>   */
>  public class SortedDataBag<E> extends AbstractDataBag<E>
>  {
> +    private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
> +
>      protected final ThresholdPolicy<E> policy;
>      protected final SerializationFactory<E> serializationFactory;
>      protected final Comparator<? super E> comparator;
> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
>       * @return an Iterator
>       */
>      @Override
> +	public Iterator<E> iterator()
> +    {
> +        preMerge();
> +
> +        return iterator(getSpillFiles().size());
> +    }
> +
>      @SuppressWarnings({ "unchecked", "rawtypes" })
> -    public Iterator<E> iterator()
> +    private Iterator<E> iterator(int size)
>      {
>          checkClosed();
>          
> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>          
>          if (spilled)
>          {
> -            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
> +            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
>                          
>              if (memSize > 0)
>              {
>                  inputs.add(memory.iterator());
>              }
>              
> -            for (File spillFile : getSpillFiles())
> +            for ( int i = 0; i < size; i++ )
>              {
> +                File spillFile = getSpillFiles().get(i);
>                  try
>                  {
>                      Iterator<E> irc = getInputIterator(spillFile);
> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
>          }
>      }
>      
> +    private void preMerge() {
> +        if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
> +
> +        try {
> +            while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
> +                Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
> +                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
> +                try {
> +                    while ( ssi.hasNext() ) {
> +                        sink.send( ssi.next() );
> +                    }
> +                } finally {
> +                    Iter.close(ssi) ;
> +                    sink.close() ;
> +                }
> +                
> +                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
> +                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
> +                    File file = getSpillFiles().get(i) ;
> +                    file.delete() ;
> +                    toRemove.add(file) ;
> +                }
> +
> +                getSpillFiles().removeAll(toRemove) ;
> +
> +                memory = new ArrayList<E>() ;
> +            }            
> +        } catch (IOException e) {
> +            throw new AtlasException(e) ;
> +        }
> +    }
> +
>      @Override
>      public void close()
>      {
> 
> 


Re: svn commit: r1298851 - /incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java

Posted by Stephen Allen <sa...@apache.org>.
Hi Paolo,

Looks good.  I went ahead and changed MAX_SPILL_FILES to protected and
wrote a few unit tests.  I'll close the JIRA as well.

-Stephen

On Fri, Mar 9, 2012 at 6:47 AM, Paolo Castagna
<ca...@googlemail.com> wrote:
> Stephen, please have a look at this, give it a spin, and let me know whether we can
> close JENA-157.
>
> I'd like to find a better way to test this; perhaps MAX_SPILL_FILES should be
> protected so that we can change it and add a proper unit test.
>
> What do you think?
>
> Paolo
>
> castagna@apache.org wrote:
>> Author: castagna
>> Date: Fri Mar  9 14:38:53 2012
>> New Revision: 1298851
>>
>> URL: http://svn.apache.org/viewvc?rev=1298851&view=rev
>> Log:
>> JENA-157 - This adds a pre-merge phase to SortedDataBag that merges at most 100 files at a time. If more than 100 files need to be merged, it is done in multiple rounds.
>>
>> Modified:
>>     incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>>
>> Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
>> URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1298851&r1=1298850&r2=1298851&view=diff
>> ==============================================================================
>> --- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
>> +++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Fri Mar  9 14:38:53 2012
>> @@ -62,6 +62,8 @@ import org.openjena.atlas.lib.Sink ;
>>   */
>>  public class SortedDataBag<E> extends AbstractDataBag<E>
>>  {
>> +    private static final int MAX_SPILL_FILES = 100 ; // this is the maximum number of files to merge at the same time
>> +
>>      protected final ThresholdPolicy<E> policy;
>>      protected final SerializationFactory<E> serializationFactory;
>>      protected final Comparator<? super E> comparator;
>> @@ -177,8 +179,15 @@ public class SortedDataBag<E> extends Ab
>>       * @return an Iterator
>>       */
>>      @Override
>> +     public Iterator<E> iterator()
>> +    {
>> +        preMerge();
>> +
>> +        return iterator(getSpillFiles().size());
>> +    }
>> +
>>      @SuppressWarnings({ "unchecked", "rawtypes" })
>> -    public Iterator<E> iterator()
>> +    private Iterator<E> iterator(int size)
>>      {
>>          checkClosed();
>>
>> @@ -197,15 +206,16 @@ public class SortedDataBag<E> extends Ab
>>
>>          if (spilled)
>>          {
>> -            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
>> +            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(size + (memSize > 0 ? 1 : 0));
>>
>>              if (memSize > 0)
>>              {
>>                  inputs.add(memory.iterator());
>>              }
>>
>> -            for (File spillFile : getSpillFiles())
>> +            for ( int i = 0; i < size; i++ )
>>              {
>> +                File spillFile = getSpillFiles().get(i);
>>                  try
>>                  {
>>                      Iterator<E> irc = getInputIterator(spillFile);
>> @@ -241,6 +251,38 @@ public class SortedDataBag<E> extends Ab
>>          }
>>      }
>>
>> +    private void preMerge() {
>> +        if (getSpillFiles() == null || getSpillFiles().size() <= MAX_SPILL_FILES) { return; }
>> +
>> +        try {
>> +            while ( getSpillFiles().size() > MAX_SPILL_FILES ) {
>> +                Sink<E> sink = serializationFactory.createSerializer(getSpillStream()) ;
>> +                Iterator<E> ssi = iterator(MAX_SPILL_FILES) ;
>> +                try {
>> +                    while ( ssi.hasNext() ) {
>> +                        sink.send( ssi.next() );
>> +                    }
>> +                } finally {
>> +                    Iter.close(ssi) ;
>> +                    sink.close() ;
>> +                }
>> +
>> +                List<File> toRemove = new ArrayList<File>(MAX_SPILL_FILES) ;
>> +                for ( int i = 0; i < MAX_SPILL_FILES; i++ ) {
>> +                    File file = getSpillFiles().get(i) ;
>> +                    file.delete() ;
>> +                    toRemove.add(file) ;
>> +                }
>> +
>> +                getSpillFiles().removeAll(toRemove) ;
>> +
>> +                memory = new ArrayList<E>() ;
>> +            }
>> +        } catch (IOException e) {
>> +            throw new AtlasException(e) ;
>> +        }
>> +    }
>> +
>>      @Override
>>      public void close()
>>      {
>>
>>
>