You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2011/11/23 23:50:03 UTC

svn commit: r1205644 - in /incubator/jena/Jena2/ARQ/trunk/src: main/java/org/openjena/atlas/data/ test/java/org/openjena/atlas/data/

Author: sallen
Date: Wed Nov 23 22:50:01 2011
New Revision: 1205644

URL: http://svn.apache.org/viewvc?rev=1205644&view=rev
Log:
Added DistinctDataNet that enables the creation of a semi-streaming DISTINCT iterator backed by spill files.  To be used in JENA-119.

Added:
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java
    incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataNet.java
Modified:
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/AbstractDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DefaultDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/DataBagExaminer.java
    incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TS_Data.java
    incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataBag.java
    incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestSortedDataBag.java

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/AbstractDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/AbstractDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/AbstractDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/AbstractDataBag.java Wed Nov 23 22:50:01 2011
@@ -18,10 +18,14 @@
 
 package org.openjena.atlas.data;
 
+import java.io.BufferedInputStream ;
 import java.io.BufferedOutputStream ;
 import java.io.File ;
+import java.io.FileInputStream ;
+import java.io.FileNotFoundException ;
 import java.io.FileOutputStream ;
 import java.io.IOException ;
+import java.io.InputStream ;
 import java.io.OutputStream ;
 import java.lang.ref.WeakReference ;
 import java.util.ArrayList ;
@@ -38,7 +42,7 @@ import org.openjena.atlas.lib.FileOps ;
  */
 public abstract class AbstractDataBag<E> implements DataBag<E>
 {
-    protected final List<File> spillFiles = new ArrayList<File>();
+    private final List<File> spillFiles = new ArrayList<File>();
     protected Collection<E> memory = new ArrayList<E>();
     
     private final List<WeakReference<Closeable>> closeableIterators = new ArrayList<WeakReference<Closeable>>();
@@ -92,15 +96,33 @@ public abstract class AbstractDataBag<E>
         return tmpFile ;
     }
     
+    /**
+     * Register the spill file handle for use later in the iterator.
+     */
+    protected void registerSpillFile(File spillFile)
+    {
+        spillFiles.add(spillFile);
+    }
+    
+    protected static OutputStream getOutputStream(File file) throws FileNotFoundException
+    {
+        return new BufferedOutputStream(new FileOutputStream(file));
+    }
+    
+    protected static InputStream getInputStream(File file) throws FileNotFoundException
+    {
+        return new BufferedInputStream(new FileInputStream(file));
+    }
+    
     /** 
-     * Get a file to spill contents to.  The file will be registered in the spillFiles array.
+     * Get a stream to spill contents to.  The file that backs this stream will be registered in the spillFiles array.
      * @return stream to write tuples to
      */
-    protected OutputStream getSpillFile() throws IOException
+    protected OutputStream getSpillStream() throws IOException
     {
         File outputFile = getNewTemporaryFile();
-        OutputStream toReturn = new BufferedOutputStream(new FileOutputStream(outputFile));
-        spillFiles.add(outputFile);
+        OutputStream toReturn = getOutputStream(outputFile);
+        registerSpillFile(outputFile);
         
         return toReturn;
     }
@@ -133,6 +155,11 @@ public abstract class AbstractDataBag<E>
         }
     }
     
