You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2013/01/25 12:05:43 UTC
svn commit: r1438452 - in /jena/trunk/jena-arq/src:
main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
Author: rvesse
Date: Fri Jan 25 11:05:43 2013
New Revision: 1438452
URL: http://svn.apache.org/viewvc?rev=1438452&view=rev
Log:
StreamedRDFIterator now tracks base URI, also tracks whether anyone actually called start() so if the user tries to iterate before that happens they get an IllegalStateException rather than a thread hang
Modified:
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java?rev=1438452&r1=1438451&r2=1438452&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java Fri Jan 25 11:05:43 2013
@@ -43,9 +43,10 @@ import org.apache.jena.riot.system.Strea
public abstract class StreamedRDFIterator<T> implements RDFParserOutputIterator<T> {
protected BlockingQueue<T> buffer;
- private boolean finished = false;
+ private boolean started = false, finished = false;
private T next = null;
private PrefixMap prefixes = new PrefixMap();
+ private String baseIri;
/**
* Constant for default buffer size
@@ -77,9 +78,16 @@ public abstract class StreamedRDFIterato
this.buffer = new ArrayBlockingQueue<T>(bufferSize, fair);
}
+ /**
+ * Returns whether further elements are available
+ * @return True if more elements are available, false otherwise
+ * @exception IllegalStateException Thrown if you try to read from the iterator before the stream has had the {@link #start()} method called on it
+ */
@Override
public final boolean hasNext() {
- if (this.next != null) {
+ if (!this.started) {
+ throw new IllegalStateException("Tried to read from iterator before the Stream was started, please ensure that a producer thread has called start() on the stream before attempting to iterate over it");
+ } else if (this.next != null) {
return true;
} else if (this.finished && buffer.isEmpty()) {
return false;
@@ -89,6 +97,9 @@ public abstract class StreamedRDFIterato
}
}
+ /**
+ * Helper method for actually getting the next element
+ */
private void getNext() {
// Use an incremental back off retry strategy to avoid starving the
// producer thread which is trying to insert into the queue we
@@ -127,9 +138,17 @@ public abstract class StreamedRDFIterato
}
}
+ /**
+ * Gets the next element from the iterator
+ * @return Next element
+ * @exception IllegalStateException Thrown if you try to iterate before the stream has had the {@link #start()} method called
+ * @exception NoSuchElementException Thrown if there are no further elements
+ */
@Override
public final T next() {
- if (this.next != null) {
+ if (!this.started) {
+ throw new IllegalStateException("Tried to read from iterator before the Stream was started, please ensure that a producer thread has called start() on the stream before attempting to iterate over it");
+ } else if (this.next != null) {
T t = this.next;
this.next = null;
return t;
@@ -147,17 +166,28 @@ public abstract class StreamedRDFIterato
}
@Override
- public void start() {
- // No-op
+ public final void start() {
+ if (this.started) {
+ throw new IllegalStateException("A StreamedRDFIterator is not reusable, please create a new instance");
+ }
+ this.started = true;
}
@Override
- public void base(String base) {
- // Base URIs are ignored
+ public final void base(String base) {
+ this.baseIri = base;
+ }
+
+ /**
+ * Gets the most recently seen Base IRI
+ * @return Base IRI
+ */
+ public String getBaseIri() {
+ return this.baseIri;
}
@Override
- public void prefix(String prefix, String iri) {
+ public final void prefix(String prefix, String iri) {
this.prefixes.add(prefix, iri);
}
Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java?rev=1438452&r1=1438451&r2=1438452&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java Fri Jan 25 11:05:43 2013
@@ -19,7 +19,6 @@
package org.apache.jena.riot.lang;
import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeoutExcep
import junit.framework.Assert;
import org.apache.jena.atlas.lib.Tuple;
-import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFLanguages;
import org.junit.AfterClass;
@@ -562,6 +560,8 @@ public class TestStreamedRDFIterators {
// Run the threads
Future<?> genResult = executor.submit(runParser);
+ //Need to insert a sleep as otherwise we can see the IllegalStateException
+ Thread.sleep(250);
Future<Integer> result = executor.submit(consumeTriples);
Integer count = 0;
try {
@@ -604,4 +604,22 @@ public class TestStreamedRDFIterators {
public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
}
+
+ /**
+ * Tests attempting to access the iterator before the stream has been started
+ */
+ @Test(expected=IllegalStateException.class)
+ public void streamed_state_bad_01() {
+ StreamedTriplesIterator iter = new StreamedTriplesIterator();
+ iter.hasNext();
+ }
+
+ /**
+ * Tests attempting to access the iterator before the stream has been started
+ */
+ @Test(expected=IllegalStateException.class)
+ public void streamed_state_bad_02() {
+ StreamedTriplesIterator iter = new StreamedTriplesIterator();
+ iter.next();
+ }
}