You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2013/01/25 12:05:43 UTC

svn commit: r1438452 - in /jena/trunk/jena-arq/src: main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java

Author: rvesse
Date: Fri Jan 25 11:05:43 2013
New Revision: 1438452

URL: http://svn.apache.org/viewvc?rev=1438452&view=rev
Log:
StreamedRDFIterator now tracks the base URI. It also tracks whether start() has actually been called, so that a user who tries to iterate before then gets an IllegalStateException rather than a hung thread.

Modified:
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java

Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java?rev=1438452&r1=1438451&r2=1438452&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java Fri Jan 25 11:05:43 2013
@@ -43,9 +43,10 @@ import org.apache.jena.riot.system.Strea
 public abstract class StreamedRDFIterator<T> implements RDFParserOutputIterator<T> {
 
     protected BlockingQueue<T> buffer;
-    private boolean finished = false;
+    private boolean started = false, finished = false;
     private T next = null;
     private PrefixMap prefixes = new PrefixMap();
+    private String baseIri;
 
     /**
      * Constant for default buffer size
@@ -77,9 +78,16 @@ public abstract class StreamedRDFIterato
         this.buffer = new ArrayBlockingQueue<T>(bufferSize, fair);
     }
 
+    /**
+     * Returns whether further elements are available
+     * @return True if more elements are available, false otherwise
+     * @exception IllegalStateException Thrown if you try to read from the iterator before the stream has had the {@link #start()} method called on it
+     */
     @Override
     public final boolean hasNext() {
-        if (this.next != null) {
+        if (!this.started) {
+            throw new IllegalStateException("Tried to read from iterator before the Stream was started, please ensure that a producer thread has called start() on the stream before attempting to iterate over it");
+        } else if (this.next != null) {
             return true;
         } else if (this.finished && buffer.isEmpty()) {
             return false;
@@ -89,6 +97,9 @@ public abstract class StreamedRDFIterato
         }
     }
 
+    /**
+     * Helper method for actually getting the next element
+     */
     private void getNext() {
         // Use an incremental back off retry strategy to avoid starving the
         // producer thread which is trying to insert into the queue we
@@ -127,9 +138,17 @@ public abstract class StreamedRDFIterato
         }
     }
 
+    /**
+     * Gets the next element from the iterator
+     * @return Next element
+     * @exception IllegalStateException Thrown if you try to iterate before the stream has had the {@link #start()} method called
+     * @exception NoSuchElementException Thrown if there are no further elements
+     */
     @Override
     public final T next() {
-        if (this.next != null) {
+        if (!this.started) {
+            throw new IllegalStateException("Tried to read from iterator before the Stream was started, please ensure that a producer thread has called start() on the stream before attempting to iterate over it");
+        } else if (this.next != null) {
             T t = this.next;
             this.next = null;
             return t;
@@ -147,17 +166,28 @@ public abstract class StreamedRDFIterato
     }
 
     @Override
-    public void start() {
-        // No-op
+    public final void start() {
+        if (this.started) {
+            throw new IllegalStateException("A StreamedRDFIterator is not reusable, please create a new instance");
+        }
+        this.started = true;
     }
 
     @Override
-    public void base(String base) {
-        // Base URIs are ignored
+    public final void base(String base) {
+        this.baseIri = base;
+    }
+    
+    /**
+     * Gets the most recently seen Base IRI
+     * @return Base IRI
+     */
+    public String getBaseIri() {
+        return this.baseIri;
     }
 
     @Override
-    public void prefix(String prefix, String iri) {
+    public final void prefix(String prefix, String iri) {
         this.prefixes.add(prefix, iri);
     }
     

Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java?rev=1438452&r1=1438451&r2=1438452&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java Fri Jan 25 11:05:43 2013
@@ -19,7 +19,6 @@
 package org.apache.jena.riot.lang;
 
 import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +31,6 @@ import java.util.concurrent.TimeoutExcep
 import junit.framework.Assert;
 
 import org.apache.jena.atlas.lib.Tuple;
-import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
 import org.apache.jena.riot.RDFLanguages;
 import org.junit.AfterClass;
@@ -562,6 +560,8 @@ public class TestStreamedRDFIterators {
 
         // Run the threads
         Future<?> genResult = executor.submit(runParser);
+        // Sleep briefly so the parser thread has time to call start() before the consumer begins iterating; otherwise the consumer can hit the IllegalStateException
+        Thread.sleep(250);
         Future<Integer> result = executor.submit(consumeTriples);
         Integer count = 0;
         try {
@@ -604,4 +604,22 @@ public class TestStreamedRDFIterators {
     public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
         test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
     }
+    
+    /**
+     * Tests attempting to access the iterator before the stream has been started
+     */
+    @Test(expected=IllegalStateException.class)
+    public void streamed_state_bad_01() {
+        StreamedTriplesIterator iter = new StreamedTriplesIterator();
+        iter.hasNext();
+    }
+    
+    /**
+     * Tests attempting to access the iterator before the stream has been started
+     */
+    @Test(expected=IllegalStateException.class)
+    public void streamed_state_bad_02() {
+        StreamedTriplesIterator iter = new StreamedTriplesIterator();
+        iter.next();
+    }
 }