You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2013/01/28 18:01:43 UTC

svn commit: r1439498 - in /jena/trunk/jena-arq/src: main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java

Author: rvesse
Date: Mon Jan 28 17:01:43 2013
New Revision: 1439498

URL: http://svn.apache.org/viewvc?rev=1439498&view=rev
Log:
Refactor StreamedRDFIterator to use blocking polling in favour of incremental back off retry with non-blocking polling.  Improve tests to use semaphore to avoid thread contention issues leading to failing test runs

Modified:
    jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
    jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java

Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java?rev=1439498&r1=1439497&r2=1439498&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java Mon Jan 28 17:01:43 2013
@@ -21,6 +21,7 @@ package org.apache.jena.riot.lang;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean ;
 
 import org.apache.jena.riot.system.LightweightPrefixMap;
@@ -104,40 +105,24 @@ public abstract class StreamedRDFIterato
      * Helper method for actually getting the next element
      */
     private void getNext() {
-        // Use an incremental back off retry strategy to avoid starving the
-        // producer thread which is trying to insert into the queue we
-        // are attempting to consume
-        // We keep a maximum on the back off amount as otherwise we can
-        // wait unduly long for new elements to be available
-
-        // The initial back off starts at 0 so we just retry immediately after
-        // the first failure
-        int backoff = 0;
-
         while (this.next == null) {
             // We use poll() in favour of the blocking take() as otherwise we
             // can hit a deadlock if we reach this line before finish() is
             // called whereby we would block indefinitely and never be able to
             // continue
-            this.next = buffer.poll();
+            try {
+                // We wait for a short amount of time, if we still get a null
+                // anyway we'll go around the loop and try again unless the
+                // finished flag is set
+                this.next = buffer.poll(25, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e1) {
+                // Ignore and continue
+            }
 
             // As soon as we see the finished signal has been set stop
             // waiting for more elements
             if (this.finished.get())
                 break;
-
-            // Back off
-            if (backoff > 0) {
-                try {
-                    Thread.sleep(backoff);
-                } catch (InterruptedException e) {
-                    // Ignore and continue
-                }
-            }
-            // Increment back off up to maximum
-            if (backoff < 5) {
-                backoff++;
-            }
         }
     }
 

Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java?rev=1439498&r1=1439497&r2=1439498&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java Mon Jan 28 17:01:43 2013
@@ -20,7 +20,7 @@ package org.apache.jena.riot.lang;
 
 import java.io.ByteArrayInputStream;
 import java.nio.charset.Charset;
-import java.util.concurrent.* ;
+import java.util.concurrent.*;
 
 import junit.framework.Assert;
 
@@ -44,7 +44,7 @@ import com.hp.hpl.jena.sparql.util.NodeF
  */
 public class TestStreamedRDFIterators {
     private static final Logger LOGGER = LoggerFactory.getLogger(TestStreamedRDFIterators.class);
-    
+
     private static ExecutorService executor;
 
     /**
@@ -74,8 +74,8 @@ public class TestStreamedRDFIterators {
 
         // Create the StreamedTriplesIterator with a small buffer
         final StreamedTriplesIterator stream = new StreamedTriplesIterator(bufferSize, fair);
-        
-        final Semaphore initialization = new Semaphore(0) ;
+
+        final Semaphore initialization = new Semaphore(0);
 
         // Create a runnable that will generate triples
         Runnable genTriples = new Runnable() {
@@ -83,7 +83,7 @@ public class TestStreamedRDFIterators {
             @Override
             public void run() {
                 stream.start();
-                initialization.release() ;      // Let the consumer start 
+                initialization.release(); // Let the consumer start
                 // Generate triples
                 for (int i = 1; i <= generateSize; i++) {
                     Triple t = new Triple(Node.createAnon(), Node.createURI("http://predicate"), NodeFactory.intToNode(i));
@@ -99,7 +99,7 @@ public class TestStreamedRDFIterators {
 
             @Override
             public Integer call() throws Exception {
-                initialization.acquire() ;  // Wait for producer.
+                initialization.acquire(); // Wait for producer.
                 int count = 0;
                 while (stream.hasNext()) {
                     stream.next();
@@ -221,12 +221,15 @@ public class TestStreamedRDFIterators {
         // Create the StreamedQuadsIterator
         final StreamedQuadsIterator stream = new StreamedQuadsIterator(bufferSize, fair);
 
+        final Semaphore initialization = new Semaphore(0);
+
         // Create a runnable that will generate quads
         Runnable genQuads = new Runnable() {
 
             @Override
             public void run() {
                 stream.start();
+                initialization.release();
                 // Generate quads
                 for (int i = 1; i <= generateSize; i++) {
                     Quad q = new Quad(Node.createURI("http://graph"), Node.createAnon(), Node.createURI("http://predicate"),
@@ -243,12 +246,7 @@ public class TestStreamedRDFIterators {
 
             @Override
             public Integer call() throws Exception {
-                int waits = 0;
-                while (!stream.canIterate()) {
-                    Thread.sleep(250);
-                    waits++;
-                    if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
-                }
+                initialization.acquire();
                 int count = 0;
                 while (stream.hasNext()) {
                     stream.next();
@@ -370,12 +368,15 @@ public class TestStreamedRDFIterators {
         // Create the StreamedTuplesIterator
         final StreamedTuplesIterator stream = new StreamedTuplesIterator(bufferSize, fair);
 
+        final Semaphore initialization = new Semaphore(0);
+
         // Create a runnable that will generate tuples
         Runnable genQuads = new Runnable() {
 
             @Override
             public void run() {
                 stream.start();
+                initialization.release();
                 // Generate tuples
                 for (int i = 1; i <= generateSize; i++) {
                     Tuple<Node> t = Tuple.create(Node.createURI("http://graph"), Node.createAnon(),
@@ -392,12 +393,7 @@ public class TestStreamedRDFIterators {
 
             @Override
             public Integer call() throws Exception {
-                int waits = 0;
-                while (!stream.canIterate()) {
-                    Thread.sleep(250);
-                    waits++;
-                    if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
-                }
+                initialization.acquire();
                 int count = 0;
                 while (stream.hasNext()) {
                     stream.next();
@@ -531,9 +527,14 @@ public class TestStreamedRDFIterators {
     }
 
     /**
-     * Tests that the iterate copes correctly in the case of hitting a parser error
-     * @param data Data string (Turtle format) which should be malformed
-     * @param expected Number of valid triples expected to be generated before the error is hit
+     * Tests that the iterate copes correctly in the case of hitting a parser
+     * error
+     * 
+     * @param data
+     *            Data string (Turtle format) which should be malformed
+     * @param expected
+     *            Number of valid triples expected to be generated before the
+     *            error is hit
      * @throws TimeoutException
      * @throws InterruptedException
      */
@@ -542,6 +543,8 @@ public class TestStreamedRDFIterators {
         // Create the StreamedTriplesIterator
         final StreamedTriplesIterator stream = new StreamedTriplesIterator();
 
+        final Semaphore initialization = new Semaphore(0);
+
         // Create a runnable that will try to parse the bad data
         Runnable runParser = new Runnable() {
 
@@ -549,7 +552,12 @@ public class TestStreamedRDFIterators {
             public void run() {
                 Charset utf8 = Charset.forName("utf8");
                 ByteArrayInputStream input = new ByteArrayInputStream(data.getBytes(utf8));
-                RDFDataMgr.parse(stream, input, null, RDFLanguages.TURTLE, null);
+                try {
+                    RDFDataMgr.parse(stream, input, null, RDFLanguages.TURTLE, null);
+                } catch (Throwable t) {
+                    // Ignore the error
+                }
+                initialization.release();
                 return;
             }
         };
@@ -559,12 +567,7 @@ public class TestStreamedRDFIterators {
 
             @Override
             public Integer call() throws Exception {
-                int waits = 0;
-                while (!stream.canIterate()) {
-                    Thread.sleep(250);
-                    waits++;
-                    if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
-                }
+                initialization.acquire();
                 int count = 0;
                 while (stream.hasNext()) {
                     stream.next();
@@ -585,22 +588,25 @@ public class TestStreamedRDFIterators {
             try {
                 genResult.get();
             } catch (ExecutionException ex) {
-                //This is as expected, ignore
+                // This is as expected, ignore
                 LOGGER.warn("Errored as expected", ex);
             }
-            // However we expect the consumer to still have been notified of failure
+            // However we expect the consumer to still have been notified of
+            // failure
             throw e;
         } catch (ExecutionException e) {
-            //This was not expected
+            // This was not expected
             Assert.fail(e.getMessage());
         }
-        
-        //Since the produce thread failed the consumer thread should give the expected count
+
+        // Since the produce thread failed the consumer thread should give the
+        // expected count
         Assert.assertEquals(expected, (int) count);
     }
-    
+
     /**
      * Test failure of the iterator
+     * 
      * @throws TimeoutException
      * @throws InterruptedException
      */
@@ -608,9 +614,10 @@ public class TestStreamedRDFIterators {
     public void streamed_triples_bad_01() throws TimeoutException, InterruptedException {
         test_streamed_triples_bad("@prefix : <http://unterminated", 0);
     }
-    
+
     /**
      * Test failure of the iterator
+     * 
      * @throws TimeoutException
      * @throws InterruptedException
      */
@@ -618,20 +625,22 @@ public class TestStreamedRDFIterators {
     public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
         test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
     }
-    
+
     /**
-     * Tests attempting to access the iterator before the stream has been started
+     * Tests attempting to access the iterator before the stream has been
+     * started
      */
-    @Test(expected=IllegalStateException.class)
+    @Test(expected = IllegalStateException.class)
     public void streamed_state_bad_01() {
         StreamedTriplesIterator iter = new StreamedTriplesIterator();
         iter.hasNext();
     }
-    
+
     /**
-     * Tests attempting to access the iterator before the stream has been started
+     * Tests attempting to access the iterator before the stream has been
+     * started
      */
-    @Test(expected=IllegalStateException.class)
+    @Test(expected = IllegalStateException.class)
     public void streamed_state_bad_02() {
         StreamedTriplesIterator iter = new StreamedTriplesIterator();
         iter.next();