+    protected List<File> getSpillFiles()
+    {
+        return spillFiles;
+    }
+    
     protected void deleteSpillFiles()
     {
         for (File file : spillFiles)

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DefaultDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DefaultDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DefaultDataBag.java Wed Nov 23 22:50:01 2011
@@ -112,7 +112,7 @@ public class DefaultDataBag<E> extends A
         // a prolonged period of time.
         try
         {
-            out = getSpillFile();
+            out = getSpillStream();
         }
         catch (IOException e)
         {
@@ -161,7 +161,7 @@ public class DefaultDataBag<E> extends A
         // Create a new reader
         if (policy.isThresholdExceeded())
         {
-            File spillFile = spillFiles.get(0);
+            File spillFile = getSpillFiles().get(0);
             
             InputStream in;
             try

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataBag.java Wed Nov 23 22:50:01 2011
@@ -95,7 +95,7 @@ public class DistinctDataBag<E> extends 
         }
     }
     
-    private class DistinctReducedIterator<T> extends PeekIterator<T> implements Closeable
+    protected static class DistinctReducedIterator<T> extends PeekIterator<T> implements Closeable
     {
         private Iterator<T> iter;
         

Added: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java?rev=1205644&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/DistinctDataNet.java Wed Nov 23 22:50:01 2011
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.atlas.data;
+
+import java.io.File ;
+import java.io.FileNotFoundException ;
+import java.util.ArrayList ;
+import java.util.Comparator ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.NoSuchElementException ;
+
+import org.openjena.atlas.AtlasException ;
+import org.openjena.atlas.iterator.Iter ;
+import org.openjena.atlas.lib.Closeable ;
+import org.openjena.atlas.lib.FileOps ;
+
+
+/**
+ * This class is like {@link DistinctDataBag} except that you are informed if the item you just
+ * added was known to be distinct.  This will normally only work until the first spill.  After that,
+ * the system may not be able to tell for sure, and will thus return false.  When you are finished
+ * adding items, you may call {@link #netIterator()} to get any distinct items that are in the
+ * spill files but were not indicated as distinct previously.  This is useful for a distinct
+ * operator that streams results until it exceeds the spill threshold.
+ */
+public class DistinctDataNet<E> extends DistinctDataBag<E>
+{
+    protected File firstSpillFile;
+    
+    public DistinctDataNet(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<E> comparator)
+    {
+        super(policy, serializerFactory, comparator) ;
+    }
+    
+    /**
+     * @return true if the item added is known to be distinct.
+     */
+    public boolean netAdd(E item)
+    {
+        long s = size ;
+        super.add(item) ;
+        return !spilled && size > s ;
+    }
+    
+    @Override
+    protected void registerSpillFile(File spillFile)
+    {
+        // If this is the first time spilling, then keep this spill file separate
+        if (!spilled)
+        {
+            firstSpillFile = spillFile;
+        }
+        else
+        {
+            super.registerSpillFile(spillFile);
+        }
+    }
+    
+    @Override
+    protected void deleteSpillFiles()
+    {
+        super.deleteSpillFiles();
+        if (null != firstSpillFile) FileOps.delete(firstSpillFile, false); // null until the first spill happens
+        firstSpillFile = null;
+    }
+    
+    // Used by the .iterator() method
+    @Override
+    protected List<File> getSpillFiles()
+    {
+        List<File> toReturn = new ArrayList<File>(super.getSpillFiles());
+        if (null != firstSpillFile)
+        {
+            toReturn.add(firstSpillFile);
+        }
+        return toReturn;
+    }
+    
+    // TODO: Will be used by the .netIterator() method
+    protected List<File> getNetSpillFiles()
+    {
+        return super.getSpillFiles();
+    }
+    
+    /**
+     * Returns an iterator to all additional items that are distinct but were
+     * not reported to be so at the time {@link #netAdd(Object)} was invoked.
+     * <p/>
+     * If you do not exhaust the iterator, you should call {@link org.openjena.atlas.iterator.Iter#close(Iterator)}
+     * to be sure any open file handles are closed.
+     */
+    public Iterator<E> netIterator()
+    {
+        // If we haven't spilled, then we have already indicated all distinct values via .netAdd()
+        if (!spilled)
+        {
+            return Iter.nullIter();
+        }
+        
+        Iterator<E> blacklist; // NOTE(review): presumably the items already reported by netAdd() - confirm against spill()
+        try
+        {
+            blacklist = getInputIterator(firstSpillFile);
+        }
+        catch ( FileNotFoundException e )
+        {
+            throw new AtlasException("Cannot find the first spill file", e);
+        }
+        
+        // TODO: Improve performance by making the superclass .iterator() use getNetSpillFiles()
+        // instead of getSpillFiles() so it doesn't contain the contents of the first file
+        Iterator<E> rest = super.iterator();
+        
+        SortedDiffIterator<E> sdi = SortedDiffIterator.create(rest, blacklist, comparator); // items of rest that are not in blacklist
+        registerCloseableIterator(sdi);
+        
+        return sdi;
+    }
+    
+    /**
+     * Produces the set difference of two sorted set sequences.
+     */
+    protected static class SortedDiffIterator<T> implements Iterator<T>, Closeable
+    {
+        private final Iterator<T> grayList;
+        private final Iterator<T> blackList;
+        private final Comparator<? super T> comp;
+        
+        private boolean finished = false;
+        private boolean blackSlotFull = false;
+        private T white;
+        private T black;
+        
+        /**
+         * Produces the set difference of two sorted set sequences using the natural ordering of the items
+         * (null items will always be considered less than any other items).
+         * 
+         * @param first An Iterator&lt;T&gt; whose elements that are not also in second will be returned.
+         * @param second An Iterator&lt;T&gt; whose elements that also occur in the first sequence will cause those elements to be removed from the returned sequence. 
+         */
+        public static <S extends Comparable<? super S>> SortedDiffIterator<S> create(Iterator<S> first, Iterator<S> second)
+        {
+            return create(first, second, new Comparator<S>()
+            {
+                @Override
+                public int compare(S o1, S o2)
+                {
+                    if (null == o1 && null == o2) return 0;
+                    if (null == o1) return -1;
+                    if (null == o2) return 1;
+                    return o1.compareTo(o2);
+                }
+            });
+        }
+        
+        /**
+         * Produces the set difference of two sorted set sequences using the specified comparator.
+         * 
+         * @param first An Iterator&lt;T&gt; whose elements that are not also in second will be returned.
+         * @param second An Iterator&lt;T&gt; whose elements that also occur in the first sequence will cause those elements to be removed from the returned sequence.
+         * @param comparator The comparator used to compare the elements from each iterator. 
+         */
+        public static <S> SortedDiffIterator<S> create(Iterator<S> first, Iterator<S> second, Comparator<? super S> comparator)
+        {
+            return new SortedDiffIterator<S>(first, second, comparator);
+        }
+        
+        
+        private SortedDiffIterator(Iterator<T> first, Iterator<T> second, Comparator<? super T> comparator)
+        {
+            this.grayList = first;
+            this.blackList = second;
+            this.comp = comparator;
+            
+            // Prime the white item
+            fill();
+        }
+        
+        private void fill()
+        {
+            if (finished) return;
+            
+            if (!grayList.hasNext())
+            {
+                close();
+                return;
+            }
+            
+            if (!blackSlotFull)
+            {
+                if (!blackList.hasNext())
+                {
+                    white = grayList.next();
+                    return;
+                }
+                
+                black = blackList.next();
+                blackSlotFull = true;
+            }
+            
+            // Outer loop advances white
+            while (true)
+            {
+                if (!grayList.hasNext())
+                {
+                    close();
+                    return;
+                }
+                white = grayList.next();
+                
+                int cmp = comp.compare(white, black);
+                
+                if (cmp < 0) return;
+                
+                // Inner loop advances black until white is less than or equal to it
+                while (cmp > 0)
+                {
+                    if (!blackList.hasNext())
+                    {
+                        black = null;
+                        blackSlotFull = false;
+                        return;
+                    }
+                    black = blackList.next();
+                    cmp = comp.compare(white, black);
+                    
+                    if (cmp < 0) return;
+                }
+            }
+        }
+        
+        @Override
+        public boolean hasNext()
+        {
+            return !finished;
+        }
+
+        @Override
+        public T next()
+        {
+            if (finished) throw new NoSuchElementException();
+            T toReturn = white;
+            fill();
+            return toReturn;
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException("SortedDiffIterator.remove");
+        }
+        
+        @Override
+        public void close()
+        {
+            finished = true;
+            white = null;
+            black = null;
+            Iter.close(grayList);
+            Iter.close(blackList);
+        }
+    }
+
+}
+

Modified: incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/atlas/data/SortedDataBag.java Wed Nov 23 22:50:01 2011
@@ -18,9 +18,7 @@
 
 package org.openjena.atlas.data;
 
-import java.io.BufferedInputStream ;
 import java.io.File ;
-import java.io.FileInputStream ;
 import java.io.FileNotFoundException ;
 import java.io.IOException ;
 import java.io.InputStream ;
@@ -64,9 +62,9 @@ import org.openjena.atlas.lib.Sink ;
  */
 public class SortedDataBag<E> extends AbstractDataBag<E>
 {
-    private final ThresholdPolicy<E> policy;
-    private final SerializationFactory<E> serializationFactory;
-    private final Comparator<? super E> comparator;
+    protected final ThresholdPolicy<E> policy;
+    protected final SerializationFactory<E> serializationFactory;
+    protected final Comparator<? super E> comparator;
     
     protected boolean finishedAdding = false;
     protected boolean spilled = false;
@@ -116,7 +114,7 @@ public class SortedDataBag<E> extends Ab
     }
     
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void spill()
+    protected void spill()
     {
         // Make sure we have something to spill.
         if (memory.size() > 0)
@@ -124,7 +122,7 @@ public class SortedDataBag<E> extends Ab
             OutputStream out;
             try
             {
-                out = getSpillFile();
+                out = getSpillStream();
             }
             catch (IOException e)
             {
@@ -163,6 +161,13 @@ public class SortedDataBag<E> extends Ab
     {
         spill();
     }
+    
+    protected Iterator<E> getInputIterator(File spillFile) throws FileNotFoundException
+    {
+        InputStream in = getInputStream(spillFile);
+        Iterator<E> deserializer = serializationFactory.createDeserializer(in) ;
+        return new IteratorResourceClosing<E>(deserializer, in);
+    }
 
     /**
      * Returns an iterator over a set of elements of type E.  If you do not exhaust
@@ -192,21 +197,18 @@ public class SortedDataBag<E> extends Ab
         
         if (spilled)
         {
-            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>(spillFiles.size() + (memSize > 0 ? 1 : 0));
+            List<Iterator<E>> inputs = new ArrayList<Iterator<E>>();
                         
             if (memSize > 0)
             {
                 inputs.add(memory.iterator());
             }
             
-            for (File spillFile : spillFiles)
+            for (File spillFile : getSpillFiles())
             {
                 try
                 {
-                    InputStream in = new BufferedInputStream(new FileInputStream(spillFile));
-                    
-                    Iterator<E> deserializer = serializationFactory.createDeserializer(in) ;
-                    IteratorResourceClosing<E> irc = new IteratorResourceClosing<E>(deserializer, in);
+                    Iterator<E> irc = getInputIterator(spillFile);
                     inputs.add(irc);
                 }
                 catch (FileNotFoundException e)
@@ -255,7 +257,7 @@ public class SortedDataBag<E> extends Ab
     /**
      * An iterator that handles getting the next tuple from the bag.
      */
-    private class SpillSortIterator<T> implements Iterator<T>, Closeable
+    protected static class SpillSortIterator<T> implements Iterator<T>, Closeable
     {
         private final List<Iterator<T>> inputs;
         private final Comparator<? super T> comp;

Modified: incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/DataBagExaminer.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/DataBagExaminer.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/DataBagExaminer.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/DataBagExaminer.java Wed Nov 23 22:50:01 2011
@@ -28,7 +28,7 @@ public class DataBagExaminer
     public static int countTemporaryFiles(AbstractDataBag<?> bag)
     {
         int count = 0;
-        for (File tempFile : bag.spillFiles)
+        for (File tempFile : bag.getSpillFiles())
         {
             if (tempFile.exists())
             {

Modified: incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TS_Data.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TS_Data.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TS_Data.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TS_Data.java Wed Nov 23 22:50:01 2011
@@ -29,6 +29,7 @@ import org.junit.runners.Suite.SuiteClas
 { 
     TestSortedDataBag.class, 
     TestDistinctDataBag.class,
+    TestDistinctDataNet.class,
     TestThresholdPolicyCount.class
 })
 

Modified: incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataBag.java Wed Nov 23 22:50:01 2011
@@ -114,12 +114,15 @@ public class TestDistinctDataBag extends
                 new ThresholdPolicyCount<Binding>(10),
                 SerializationFactoryFinder.bindingSerializationFactory(),
                 new BindingComparator(new ArrayList<SortCondition>()));
+        
+        List<File> spillFiles = new ArrayList<File>();
         try
         {
             db.addAll(undistinct);
+            spillFiles.addAll(db.getSpillFiles());
             
             int count = 0;
-            for (File file : db.spillFiles)
+            for (File file : spillFiles)
             {
                 if (file.exists())
                 {
@@ -142,7 +145,7 @@ public class TestDistinctDataBag extends
         }
         
         int count = 0;
-        for (File file : db.spillFiles)
+        for (File file : spillFiles)
         {
             if (file.exists())
             {

Added: incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataNet.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataNet.java?rev=1205644&view=auto
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataNet.java (added)
+++ incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestDistinctDataNet.java Wed Nov 23 22:50:01 2011
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.atlas.data;
+
+import java.io.File ;
+import java.util.ArrayList ;
+import java.util.Arrays ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.Random ;
+
+import junit.framework.TestCase ;
+
+import org.junit.Test ;
+import org.openjena.atlas.iterator.Iter ;
+import org.openjena.riot.SerializationFactoryFinder ;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.query.SortCondition ;
+import com.hp.hpl.jena.sparql.core.Var ;
+import com.hp.hpl.jena.sparql.engine.binding.Binding ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingComparator ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingFactory ;
+import com.hp.hpl.jena.sparql.engine.binding.BindingMap ;
+import com.hp.hpl.jena.sparql.resultset.ResultSetCompare ;
+import com.hp.hpl.jena.sparql.sse.Item ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
+import com.hp.hpl.jena.sparql.sse.builders.BuilderBinding ;
+import com.hp.hpl.jena.sparql.util.NodeUtils ;
+
+public class TestDistinctDataNet extends TestCase
+{
+    private static final String LETTERS = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM";
+    Random random = new Random();
+    
+    static Binding b12 = build("(?a 1) (?b 2)") ;
+    static Binding b19 = build("(?a 1) (?b 9)") ;
+    static Binding b02 = build("(?b 2)") ;
+    static Binding b10 = build("(?a 1)") ;
+    static Binding b0  = build("") ;
+    static Binding bb1 = build("(?a _:XYZ) (?b 1)");
+    static Binding x10 = build("(?x <http://example/abc>)") ;
+    
+    @Test
+    public void testDistinct()
+    {
+        List<Binding> undistinct = new ArrayList<Binding>();
+        undistinct.add(b12);
+        undistinct.add(b19);
+        undistinct.add(b02);
+        undistinct.add(b12);
+        undistinct.add(b19);
+        undistinct.add(b12);
+        undistinct.add(b02);
+        undistinct.add(x10);
+        
+        List<Binding> control = Iter.toList(Iter.distinct(undistinct.iterator()));
+        List<Binding> distinct = new ArrayList<Binding>();
+        
+        
+        DistinctDataNet<Binding> db = new DistinctDataNet<Binding>(
+                new ThresholdPolicyCount<Binding>(2),
+                SerializationFactoryFinder.bindingSerializationFactory(),
+                new BindingComparator(new ArrayList<SortCondition>())); 
+        try
+        {
+            db.addAll(undistinct);
+            
+            Iterator<Binding> iter = db.iterator(); 
+            while (iter.hasNext())
+            {
+                distinct.add(iter.next());
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+        
+        assertEquals(control.size(), distinct.size());
+        assertTrue(ResultSetCompare.equalsByTest(control, distinct, NodeUtils.sameTerm));
+    }
+    
+    @Test
+    public void testDistinct2()
+    {
+        List<Binding> undistinct = new ArrayList<Binding>();
+        undistinct.add(b12);
+        undistinct.add(b19);
+        undistinct.add(b02);
+        undistinct.add(b12);
+        undistinct.add(b19);
+        undistinct.add(b12);
+        undistinct.add(b02);
+        undistinct.add(x10);
+        
+        List<Binding> control = Iter.toList(Iter.distinct(undistinct.iterator()));
+        List<Binding> distinct = new ArrayList<Binding>();
+        
+        
+        DistinctDataNet<Binding> db = new DistinctDataNet<Binding>(
+                new ThresholdPolicyCount<Binding>(2),
+                SerializationFactoryFinder.bindingSerializationFactory(),
+                new BindingComparator(new ArrayList<SortCondition>())); 
+        try
+        {
+            for (Binding b : undistinct)
+            {
+                if (db.netAdd(b))
+                {
+                    distinct.add(b);
+                }
+            }
+            
+            Iterator<Binding> iter = db.netIterator(); 
+            while (iter.hasNext())
+            {
+                distinct.add(iter.next());
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+        
+        assertEquals(control.size(), distinct.size());
+        assertTrue(ResultSetCompare.equalsByTest(control, distinct, NodeUtils.sameTerm));
+    }
+    
+    @Test
+    public void testTemporaryFilesAreCleanedUpAfterCompletion()
+    {
+        List<Binding> undistinct = new ArrayList<Binding>();
+        random = new Random();
+        Var[] vars = new Var[]{
+            Var.alloc("1"), Var.alloc("2"), Var.alloc("3"),
+            Var.alloc("4"), Var.alloc("5"), Var.alloc("6"),
+            Var.alloc("7"), Var.alloc("8"), Var.alloc("9"), Var.alloc("0")
+        };
+        for(int i = 0; i < 500; i++){
+            undistinct.add(randomBinding(vars));
+        }
+        
+        DistinctDataNet<Binding> db = new DistinctDataNet<Binding>(
+                new ThresholdPolicyCount<Binding>(10),
+                SerializationFactoryFinder.bindingSerializationFactory(),
+                new BindingComparator(new ArrayList<SortCondition>()));
+        
+        List<File> spillFiles = new ArrayList<File>();
+        try
+        {
+            db.addAll(undistinct);
+            spillFiles.addAll(db.getSpillFiles());
+            
+            int count = 0;
+            for (File file : spillFiles)
+            {
+                if (file.exists())
+                {
+                    count++;
+                }
+            }
+            // 500 bindings divided into 50 chunks (49 in files, and 1 in memory)
+            assertEquals(49, count);
+            
+            Iterator<Binding> iter = db.iterator();
+            while (iter.hasNext())
+            {
+                iter.next();
+            }
+            Iter.close(iter);
+        }
+        finally
+        {
+            db.close();
+        }
+        
+        int count = 0;
+        for (File file : spillFiles)
+        {
+            if (file.exists())
+            {
+                count++;
+            }
+        }
+        assertEquals(0, count);
+    }
+    
+    private void testDiff(String first, String second, String expected)
+    {
+        // Split on single spaces and delegate to the array overload, which drains the
+        // iterator and asserts the joined result; merely constructing the iterator checked
+        // nothing.  Note "".split(" ") yields one empty string, which still joins to "".
+        testDiff(first.split(" "), second.split(" "), expected);
+    }
+    
+    private void testDiff(String[] first, String[] second, String expected)
+    {
+        DistinctDataNet.SortedDiffIterator<String> sdi = DistinctDataNet.SortedDiffIterator.create(
+                Arrays.asList(first).iterator(),
+                Arrays.asList(second).iterator());
+        
+        StringBuilder sb = new StringBuilder();
+        boolean firstTime = true;
+        while (sdi.hasNext())
+        {
+            if (!firstTime)
+            {
+                sb.append(" ");
+            }
+            firstTime = false;
+            
+            String s = sdi.next();            
+            if (null == s)
+            {
+                s = "null";
+            }
+            sb.append(s);
+        }
+        
+        assertEquals(expected, sb.toString());
+    }
+    
+    @Test
+    public void testSortedDiffIterator()
+    {
+        testDiff("a b e g i j", "b g h", "a e i j");
+        testDiff("a b e g i j", "", "a b e g i j");
+        testDiff("", "b g h", "");
+        testDiff("", "", "");
+        testDiff("a", "a", "");
+        testDiff("a", "b", "a");
+        testDiff("b", "a", "b");
+        testDiff("a b e g i j", "b g h z", "a e i j");
+        testDiff("a b c", "a b c", "");
+        
+        testDiff(new String[] {null, "a", "b", "e", "g", "i", "j", }, new String[] { "b", "g", "h", }, "null a e i j");
+        testDiff(new String[] {"a", "b", "e", "g", "i", "j", }, new String[] { null, "b", "g", "h", }, "a e i j");
+        testDiff(new String[] {null, "a", "b", "e", "g", "i", "j", }, new String[] { null, "b", "g", "h", }, "a e i j");
+    }
+    
+
+    private static Binding build(String string)
+    {
+        Item item = SSE.parse("(binding "+string+")") ;
+        return BuilderBinding.build(item) ;
+    }
+    
+    private Binding randomBinding(Var[] vars)
+    {
+        BindingMap binding = BindingFactory.create();
+        binding.add(vars[0], Node.createAnon());
+        binding.add(vars[1], Node.createURI(randomURI()));
+        binding.add(vars[2], Node.createURI(randomURI()));
+        binding.add(vars[3], Node.createLiteral(randomString(20)));
+        binding.add(vars[4], Node.createAnon());
+        binding.add(vars[5], Node.createURI(randomURI()));
+        binding.add(vars[6], Node.createURI(randomURI()));
+        binding.add(vars[7], Node.createLiteral(randomString(5)));
+        binding.add(vars[8], Node.createLiteral("" + random.nextInt(), null, XSDDatatype.XSDinteger));
+        binding.add(vars[9], Node.createAnon());
+        return binding;
+    }
+
+    public String randomURI() 
+    {
+        return String.format("http://%s.example.com/%s", randomString(10), randomString(10));
+    }
+    
+    public String randomString(int length)
+    {
+        StringBuilder builder = new StringBuilder();
+        for(int i = 0; i < length; i++){
+            builder.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
+        }
+        return builder.toString();
+    }
+}

Modified: incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestSortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestSortedDataBag.java?rev=1205644&r1=1205643&r2=1205644&view=diff
==============================================================================
--- incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestSortedDataBag.java (original)
+++ incubator/jena/Jena2/ARQ/trunk/src/test/java/org/openjena/atlas/data/TestSortedDataBag.java Wed Nov 23 22:50:01 2011
@@ -108,12 +108,15 @@ public class TestSortedDataBag extends T
                 new ThresholdPolicyCount<Binding>(10),
                 SerializationFactoryFinder.bindingSerializationFactory(),
                 comparator);
+        
+        List<File> spillFiles = new ArrayList<File>();
         try
         {
             db.addAll(unsorted);
+            spillFiles.addAll(db.getSpillFiles());
             
             int count = 0;
-            for (File file : db.spillFiles)
+            for (File file : spillFiles)
             {
                 if (file.exists())
                 {
@@ -136,7 +139,7 @@ public class TestSortedDataBag extends T
         }
         
         int count = 0;
-        for (File file : db.spillFiles)
+        for (File file : spillFiles)
         {
             if (file.exists())
             {