You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2013/01/29 02:48:08 UTC

svn commit: r1439731 - in /jena/trunk/jena-arq/src: main/java/org/apache/jena/riot/ main/java/org/apache/jena/riot/lang/ main/java/org/openjena/riot/ test/java/org/apache/jena/riot/ test/java/org/apache/jena/riot/lang/

Author: sallen
Date: Tue Jan 29 01:48:07 2013
New Revision: 1439731

URL: http://svn.apache.org/viewvc?rev=1439731&view=rev
Log:
Convert RiotParsePuller into two piped classes.

Added:
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java
Removed:
    jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotParsePuller.java
    jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
    jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
Modified:
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java
    jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java

Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java Tue Jan 29 01:48:07 2013
@@ -35,12 +35,23 @@ import org.apache.jena.atlas.io.PeekRead
 import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
 import org.apache.jena.atlas.json.io.parser.TokenizerJSON ;
 import org.apache.jena.atlas.lib.Sink ;
-import org.apache.jena.riot.lang.* ;
-import org.apache.jena.riot.system.* ;
+import org.apache.jena.riot.lang.LangNQuads ;
+import org.apache.jena.riot.lang.LangNTriples ;
+import org.apache.jena.riot.lang.LangRDFJSON ;
+import org.apache.jena.riot.lang.LangRDFXML ;
+import org.apache.jena.riot.lang.LangRIOT ;
+import org.apache.jena.riot.lang.LangTriG ;
+import org.apache.jena.riot.lang.LangTurtle ;
+import org.apache.jena.riot.lang.PipedQuadsStream ;
+import org.apache.jena.riot.lang.PipedRDFIterator ;
+import org.apache.jena.riot.lang.PipedTriplesStream ;
+import org.apache.jena.riot.system.ErrorHandlerFactory ;
+import org.apache.jena.riot.system.IRIResolver ;
+import org.apache.jena.riot.system.RiotLib ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.riot.system.StreamRDFLib ;
 import org.apache.jena.riot.tokens.Tokenizer ;
 import org.apache.jena.riot.tokens.TokenizerFactory ;
-import org.openjena.riot.RiotQuadParsePuller ;
-import org.openjena.riot.RiotTripleParsePuller ;
 
 import com.hp.hpl.jena.graph.Triple ;
 import com.hp.hpl.jena.sparql.core.Quad ;
@@ -225,8 +236,7 @@ public class RiotReader
         parse(in, lang, baseIRI, dest) ;
     }
 
-    // TODO create a Tokenizer version of this method
-    public static Iterator<Triple> createIteratorTriples(InputStream input, Lang lang, String baseIRI)
+    public static Iterator<Triple> createIteratorTriples(final InputStream input, final Lang lang, final String baseIRI)
     {
         // Special case N-Triples, because the RIOT reader has a pull interface
         if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -236,26 +246,47 @@ public class RiotReader
         else
         {
             // Otherwise, we have to spin up a thread to deal with it
-            RiotTripleParsePuller parsePuller = new RiotTripleParsePuller(input, lang, baseIRI);
-            parsePuller.parse();
-            return parsePuller;
+            final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+            final PipedTriplesStream out = new PipedTriplesStream(it);
+            
+            Thread t = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    parse(input, lang, baseIRI, out);
+                }
+            });
+            t.start();
+            
+            return it;
         }
     }
     
