You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2013/01/29 02:48:08 UTC
svn commit: r1439731 - in /jena/trunk/jena-arq/src:
main/java/org/apache/jena/riot/ main/java/org/apache/jena/riot/lang/
main/java/org/openjena/riot/ test/java/org/apache/jena/riot/
test/java/org/apache/jena/riot/lang/
Author: sallen
Date: Tue Jan 29 01:48:07 2013
New Revision: 1439731
URL: http://svn.apache.org/viewvc?rev=1439731&view=rev
Log:
Convert RiotParsePuller into two piped classes.
Added:
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java
Removed:
jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotParsePuller.java
jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
Modified:
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java
jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java
Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/RiotReader.java Tue Jan 29 01:48:07 2013
@@ -35,12 +35,23 @@ import org.apache.jena.atlas.io.PeekRead
import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
import org.apache.jena.atlas.json.io.parser.TokenizerJSON ;
import org.apache.jena.atlas.lib.Sink ;
-import org.apache.jena.riot.lang.* ;
-import org.apache.jena.riot.system.* ;
+import org.apache.jena.riot.lang.LangNQuads ;
+import org.apache.jena.riot.lang.LangNTriples ;
+import org.apache.jena.riot.lang.LangRDFJSON ;
+import org.apache.jena.riot.lang.LangRDFXML ;
+import org.apache.jena.riot.lang.LangRIOT ;
+import org.apache.jena.riot.lang.LangTriG ;
+import org.apache.jena.riot.lang.LangTurtle ;
+import org.apache.jena.riot.lang.PipedQuadsStream ;
+import org.apache.jena.riot.lang.PipedRDFIterator ;
+import org.apache.jena.riot.lang.PipedTriplesStream ;
+import org.apache.jena.riot.system.ErrorHandlerFactory ;
+import org.apache.jena.riot.system.IRIResolver ;
+import org.apache.jena.riot.system.RiotLib ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.riot.system.StreamRDFLib ;
import org.apache.jena.riot.tokens.Tokenizer ;
import org.apache.jena.riot.tokens.TokenizerFactory ;
-import org.openjena.riot.RiotQuadParsePuller ;
-import org.openjena.riot.RiotTripleParsePuller ;
import com.hp.hpl.jena.graph.Triple ;
import com.hp.hpl.jena.sparql.core.Quad ;
@@ -225,8 +236,7 @@ public class RiotReader
parse(in, lang, baseIRI, dest) ;
}
- // TODO create a Tokenizer version of this method
- public static Iterator<Triple> createIteratorTriples(InputStream input, Lang lang, String baseIRI)
+ public static Iterator<Triple> createIteratorTriples(final InputStream input, final Lang lang, final String baseIRI)
{
// Special case N-Triples, because the RIOT reader has a pull interface
if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -236,26 +246,47 @@ public class RiotReader
else
{
// Otherwise, we have to spin up a thread to deal with it
- RiotTripleParsePuller parsePuller = new RiotTripleParsePuller(input, lang, baseIRI);
- parsePuller.parse();
- return parsePuller;
+ final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+ final PipedTriplesStream out = new PipedTriplesStream(it);
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ parse(input, lang, baseIRI, out);
+ }
+ });
+ t.start();
+
+ return it;
}
}
- // TODO create a Tokenizer version of this method
- public static Iterator<Quad> createIteratorQuads(InputStream input, Lang lang, String baseIRI)
+ public static Iterator<Quad> createIteratorQuads(final InputStream input, final Lang lang, final String baseIRI)
{
// Special case N-Quads, because the RIOT reader has a pull interface
- if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
+ if ( RDFLanguages.sameLang(RDFLanguages.NQUADS, lang) )
{
return new IteratorResourceClosing<Quad>(createParserNQuads(input, null), input);
}
else
{
// Otherwise, we have to spin up a thread to deal with it
- RiotQuadParsePuller parsePuller = new RiotQuadParsePuller(input, lang, baseIRI);
- parsePuller.parse();
- return parsePuller;
+ final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>();
+ final PipedQuadsStream out = new PipedQuadsStream(it);
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ parse(input, lang, baseIRI, out);
+ }
+ });
+ t.start();
+
+ return it;
}
}
Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Producer end of a quad pipe: each {@link Quad} delivered to this {@code StreamRDF}
+ * is handed on to the connected {@code PipedRDFIterator<Quad>}.
+ * Triples and tuples delivered to this stream are not forwarded.
+ */
+public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
+{
+    /**
+     * Connects a new quad producer to {@code sink}; quads sent to this stream
+     * become readable from that iterator.
+     *
+     * @param sink the piped RDF iterator that will receive the quads
+     */
+    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
+    {
+        super(sink) ;
+    }
+
+    /** Forwards the quad to the connected iterator. */
+    @Override
+    public void quad(Quad quad)
+    {
+        receive(quad) ;
+    }
+
+    /** Not forwarded: this stream only carries quads. */
+    @Override
+    public void triple(Triple triple)
+    {
+        // intentionally ignored
+    }
+
+    /** Not forwarded: this stream only carries quads. */
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        // intentionally ignored
+    }
+}
Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+import java.util.concurrent.ArrayBlockingQueue ;
+import java.util.concurrent.BlockingQueue ;
+import java.util.concurrent.CancellationException ;
+import java.util.concurrent.TimeUnit ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.riot.RiotException ;
+import org.apache.jena.riot.system.LightweightPrefixMap ;
+import org.apache.jena.riot.system.PrefixMap ;
+
+/**
+ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream} implementation;
+ * the piped iterator then provides whatever RDF primitives are written to the {@code PipedRDFStream}.
+ * Typically, data is read from a {@code PipedRDFIterator} by one thread and data is written
+ * to the corresponding {@code PipedRDFStream} by some other thread. Attempting to use both
+ * objects from a single thread is not recommended, as it may deadlock the thread.
+ * The {@code PipedRDFIterator} contains a buffer, decoupling read operations from write operations,
+ * within limits.
+ * <p/>
+ * Once the producer has called {@code finish()} and every buffered item has been read,
+ * {@link #hasNext()} returns {@code false} on every later call (until {@link #close()} is called).
+ * If a producer thread that has called {@code start()} or sent data dies without calling
+ * {@code finish()}, {@link #hasNext()} throws a {@link RiotException} once the buffer is drained.
+ * If the consumer closes the pipe, or its reading thread dies, a producer waiting on a full
+ * buffer gets a {@link RiotException} instead of blocking indefinitely.
+ * <p/>
+ * Inspired by Java's {@link java.io.PipedInputStream} and {@link java.io.PipedOutputStream}
+ *
+ * @param <T> The type of the RDF primitive, should be one of {@code Triple}, {@code Quad}, or {@code Tuple<Node>}
+ *
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public class PipedRDFIterator<T> implements Iterator<T>, Closeable
+{
+    /**
+     * Constant for default buffer size
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 10000 ;
+
+    // Both the consumer (hasNext) and the producer (receive) wait in slices of this length
+    // so that they can periodically re-check whether the other side has gone away.
+    private static final int ITERATOR_POLL_TIMEOUT = 1000 ; // one second
+    private static final TimeUnit ITERATOR_POLL_TIMEUNIT = TimeUnit.MILLISECONDS ;
+
+    private final BlockingQueue<T> queue ;
+
+    // Sentinel put into the queue by finish(). It is only compared by identity and is never
+    // returned to callers, so the unchecked cast is harmless.
+    @SuppressWarnings("unchecked")
+    private final T endMarker = (T) new Object() ;
+
+    private volatile boolean closedByReader = false ;
+    private volatile boolean closedByWriter = false ;
+    private volatile Thread readSide ;
+    private volatile Thread writeSide ;
+
+    private boolean connected = false ;
+
+    // Reader-side state: set once the end marker has been consumed, so that repeated
+    // hasNext() calls keep returning false (Iterator contract) instead of reporting a closed pipe.
+    private boolean finished = false ;
+
+    // Reader-side look-ahead: the item fetched by hasNext() but not yet returned by next()
+    private T slot ;
+
+    private final Object lock = new Object() ; // protects baseIri and prefixes
+    private String baseIri ;
+    private final LightweightPrefixMap prefixes = new PrefixMap() ;
+
+
+    /**
+     * Creates a new piped RDF iterator with the default buffer size of {@code DEFAULT_BUFFER_SIZE}.
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     */
+    public PipedRDFIterator()
+    {
+        this(DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     *
+     * @param bufferSize
+     *            Buffer size
+     */
+    public PipedRDFIterator(int bufferSize)
+    {
+        this.queue = new ArrayBlockingQueue<T>(bufferSize) ;
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * <p>
+     * The fair parameter controls whether the locking policy used for the
+     * buffer is fair. When enabled this reduces throughput but also reduces the
+     * chance of thread starvation.
+     * </p>
+     *
+     * @param bufferSize
+     *            Buffer size
+     * @param fair
+     *            Whether the buffer should use a fair locking policy
+     */
+    public PipedRDFIterator(int bufferSize, boolean fair)
+    {
+        this.queue = new ArrayBlockingQueue<T>(bufferSize, fair) ;
+    }
+
+    /**
+     * Waits until an item is available, the producer has signalled the end of the stream,
+     * or the pipe is found to be broken.
+     *
+     * @return {@code true} if {@link #next()} will return an item; {@code false} once the
+     *         stream has ended, and on every later call
+     * @throws IllegalStateException if no {@link PipedRDFStream} has been connected
+     * @throws RiotException if the pipe has been closed by the reader, or the producer thread
+     *         died without finishing the stream
+     * @throws CancellationException if the waiting thread is interrupted (its interrupt
+     *         status is restored first)
+     */
+    @Override
+    public boolean hasNext()
+    {
+        if (!connected)
+            throw new IllegalStateException("Pipe not connected");
+
+        if (closedByReader)
+            throw new RiotException("Pipe closed");
+
+        // The end marker has already been consumed: keep answering false
+        if (finished)
+            return false ;
+
+        readSide = Thread.currentThread();
+
+        if (slot != null)
+            return true ;
+        while (true)
+        {
+            try
+            {
+                slot = queue.poll(ITERATOR_POLL_TIMEOUT, ITERATOR_POLL_TIMEUNIT) ;
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt() ;
+                throw new CancellationException() ;
+            }
+
+            if (null != slot)
+                break ;
+
+            // If the producer thread died and did not call finish() then declare this pipe to be "broken"
+            // Since check is after the break, we will drain as much as possible out of the queue before throwing this exception
+            if (writeSide != null && !writeSide.isAlive() && !closedByWriter)
+            {
+                closedByReader = true ;
+                throw new RiotException("Write end dead") ;
+            }
+
+            // Lets close() called from another thread break a wait on a stalled producer
+            if (closedByReader)
+                throw new RiotException("Pipe closed");
+        }
+
+        if (slot == endMarker)
+        {
+            finished = true ;
+            slot = null ;
+            return false ;
+        }
+        return true ;
+    }
+
+    /**
+     * Returns the next item from the pipe, waiting for it if necessary.
+     *
+     * @return the next item
+     * @throws NoSuchElementException if the stream has ended
+     */
+    @Override
+    public T next()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException() ;
+        T item = slot ;
+        slot = null ;
+        return item ;
+    }
+
+    @Override
+    public void remove()
+    {
+        throw new UnsupportedOperationException() ;
+    }
+
+    private void checkStateForReceive()
+    {
+        if (closedByWriter || closedByReader)
+        {
+            throw new RiotException("Pipe closed") ;
+        }
+        else if (readSide != null && !readSide.isAlive())
+        {
+            throw new RiotException("Read end dead") ;
+        }
+    }
+
+    protected void connect()
+    {
+        this.connected = true;
+    }
+
+    /**
+     * Called by the producer to place an item into the buffer, waiting while the buffer is full.
+     * The wait is done in bounded slices, and the pipe state is re-checked after each one.
+     * This stops a reader that closes the pipe, or whose thread dies, from stranding the
+     * producer forever.
+     *
+     * @param t the item to enqueue
+     * @throws RiotException if the pipe is closed or the reading thread has died
+     * @throws CancellationException if the producer thread is interrupted (its interrupt
+     *         status is restored first)
+     */
+    protected void receive(T t)
+    {
+        checkStateForReceive();
+        writeSide = Thread.currentThread() ;
+
+        try
+        {
+            while (!queue.offer(t, ITERATOR_POLL_TIMEOUT, ITERATOR_POLL_TIMEUNIT))
+                checkStateForReceive() ;
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt() ;
+            throw new CancellationException() ;
+        }
+    }
+
+    protected void base(String base)
+    {
+        synchronized (lock)
+        {
+            this.baseIri = base ;
+        }
+    }
+
+    /**
+     * Gets the most recently seen Base IRI
+     *
+     * @return Base IRI
+     */
+    public String getBaseIri()
+    {
+        synchronized (lock)
+        {
+            return baseIri ;
+        }
+    }
+
+    protected void prefix(String prefix, String iri)
+    {
+        synchronized (lock)
+        {
+            prefixes.add(prefix, iri) ;
+        }
+    }
+
+    /**
+     * Gets the prefix map which contains the prefixes seen so far in the stream
+     *
+     * @return Prefix Map
+     */
+    public LightweightPrefixMap getPrefixes()
+    {
+        synchronized (lock)
+        {
+            // Need to return a copy since PrefixMap is not concurrent
+            return new PrefixMap(prefixes) ;
+        }
+    }
+
+    // Called by the producer. Recording the producer thread here, and not only on the first
+    // receive(), lets hasNext() notice a producer that dies before sending anything.
+    // This only helps if the producer actually calls start().
+    protected void start()
+    {
+        writeSide = Thread.currentThread() ;
+    }
+
+    // Called by the producer
+    protected void finish()
+    {
+        receive(endMarker);
+        closedByWriter = true;
+    }
+
+    // Called by the consumer
+    @Override
+    public void close()
+    {
+        closedByReader = true ;
+    }
+}
Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.riot.system.StreamRDF ;
+
+/**
+ * Producer side of a pipe, implementing {@code StreamRDF} on top of a {@link PipedRDFIterator}.
+ * Base IRIs, prefixes and the start/finish signals are always passed through to the iterator.
+ * Each concrete subclass decides which RDF primitive ({@code Triple}, {@code Quad} or
+ * {@code Tuple<Node>}) it forwards via {@link #receive(Object)}. Use the subclass that
+ * matches the primitive you need.
+ *
+ * @param <T> type of RDF primitive carried by the pipe
+ *
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public abstract class PipedRDFStream<T> implements StreamRDF
+{
+    /** The consumer end of the pipe; marked as connected when this stream is constructed. */
+    private final PipedRDFIterator<T> sink ;
+
+    /**
+     * Connects this producer to {@code sink}.
+     *
+     * @param sink the iterator that will receive everything sent through this stream
+     */
+    protected PipedRDFStream(PipedRDFIterator<T> sink)
+    {
+        sink.connect() ;
+        this.sink = sink ;
+    }
+
+    /**
+     * Hands one primitive to the connected iterator; subclasses call this for the
+     * primitives they forward.
+     */
+    protected void receive(T t)
+    {
+        sink.receive(t) ;
+    }
+
+    @Override
+    public void start()
+    {
+        sink.start() ;
+    }
+
+    @Override
+    public void base(String base)
+    {
+        sink.base(base) ;
+    }
+
+    @Override
+    public void prefix(String prefix, String iri)
+    {
+        sink.prefix(prefix, iri) ;
+    }
+
+    @Override
+    public void finish()
+    {
+        sink.finish() ;
+    }
+}
Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Producer end of a triple pipe: each {@link Triple} delivered to this {@code StreamRDF}
+ * is handed on to the connected {@code PipedRDFIterator<Triple>}.
+ * Quads and tuples delivered to this stream are not forwarded.
+ */
+public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
+{
+    /**
+     * Connects a new triple producer to {@code sink}; triples sent to this stream
+     * become readable from that iterator.
+     *
+     * @param sink the piped RDF iterator that will receive the triples
+     */
+    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
+    {
+        super(sink) ;
+    }
+
+    /** Not forwarded: this stream only carries triples. */
+    @Override
+    public void quad(Quad quad)
+    {
+        // intentionally ignored
+    }
+
+    /** Not forwarded: this stream only carries triples. */
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        // intentionally ignored
+    }
+
+    /** Forwards the triple to the connected iterator. */
+    @Override
+    public void triple(Triple triple)
+    {
+        receive(triple) ;
+    }
+}
Added: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java (added)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.system.StreamRDF ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * Producer end of a tuple pipe: each {@code Tuple<Node>} delivered to this {@code StreamRDF}
+ * is handed on to the connected {@code PipedRDFIterator<Tuple<Node>>}.
+ * Triples and quads delivered to this stream are not forwarded.
+ */
+public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
+{
+    /**
+     * Connects a new tuple producer to {@code sink}; tuples sent to this stream
+     * become readable from that iterator.
+     *
+     * @param sink the piped RDF iterator that will receive the tuples
+     */
+    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
+    {
+        super(sink) ;
+    }
+
+    /** Forwards the tuple to the connected iterator. */
+    @Override
+    public void tuple(Tuple<Node> tuple)
+    {
+        receive(tuple) ;
+    }
+
+    /** Not forwarded: this stream only carries tuples. */
+    @Override
+    public void triple(Triple triple)
+    {
+        // intentionally ignored
+    }
+
+    /** Not forwarded: this stream only carries tuples. */
+    @Override
+    public void quad(Quad quad)
+    {
+        // intentionally ignored
+    }
+}
Modified: jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/openjena/riot/RiotReader.java Tue Jan 29 01:48:07 2013
@@ -32,8 +32,21 @@ import org.apache.jena.atlas.lib.Sink ;
import org.apache.jena.riot.Lang ;
import org.apache.jena.riot.RDFDataMgr ;
import org.apache.jena.riot.RDFLanguages ;
-import org.apache.jena.riot.lang.* ;
-import org.apache.jena.riot.system.* ;
+import org.apache.jena.riot.lang.LangNQuads ;
+import org.apache.jena.riot.lang.LangNTriples ;
+import org.apache.jena.riot.lang.LangRDFJSON ;
+import org.apache.jena.riot.lang.LangRDFXML ;
+import org.apache.jena.riot.lang.LangRIOT ;
+import org.apache.jena.riot.lang.LangTriG ;
+import org.apache.jena.riot.lang.LangTurtle ;
+import org.apache.jena.riot.lang.PipedQuadsStream ;
+import org.apache.jena.riot.lang.PipedRDFIterator ;
+import org.apache.jena.riot.lang.PipedTriplesStream ;
+import org.apache.jena.riot.system.ErrorHandlerFactory ;
+import org.apache.jena.riot.system.IRIResolver ;
+import org.apache.jena.riot.system.RiotLib ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.riot.system.StreamRDFLib ;
import org.apache.jena.riot.tokens.Tokenizer ;
import org.apache.jena.riot.tokens.TokenizerFactory ;
@@ -198,8 +211,7 @@ public class RiotReader
return org.apache.jena.riot.RiotReader.createParser(tokenizer, lang, baseIRI, dest) ;
}
- // TODO create a Tokenizer version of this method
- public static Iterator<Triple> createIteratorTriples(InputStream input, Lang lang, String baseIRI)
+ public static Iterator<Triple> createIteratorTriples(final InputStream input, final Lang lang, final String baseIRI)
{
// Special case N-Triples, because the RIOT reader has a pull interface
if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -209,9 +221,20 @@ public class RiotReader
else
{
// Otherwise, we have to spin up a thread to deal with it
- RiotTripleParsePuller parsePuller = new RiotTripleParsePuller(input, lang, baseIRI);
- parsePuller.parse();
- return parsePuller;
+ final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+ final PipedTriplesStream out = new PipedTriplesStream(it);
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ parseTriples(input, lang, baseIRI, out);
+ }
+ });
+ t.start();
+
+ return it;
}
}
@@ -235,8 +258,7 @@ public class RiotReader
return createParserTriples(tokenizer, lang, baseIRI, dest) ;
}
- // TODO create a Tokenizer version of this method
- public static Iterator<Quad> createIteratorQuads(InputStream input, Lang lang, String baseIRI)
+ public static Iterator<Quad> createIteratorQuads(final InputStream input, final Lang lang, final String baseIRI)
{
// Special case N-Quads, because the RIOT reader has a pull interface
if ( RDFLanguages.sameLang(RDFLanguages.NTRIPLES, lang) )
@@ -246,9 +268,20 @@ public class RiotReader
else
{
// Otherwise, we have to spin up a thread to deal with it
- RiotQuadParsePuller parsePuller = new RiotQuadParsePuller(input, lang, baseIRI);
- parsePuller.parse();
- return parsePuller;
+ final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>();
+ final PipedQuadsStream out = new PipedQuadsStream(it);
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ parseQuads(input, lang, baseIRI, out);
+ }
+ });
+ t.start();
+
+ return it;
}
}
Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java?rev=1439731&r1=1439730&r2=1439731&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TS_ReaderRIOT.java Tue Jan 29 01:48:07 2013
@@ -31,6 +31,7 @@ import org.junit.runners.Suite.SuiteClas
, TestStreamManager.class
, TestJenaReaderRIOT.class
, TestReadData.class
+ , TestRiotReader.class
})
public class TS_ReaderRIOT
Added: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java (added)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/TestRiotReader.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot;
+
+import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.assertFalse ;
+import static org.junit.Assert.assertNotNull ;
+import static org.junit.Assert.assertTrue ;
+
+import java.io.ByteArrayInputStream ;
+import java.util.Iterator ;
+
+import org.apache.jena.atlas.lib.StrUtils ;
+import org.junit.Test ;
+
+import com.hp.hpl.jena.graph.Triple ;
+
+public class TestRiotReader
+{
+    private static final String BASE_IRI = "http://example/" ;
+
+    /** N-Triples is served directly by the pull parser; an empty document yields no triples. */
+    @Test
+    public void testCreateIteratorTriples_01()
+    {
+        ByteArrayInputStream emptyInput = new ByteArrayInputStream("".getBytes()) ;
+        Iterator<Triple> triples = RiotReader.createIteratorTriples(emptyInput, RDFLanguages.NTRIPLES, BASE_IRI) ;
+
+        assertFalse(triples.hasNext()) ;
+    }
+
+    /** RDF/XML is parsed on a separate thread and piped back; the document holds exactly one triple. */
+    @Test
+    public void testCreateIteratorTriples_02()
+    {
+        String rdfXml = StrUtils.strjoinNL(
+            "<rdf:RDF",
+            " xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\"",
+            " xmlns:j.0=\"http://example/\">" ,
+            " <rdf:Description rdf:about=\"http://example/s\">" ,
+            " <j.0:p rdf:resource=\"http://example/o\"/>" ,
+            " </rdf:Description>" ,
+            "</rdf:RDF>") ;
+
+        ByteArrayInputStream xmlInput = new ByteArrayInputStream(rdfXml.getBytes()) ;
+        Iterator<Triple> triples = RiotReader.createIteratorTriples(xmlInput, RDFLanguages.RDFXML, BASE_IRI) ;
+
+        assertTrue(triples.hasNext()) ;
+        Triple only = triples.next() ;
+        assertNotNull(only) ;
+        assertEquals("http://example/s", only.getSubject().getURI()) ;
+        assertEquals("http://example/p", only.getPredicate().getURI()) ;
+        assertEquals("http://example/o", only.getObject().getURI()) ;
+
+        assertFalse(triples.hasNext()) ;
+    }
+}
Added: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java?rev=1439731&view=auto
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java (added)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestPipedRDFIterators.java Tue Jan 29 01:48:07 2013
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.io.ByteArrayInputStream ;
+import java.nio.charset.Charset ;
+import java.util.concurrent.Callable ;
+import java.util.concurrent.ExecutionException ;
+import java.util.concurrent.ExecutorService ;
+import java.util.concurrent.Executors ;
+import java.util.concurrent.Future ;
+import java.util.concurrent.TimeUnit ;
+import java.util.concurrent.TimeoutException ;
+
+import junit.framework.Assert ;
+
+import org.apache.jena.atlas.lib.Tuple ;
+import org.apache.jena.riot.RDFDataMgr ;
+import org.apache.jena.riot.RDFLanguages ;
+import org.apache.jena.riot.RiotException ;
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
+import org.junit.Test ;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.util.NodeFactory ;
+
+/**
+ * Tests for the {@link PipedRDFIterator} implementation
+ *
+ */
+public class TestPipedRDFIterators {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestPipedRDFIterators.class);
+
+ private static ExecutorService executor;
+
+ /**
+ * Create our thread pool
+ */
+ @BeforeClass
+ public static void setup() {
+ // We use far more than the required 2 threads to avoid intermittent
+ // deadlock issues
+ // that can otherwise occur
+ executor = Executors.newFixedThreadPool(10);
+ }
+
+ /**
+ * Destroy our thread pool
+ *
+ * @throws InterruptedException
+ */
+ @AfterClass
+ public static void teardown() throws InterruptedException {
+ executor.shutdownNow();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ private void test_streamed_triples(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+ ExecutionException, TimeoutException {
+
+ final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>(bufferSize, fair);
+ final PipedTriplesStream out = new PipedTriplesStream(it);
+
+ // Create a runnable that will generate triples
+ Runnable genTriples = new Runnable() {
+
+ @Override
+ public void run() {
+ out.start();
+ // Generate triples
+ for (int i = 1; i <= generateSize; i++) {
+ Triple t = new Triple(Node.createAnon(), Node.createURI("http://predicate"), NodeFactory.intToNode(i));
+ out.triple(t);
+ }
+ out.finish();
+ return;
+ }
+ };
+
+ // Create a runnable that will consume triples
+ Callable<Integer> consumeTriples = new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ return count;
+ }
+ };
+
+ // Run the threads
+ Future<?> genResult = executor.submit(genTriples);
+ Future<Integer> result = executor.submit(consumeTriples);
+ Integer count = 0;
+ try {
+ count = result.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // Check that it wasn't the producer thread erroring that caused us
+ // to time out
+ genResult.get();
+ // It wasn't so throw the original error
+ throw e;
+ }
+ Assert.assertEquals(generateSize, (int) count);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is tiny
+ this.test_streamed_triples(1, 100, true);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much smaller than generated triples
+ this.test_streamed_triples(10, 1000, false);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is smaller than generated triples
+ this.test_streamed_triples(100, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is same as generated triples
+ this.test_streamed_triples(1000, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much larger than generated triples
+ this.test_streamed_triples(10000, 1000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated triples
+ this.test_streamed_triples(1000, 100000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_triples_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated triples
+ this.test_streamed_triples(10000, 100000, false);
+ }
+
+ private void test_streamed_quads(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+ ExecutionException, TimeoutException {
+
+ final PipedRDFIterator<Quad> it = new PipedRDFIterator<Quad>(bufferSize, fair);
+ final PipedQuadsStream out = new PipedQuadsStream(it);
+
+ // Create a runnable that will generate quads
+ Runnable genQuads = new Runnable() {
+
+ @Override
+ public void run() {
+ out.start();
+ // Generate quads
+ for (int i = 1; i <= generateSize; i++) {
+ Quad q = new Quad(Node.createURI("http://graph"), Node.createAnon(), Node.createURI("http://predicate"),
+ NodeFactory.intToNode(i));
+ out.quad(q);
+ }
+ out.finish();
+ return;
+ }
+ };
+
+ // Create a runnable that will consume quads
+ Callable<Integer> consumeQuads = new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ return count;
+ }
+ };
+
+ // Run the threads
+ Future<?> genResult = executor.submit(genQuads);
+ Future<Integer> result = executor.submit(consumeQuads);
+ Integer count = 0;
+ try {
+ count = result.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // Check that it wasn't the producer thread erroring that caused us
+ // to time out
+ genResult.get();
+ // It wasn't so throw the original error
+ throw e;
+ }
+ Assert.assertEquals(generateSize, (int) count);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is tiny
+ this.test_streamed_quads(1, 100, true);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much smaller than generated quads
+ this.test_streamed_quads(10, 1000, false);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is smaller than generated quads
+ this.test_streamed_quads(100, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is same as generated quads
+ this.test_streamed_quads(1000, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much larger than generated quads
+ this.test_streamed_quads(10000, 1000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated quads
+ this.test_streamed_quads(1000, 100000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_quads_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated quads
+ this.test_streamed_quads(10000, 100000, false);
+ }
+
+ private void test_streamed_tuples(int bufferSize, final int generateSize, boolean fair) throws InterruptedException,
+ ExecutionException, TimeoutException {
+
+ final PipedRDFIterator<Tuple<Node>> it = new PipedRDFIterator<Tuple<Node>>();
+ final PipedTuplesStream out = new PipedTuplesStream(it);
+
+ // Create a runnable that will generate tuples
+ Runnable genQuads = new Runnable() {
+
+ @Override
+ public void run() {
+ out.start();
+ // Generate tuples
+ for (int i = 1; i <= generateSize; i++) {
+ Tuple<Node> t = Tuple.create(Node.createURI("http://graph"), Node.createAnon(),
+ Node.createURI("http://predicate"), NodeFactory.intToNode(i));
+ out.tuple(t);
+ }
+ out.finish();
+ return;
+ }
+ };
+
+ // Create a runnable that will consume tuples
+ Callable<Integer> consumeQuads = new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ return count;
+ }
+ };
+
+ // Run the threads
+ Future<?> genResult = executor.submit(genQuads);
+ Future<Integer> result = executor.submit(consumeQuads);
+ Integer count = 0;
+ try {
+ count = result.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // Check that it wasn't the producer thread erroring that caused us
+ // to time out
+ genResult.get();
+ // It wasn't so throw the original error
+ throw e;
+ }
+ Assert.assertEquals(generateSize, (int) count);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_01() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is tiny
+ this.test_streamed_tuples(1, 100, true);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_02() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much smaller than generated tuples
+ this.test_streamed_tuples(10, 1000, false);
+ }
+
+ /**
+ * Test that blocking and waiting work nicely
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_03() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is smaller than generated tuples
+ this.test_streamed_tuples(100, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_04() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is same as generated tuples
+ this.test_streamed_tuples(1000, 1000, false);
+ }
+
+ /**
+ * Test where blocking should rarely occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_05() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is much larger than generated tuples
+ this.test_streamed_tuples(10000, 1000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_06() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated tuples
+ this.test_streamed_tuples(1000, 100000, false);
+ }
+
+ /**
+ * Test where blocking may occur
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void streamed_tuples_iterator_07() throws InterruptedException, ExecutionException, TimeoutException {
+ // Buffer size is small relative to generated tuples
+ this.test_streamed_tuples(10000, 100000, false);
+ }
+
+ /**
+ * Test for bad buffer size
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void streamed_instantiation_bad_01() {
+ new PipedRDFIterator<Triple>(0);
+ }
+
+ /**
+ * Test for bad buffer size
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void streamed_instantiation_bad_02() {
+ new PipedRDFIterator<Triple>(-1);
+ }
+
+ /**
+ * Tests that the iterate copes correctly in the case of hitting a parser
+ * error
+ *
+ * @param data
+ * Data string (Turtle format) which should be malformed
+ * @param expected
+ * Number of valid triples expected to be generated before the
+ * error is hit
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ private void test_streamed_triples_bad(final String data, int expected) throws TimeoutException, InterruptedException {
+
+
+ final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+ final PipedTriplesStream out = new PipedTriplesStream(it);
+
+ // Create a runnable that will try to parse the bad data
+ Runnable runParser = new Runnable() {
+
+ @Override
+ public void run() {
+ Charset utf8 = Charset.forName("utf8");
+ ByteArrayInputStream input = new ByteArrayInputStream(data.getBytes(utf8));
+ try {
+ RDFDataMgr.parse(out, input, null, RDFLanguages.TURTLE, null);
+ } catch (Throwable t) {
+ // Ignore the error
+ }
+ return;
+ }
+ };
+
+ // Create a runnable that will consume triples
+ Callable<Integer> consumeTriples = new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ return count;
+ }
+ };
+
+ // Run the threads
+ Future<?> genResult = executor.submit(runParser);
+ Future<Integer> result = executor.submit(consumeTriples);
+ Integer count = 0;
+ try {
+ count = result.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ // We expect the producer thread to have errored
+ try {
+ genResult.get();
+ } catch (ExecutionException ex) {
+ // This is as expected, ignore
+ LOGGER.warn("Errored as expected", ex);
+ }
+ // However we expect the consumer to still have been notified of
+ // failure
+ throw e;
+ } catch (ExecutionException e) {
+ // This was not expected
+ Assert.fail(e.getMessage());
+ }
+
+ // Since the produce thread failed the consumer thread should give the
+ // expected count
+ Assert.assertEquals(expected, (int) count);
+ }
+
+ /**
+ * Test failure of the iterator
+ *
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ @Test
+ public void streamed_triples_bad_01() throws TimeoutException, InterruptedException {
+ test_streamed_triples_bad("@prefix : <http://unterminated", 0);
+ }
+
+ /**
+ * Test failure of the iterator
+ *
+ * @throws TimeoutException
+ * @throws InterruptedException
+ */
+ @Test
+ public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
+ test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
+ }
+
+ /**
+ * Tests attempting to access the iterator before the stream has been connected
+ */
+ @Test(expected = IllegalStateException.class)
+ public void streamed_state_bad_01() {
+ PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+ it.hasNext();
+ }
+
+ /**
+ * Tests attempting to access the iterator after the producer dies
+ */
+ @Test(expected = RiotException.class)
+ public void streamed_state_bad_02() {
+
+ final PipedRDFIterator<Triple> it = new PipedRDFIterator<Triple>();
+ final PipedTriplesStream out = new PipedTriplesStream(it);
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ out.start();
+ out.triple(Triple.create(Node.createURI("urn:s"), Node.createURI("urn:p"), Node.createURI("urn:o")));
+ throw new RuntimeException("die!");
+ }
+ });
+
+ // Because this is a unit test, set an exception handler to suppress the normal printing of the stacktrace to stderr
+ t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ // Do nothing
+ }
+ });
+
+ t.start();
+
+ Assert.assertTrue(it.hasNext());
+ it.next();
+
+ // Should throw a RiotException
+ it.hasNext();
+ }
+}