-    // TODO create a Tokenizer version of this method
-    public static Iterator<Quad> createIteratorQuads(InputStream input, Lang lang, String baseIRI)
+    public static Iterator<Quad> createIteratorQuads(final InputStream input, final Lang lang, final String baseIRI)
     {
         // Special case N-Quads, because the RIOT reader has a pull interface
-        if (  RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
+        if ( RDFLanguages.sameLang(RDFLanguages.NQUADS, lang) )
         {
             return new IteratorResourceClosing<Quad>(createParserNQuads(input, null), input);
         }
         else
         {
             // Otherwise, we have to spin up a thread to deal with it
-            RiotQuadParsePuller parsePuller = new RiotQuadParsePuller(input, lang, baseIRI);
-            parsePuller.parse();
-            return parsePuller;
+            final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>();
+            final PipedQuadsStream out = new PipedQuadsStream(it);
+            
+            Thread t = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    parse(input, lang, baseIRI, out);
+                }
+            });
+            t.start();
+            
+            return it;
         }
     }
     

Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}. 
+ */
+public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
+{
+    /**
+     * Creates a piped quads stream connected to the specified piped 
+     * RDF iterator.  Quads written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        // Triples are discarded
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        receive(quad) ;
+    }
+
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        // Tuples are discarded
+    }
+}

Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+import java.util.concurrent.ArrayBlockingQueue ;
+import java.util.concurrent.BlockingQueue ;
+import java.util.concurrent.CancellationException ;
+import java.util.concurrent.TimeUnit ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.riot.RiotException ;
+import org.apache.jena.riot.system.LightweightPrefixMap ;
+import org.apache.jena.riot.system.PrefixMap ;
+
+/**
+ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream} implementation;
+ * the piped iterator then provides whatever RDF primitives are written to the {@code PipedRDFStream}.
+ * Typically, data is read from a {@code PipedRDFIterator} by one thread and data is written
+ * to the corresponding {@code PipedRDFStream} by some other thread.  Attempting to use both
+ * objects from a single thread is not recommended, as it may deadlock the thread.
+ * The {@code PipedRDFIterator} contains a buffer, decoupling read operations from write operations,
+ * within limits.
+ * <p/>
+ * Inspired by Java's {@link java.io.PipedInputStream} and {@link java.io.PipedOutputStream}
+ * 
+ * @param <T> The type of the RDF primitive, should be one of {@code Triple}, {@code Quad}, or {@code Tuple<Node>}
+ * 
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public class PipedRDFIterator<T> implements Iterator<T>, Closeable
+{
+    /**
+     * Constant for default buffer size
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 10000 ;
+    
+    private static final int ITERATOR_POLL_TIMEOUT = 1000 ; // one second
+    private static final TimeUnit ITERATOR_POLL_TIMEUNIT = TimeUnit.MILLISECONDS ;
+
+    private final BlockingQueue<T> queue ;
+
+    @SuppressWarnings("unchecked")
+    private final T endMarker = (T) new Object() ;
+
+    private volatile boolean closedByReader = false ;
+    private volatile boolean closedByWriter = false ;
+    private volatile Thread readSide ;
+    private volatile Thread writeSide ;
+    
+    private boolean connected = false ;
+
+    private T slot ;
+
+    private final Object lock = new Object() ; // protects baseIri and prefixes
+    private String baseIri ;
+    private final LightweightPrefixMap prefixes = new PrefixMap() ;
+
+
+    /**
+     * Creates a new piped RDF iterator with the default buffer size of {@code DEFAULT_BUFFER_SIZE}.
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     */
+    public PipedRDFIterator()
+    {
+        this(DEFAULT_BUFFER_SIZE);
+    }
+    
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * 
+     * @param bufferSize
+     *            Buffer size
+     */
+    public PipedRDFIterator(int bufferSize)
+    {
+        this.queue = new ArrayBlockingQueue<T>(bufferSize) ;
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * <p>
+     * The fair parameter controls whether the locking policy used for the
+     * buffer is fair. When enabled this reduces throughput but also reduces the
+     * chance of thread starvation.
+     * </p>
+     * 
+     * @param bufferSize
+     *            Buffer size
+     * @param fair
+     *            Whether the buffer should use a fair locking policy
+     */
+    public PipedRDFIterator(int bufferSize, boolean fair)
+    {
+        this.queue = new ArrayBlockingQueue<T>(bufferSize, fair) ;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (!connected)
+            throw new IllegalStateException("Pipe not connected");
+        
+        if (closedByReader)
+            throw new RiotException("Pipe closed");
+        
+        readSide = Thread.currentThread();
+        
+        if (slot != null)
+            return true ;
+        while (true)
+        {
+            try
+            {
+                slot = queue.poll(ITERATOR_POLL_TIMEOUT, ITERATOR_POLL_TIMEUNIT) ;
+            }
+            catch (InterruptedException e)
+            {
+                throw new CancellationException() ;
+            }
+
+            if (null != slot)
+                break ;
+            
+            // If the producer thread died and did not call finish() then declare this pipe to be "broken"
+            // Since check is after the break, we will drain as much as possible out of the queue before throwing this exception
+            if (writeSide != null && !writeSide.isAlive() && !closedByWriter)
+            {
+                closedByReader = true ;
+                throw new RiotException("Write end dead") ;
+            }
+        }
+
+        if (slot == endMarker)
+        {
+            closedByReader = true ;
+            slot = null ;
+            return false ;
+        }
+        return true ;
+    }
+
+    @Override
+    public T next()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException() ;
+        T item = slot ;
+        slot = null ;
+        return item ;
+    }
+
+    @Override
+    public void remove()
+    {
+        throw new UnsupportedOperationException() ;
+    }
+    
+    private void checkStateForReceive()
+    {
+        if (closedByWriter || closedByReader)
+        {
+            throw new RiotException("Pipe closed") ;
+        }
+        else if (readSide != null && !readSide.isAlive())
+        {
+            throw new RiotException("Read end dead") ;
+        }
+    }
+    
+    protected void connect()
+    {
+        this.connected = true;
+    }
+
+    protected void receive(T t)
+    {
+        checkStateForReceive();
+        writeSide = Thread.currentThread() ;
+
+        try
+        {
+            queue.put(t) ;
+        }
+        catch (InterruptedException e)
+        {
+            throw new CancellationException() ;
+        }
+    }
+
+    protected void base(String base)
+    {
+        synchronized (lock)
+        {
+            this.baseIri = base ;
+        }
+    }
+
+    /**
+     * Gets the most recently seen Base IRI
+     * 
+     * @return Base IRI
+     */
+    public String getBaseIri()
+    {
+        synchronized (lock)
+        {
+            return baseIri ;
+        }
+    }
+
+    protected void prefix(String prefix, String iri)
+    {
+        synchronized (lock)
+        {
+            prefixes.add(prefix, iri) ;
+        }
+    }
+
+    /**
+     * Gets the prefix map which contains the prefixes seen so far in the stream
+     * 
+     * @return Prefix Map
+     */
+    public LightweightPrefixMap getPrefixes()
+    {
+        synchronized (lock)
+        {
+            // Need to return a copy since PrefixMap is not concurrent
+            return new PrefixMap(prefixes) ;
+        }
+    }
+
+    protected void start()
+    {
+        // Do nothing
+    }
+
+    // Called by the producer
+    protected void finish()
+    {
+        receive(endMarker);
+        closedByWriter = true;
+    }
+
+    // Called by the consumer
+    @Override
+    public void close()
+    {
+        closedByReader = true ;
+    }
+}

Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.riot.system.StreamRDF ;
+
+/**
+ * Abstract implementation of a producer class that implements {@code StreamRDF};
+ * use one of the concrete implementations that match the RDF primitive you are using.
+ * 
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public abstract class PipedRDFStream<T> implements StreamRDF
+{
+    private final PipedRDFIterator<T> sink ;
+
+    protected PipedRDFStream(PipedRDFIterator<T> sink)
+    {
+        this.sink = sink ;
+        this.sink.connect();
+    }
+
+    protected void receive(T t)
+    {
+        sink.receive(t) ;
+    }
+
+    @Override
+    public void base(String base)
+    {
+        sink.base(base) ;
+    }
+
+    @Override
+    public void prefix(String prefix, String iri)
+    {
+        sink.prefix(prefix, iri) ;
+    }
+
+    @Override
+    public void start()
+    {
+        sink.start() ;
+    }
+
+    @Override
+    public void finish()
+    {
+        sink.finish() ;
+    }
+}

Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}. 
+ */
+public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
+{
+    /**
+     * Creates a piped triples stream connected to the specified piped 
+     * RDF iterator.  Triples written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        receive(triple) ;
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        // Quads are discarded
+    }
+
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        // Tuples are discarded
+    }
+}

Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}. 
+ */
+public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
+{
+    /**
+     * Creates a piped tuples stream connected to the specified piped 
+     * RDF iterator.  Tuples written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        // Triples are discarded
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        // Quads are discarded
+    }
+
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        receive(tuple) ;
+    }
+}

Modified: jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java Tue Jan 29 01:48:07 2013
@@ -32,8 +32,21 @@ import org.apache.jena.atlas.lib.Sink ;
 import org.apache.jena.riot.Lang ;
 import org.apache.jena.riot.RDFDataMgr ;
 import org.apache.jena.riot.RDFLanguages ;
-import org.apache.jena.riot.lang.* ;
-import org.apache.jena.riot.system.* ;
+import org.apache.jena.riot.lang.LangNQuads ;
+import org.apache.jena.riot.lang.LangNTriples ;
+import org.apache.jena.riot.lang.LangRDFJSON ;
+import org.apache.jena.riot.lang.LangRDFXML ;
+import org.apache.jena.riot.lang.LangRIOT ;
+import org.apache.jena.riot.lang.LangTriG ;
+import org.apache.jena.riot.lang.LangTurtle ;
+import org.apache.jena.riot.lang.PipedQuadsStream ;
+import org.apache.jena.riot.lang.PipedRDFIterator ;
+import org.apache.jena.riot.lang.PipedTriplesStream ;
+import org.apache.jena.riot.system.ErrorHandlerFactory ;
+import org.apache.jena.riot.system.IRIResolver ;
+import org.apache.jena.riot.system.RiotLib ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.riot.system.StreamRDFLib ;
 import org.apache.jena.riot.tokens.Tokenizer ;
 import org.apache.jena.riot.tokens.TokenizerFactory ;
 
@@ -198,8 +211,7 @@ public class RiotReader
         return org.apache.jena.riot.RiotReader.createParser(tokenizer, lang, baseIRI, dest) ;
     }
     
-    // TODO create a Tokenizer version of this method
-    public static Iterator<Triple> createIteratorTriples(InputStream input, Lang lang, String baseIRI)
+    public static Iterator<Triple> createIteratorTriples(final InputStream input, final Lang lang, final String baseIRI)
     {
         // Special case N-Triples, because the RIOT reader has a pull interface
         if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -209,9 +221,20 @@ public class RiotReader
         else
         {
             // Otherwise, we have to spin up a thread to deal with it
-            RiotTripleParsePuller parsePuller = new RiotTripleParsePuller(input, lang, baseIRI);
-            parsePuller.parse();
-            return parsePuller;
+            final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+            final PipedTriplesStream out = new PipedTriplesStream(it);
+            
+            Thread t = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    parseTriples(input, lang, baseIRI, out);
+                }
+            });
+            t.start();
+            
+            return it;
         }
     }
     
@@ -235,8 +258,7 @@ public class RiotReader
         return createParserTriples(tokenizer, lang, baseIRI, dest) ;
     }
     
-    // TODO create a Tokenizer version of this method
-    public static Iterator<Quad> createIteratorQuads(InputStream input, Lang lang, String baseIRI)
+    public static Iterator<Quad> createIteratorQuads(final InputStream input, final Lang lang, final String baseIRI)
     {
         // Special case N-Quads, because the RIOT reader has a pull interface
         if (  RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -246,9 +268,20 @@ public class RiotReader
         else
         {
             // Otherwise, we have to spin up a thread to deal with it
-            RiotQuadParsePuller parsePuller = new RiotQuadParsePuller(input, lang, baseIRI);
-            parsePuller.parse();
-            return parsePuller;
+            final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>();
+            final PipedQuadsStream out = new PipedQuadsStream(it);
+            
+            Thread t = new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    parseQuads(input, lang, baseIRI, out);
+                }
+            });
+            t.start();
+            
+            return it;
         }
     }
     

Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java Tue Jan 29 01:48:07 2013
@@ -31,6 +31,7 @@ import org.junit.runners.Suite.SuiteClas
     , TestStreamManager.class 
     , TestJenaReaderRIOT.class
     , TestReadData.class
+    , TestRiotReader.class
 })
 
 public class TS_ReaderRIOT

Added: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java (added)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot;
+
+import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.assertFalse ;
+import static org.junit.Assert.assertNotNull ;
+import static org.junit.Assert.assertTrue ;
+
+import java.io.ByteArrayInputStream ;
+import java.util.Iterator ;
+
+import org.apache.jena.atlas.lib.StrUtils ;
+import org.junit.Test ;
+
+import com.hp.hpl.jena.graph.Triple ;
+
+public class TestRiotReader
+{
+    @Test
+    public void testCreateIteratorTriples_01()
+    {
+        Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream("".getBytes()), RDFLanguages.NTRIPLES, "http://example/");
+        
+        assertFalse(it.hasNext());
+    }
+    
+    @Test
+    public void testCreateIteratorTriples_02()
+    {
+        String x = StrUtils.strjoinNL(
+                "<rdf:RDF", 
+                "   xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\"",
+                "   xmlns:j.0=\"http://example/\">" ,
+                "  <rdf:Description rdf:about=\"http://example/s\">" ,
+                "     <j.0:p rdf:resource=\"http://example/o\"/>" ,
+                "   </rdf:Description>" ,
+                "</rdf:RDF>") ;
+        
+        Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream(x.getBytes()), RDFLanguages.RDFXML, "http://example/");
+        
+        assertTrue(it.hasNext());
+        Triple t = it.next();
+        assertNotNull(t);
+        assertEquals("http://example/s", t.getSubject().getURI());
+        assertEquals("http://example/p", t.getPredicate().getURI());
+        assertEquals("http://example/o", t.getObject().getURI());
+        
+        assertFalse(it.hasNext());
+    }
+}

Added: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java (added)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.io.ByteArrayInputStream ;
+import java.nio.charset.Charset ;
+import java.util.concurrent.Callable ;
+import java.util.concurrent.ExecutionException ;
+import java.util.concurrent.ExecutorService ;
+import java.util.concurrent.Executors ;
+import java.util.concurrent.Future ;
+import java.util.concurrent.TimeUnit ;
+import java.util.concurrent.TimeoutException ;
+
+import junit.framework.Assert ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.RDFDataMgr ;
+import org.apache.jena.riot.RDFLanguages ;
+import org.apache.jena.riot.RiotException ;
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
+import org.junit.Test ;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.util.NodeFactory ;
+
+/**
+ * Tests for the {@link PipedRDFIterator} implementation
+ * 
+ */
+public class TestPipedRDFIterators {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestPipedRDFIterators.class);
+
+    private static ExecutorService executor;
+
+    /**
+     * Create our thread pool
+     */
+    @BeforeClass
+    public static void setup() {
+        // We use far more than the required 2 threads to avoid intermittent
+        // deadlock issues
+        // that can otherwise occur
+        executor = Executors.newFixedThreadPool(10);
+    }
+
+    /**
+     * Destroy our thread pool
+     * 
+     * @throws InterruptedException
+     */
+    @AfterClass
+    public static void teardown() throws InterruptedException {
+        executor.shutdownNow();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void test_streamed_triples(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        
+        final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>(bufferSize, fair);
+        final PipedTriplesStream out = new PipedTriplesStream(it);
+
+        // Create a runnable that will generate triples
+        Runnable genTriples = new Runnable() {
+
+            @Override
+            public void run() {
+                out.start();
+                // Generate triples
+                for (int i = 1; i <= generateSize; i++) {
+                    Triple t = new Triple(Node.createAnon(), Node.createURI("http://predicate"), NodeFactory.intToNode(i));
+                    out.triple(t);
+                }
+                out.finish();
+                return;
+            }
+        };
+
+        // Create a runnable that will consume triples
+        Callable<Integer> consumeTriples = new Callable<Integer>() {
+
+            @Override
+            public Integer call() throws Exception {
+                int count = 0;
+                while (it.hasNext()) {
+                    it.next();
+                    count++;
+                }
+                return count;
+            }
+        };
+
+        // Run the threads
+        Future<?> genResult = executor.submit(genTriples);
+        Future<Integer> result = executor.submit(consumeTriples);
+        Integer count = 0;
+        try {
+            count = result.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // Check that it wasn't the producer thread erroring that caused us
+            // to time out
+            genResult.get();
+            // It wasn't so throw the original error
+            throw e;
+        }
+        Assert.assertEquals(generateSize, (int) count);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is tiny
+        this.test_streamed_triples(1, 100, true);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much smaller than generated triples
+        this.test_streamed_triples(10, 1000, false);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is smaller than generated triples
+        this.test_streamed_triples(100, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is same as generated triples
+        this.test_streamed_triples(1000, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much larger than generated triples
+        this.test_streamed_triples(10000, 1000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated triples
+        this.test_streamed_triples(1000, 100000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_triples_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated triples
+        this.test_streamed_triples(10000, 100000, false);
+    }
+
+    private void test_streamed_quads(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        
+        final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>(bufferSize, fair);
+        final PipedQuadsStream out = new PipedQuadsStream(it);
+
+        // Create a runnable that will generate quads
+        Runnable genQuads = new Runnable() {
+
+            @Override
+            public void run() {
+                out.start();
+                // Generate quads
+                for (int i = 1; i <= generateSize; i++) {
+                    Quad q = new Quad(Node.createURI("http://graph"), Node.createAnon(), Node.createURI("http://predicate"),
+                            NodeFactory.intToNode(i));
+                    out.quad(q);
+                }
+                out.finish();
+                return;
+            }
+        };
+
+        // Create a runnable that will consume quads
+        Callable<Integer> consumeQuads = new Callable<Integer>() {
+
+            @Override
+            public Integer call() throws Exception {
+                int count = 0;
+                while (it.hasNext()) {
+                    it.next();
+                    count++;
+                }
+                return count;
+            }
+        };
+
+        // Run the threads
+        Future<?> genResult = executor.submit(genQuads);
+        Future<Integer> result = executor.submit(consumeQuads);
+        Integer count = 0;
+        try {
+            count = result.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // Check that it wasn't the producer thread erroring that caused us
+            // to time out
+            genResult.get();
+            // It wasn't so throw the original error
+            throw e;
+        }
+        Assert.assertEquals(generateSize, (int) count);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is tiny
+        this.test_streamed_quads(1, 100, true);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much smaller than generated quads
+        this.test_streamed_quads(10, 1000, false);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is smaller than generated quads
+        this.test_streamed_quads(100, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is same as generated quads
+        this.test_streamed_quads(1000, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much larger than generated quads
+        this.test_streamed_quads(10000, 1000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated quads
+        this.test_streamed_quads(1000, 100000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_quads_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated quads
+        this.test_streamed_quads(10000, 100000, false);
+    }
+
+    private void test_streamed_tuples(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        
+        final PipedRDFIterator<Tuple<Node>> it = new PipedRDFIterator<Tuple<Node>>();
+        final PipedTuplesStream out = new PipedTuplesStream(it);
+        
+        // Create a runnable that will generate tuples
+        Runnable genQuads = new Runnable() {
+
+            @Override
+            public void run() {
+                out.start();
+                // Generate tuples
+                for (int i = 1; i <= generateSize; i++) {
+                    Tuple<Node> t = Tuple.create(Node.createURI("http://graph"), Node.createAnon(),
+                            Node.createURI("http://predicate"), NodeFactory.intToNode(i));
+                    out.tuple(t);
+                }
+                out.finish();
+                return;
+            }
+        };
+
+        // Create a runnable that will consume tuples
+        Callable<Integer> consumeQuads = new Callable<Integer>() {
+
+            @Override
+            public Integer call() throws Exception {
+                int count = 0;
+                while (it.hasNext()) {
+                    it.next();
+                    count++;
+                }
+                return count;
+            }
+        };
+
+        // Run the threads
+        Future<?> genResult = executor.submit(genQuads);
+        Future<Integer> result = executor.submit(consumeQuads);
+        Integer count = 0;
+        try {
+            count = result.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // Check that it wasn't the producer thread erroring that caused us
+            // to time out
+            genResult.get();
+            // It wasn't so throw the original error
+            throw e;
+        }
+        Assert.assertEquals(generateSize, (int) count);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is tiny
+        this.test_streamed_tuples(1, 100, true);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much smaller than generated tuples
+        this.test_streamed_tuples(10, 1000, false);
+    }
+
+    /**
+     * Test that blocking and waiting work nicely
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is smaller than generated tuples
+        this.test_streamed_tuples(100, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is same as generated tuples
+        this.test_streamed_tuples(1000, 1000, false);
+    }
+
+    /**
+     * Test where blocking should rarely occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is much larger than generated tuples
+        this.test_streamed_tuples(10000, 1000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated tuples
+        this.test_streamed_tuples(1000, 100000, false);
+    }
+
+    /**
+     * Test where blocking may occur
+     * 
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    @Test
+    public void streamed_tuples_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+        // Buffer size is small relative to generated tuples
+        this.test_streamed_tuples(10000, 100000, false);
+    }
+
+    /**
+     * Test for bad buffer size
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void streamed_instantiation_bad_01() {
+        new PipedRDFIterator<Triple>(0);
+    }
+
+    /**
+     * Test for bad buffer size
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void streamed_instantiation_bad_02() {
+        new PipedRDFIterator<Triple>(-1);
+    }
+
+    /**
+     * Tests that the iterate copes correctly in the case of hitting a parser
+     * error
+     * 
+     * @param data
+     *            Data string (Turtle format) which should be malformed
+     * @param expected
+     *            Number of valid triples expected to be generated before the
+     *            error is hit
+     * @throws TimeoutException
+     * @throws InterruptedException
+     */
+    private void test_streamed_triples_bad(final String data, int expected) throws TimeoutException, InterruptedException {
+
+        
+        final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+        final PipedTriplesStream out = new PipedTriplesStream(it);
+
+        // Create a runnable that will try to parse the bad data
+        Runnable runParser = new Runnable() {
+
+            @Override
+            public void run() {
+                Charset utf8 = Charset.forName("utf8");
+                ByteArrayInputStream input = new ByteArrayInputStream(data.getBytes(utf8));
+                try {
+                    RDFDataMgr.parse(out, input, null, RDFLanguages.TURTLE, null);
+                } catch (Throwable t) {
+                    // Ignore the error
+                }
+                return;
+            }
+        };
+
+        // Create a runnable that will consume triples
+        Callable<Integer> consumeTriples = new Callable<Integer>() {
+
+            @Override
+            public Integer call() throws Exception {
+                int count = 0;
+                while (it.hasNext()) {
+                    it.next();
+                    count++;
+                }
+                return count;
+            }
+        };
+
+        // Run the threads
+        Future<?> genResult = executor.submit(runParser);
+        Future<Integer> result = executor.submit(consumeTriples);
+        Integer count = 0;
+        try {
+            count = result.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            // We expect the producer thread to have errored
+            try {
+                genResult.get();
+            } catch (ExecutionException ex) {
+                // This is as expected, ignore
+                LOGGER.warn("Errored as expected", ex);
+            }
+            // However we expect the consumer to still have been notified of
+            // failure
+            throw e;
+        } catch (ExecutionException e) {
+            // This was not expected
+            Assert.fail(e.getMessage());
+        }
+
+        // Since the produce thread failed the consumer thread should give the
+        // expected count
+        Assert.assertEquals(expected, (int) count);
+    }
+
+    /**
+     * Test failure of the iterator
+     * 
+     * @throws TimeoutException
+     * @throws InterruptedException
+     */
+    @Test
+    public void streamed_triples_bad_01() throws TimeoutException, InterruptedException {
+        test_streamed_triples_bad("@prefix : <http://unterminated", 0);
+    }
+
+    /**
+     * Test failure of the iterator
+     * 
+     * @throws TimeoutException
+     * @throws InterruptedException
+     */
+    @Test
+    public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
+        test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
+    }
+
+    /**
+     * Tests attempting to access the iterator before the stream has been connected
+     */
+    @Test(expected = IllegalStateException.class)
+    public void streamed_state_bad_01() {
+        PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+        it.hasNext();
+    }
+
+    /**
+     * Tests attempting to access the iterator after the producer dies 
+     */
+    @Test(expected = RiotException.class)
+    public void streamed_state_bad_02() {
+        
+        final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+        final PipedTriplesStream out = new PipedTriplesStream(it);
+        
+        Thread t = new Thread(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                out.start();
+                out.triple(Triple.create(Node.createURI("urn:s"), Node.createURI("urn:p"), Node.createURI("urn:o")));
+                throw new RuntimeException("die!");
+            }
+        });
+        
+        // Because this is a unit test, set an exception handler to suppress the normal printing of the stacktrace to stderr
+        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+        {
+            @Override
+            public void uncaughtException(Thread t, Throwable e)
+            {
+                // Do nothing
+            }
+        });
+        
+        t.start();
+        
+        Assert.assertTrue(it.hasNext());
+        it.next();
+        
+        // Should throw a RiotException
+        it.hasNext();
+    }
+}