You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by rv...@apache.org on 2013/01/28 18:01:43 UTC
svn commit: r1439498 - in /jena/trunk/jena-arq/src:
main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
Author: rvesse
Date: Mon Jan 28 17:01:43 2013
New Revision: 1439498
URL: http://svn.apache.org/viewvc?rev=1439498&view=rev
Log:
Refactor StreamedRDFIterator to use blocking polling with a timeout instead of non-blocking polling with an incremental back-off retry. Improve the tests to use a semaphore, avoiding the thread contention issues that led to failing test runs.
Modified:
jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
Modified: jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java?rev=1439498&r1=1439497&r2=1439498&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java (original)
+++ jena/trunk/jena-arq/src/main/java/org/apache/jena/riot/lang/StreamedRDFIterator.java Mon Jan 28 17:01:43 2013
@@ -21,6 +21,7 @@ package org.apache.jena.riot.lang;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean ;
import org.apache.jena.riot.system.LightweightPrefixMap;
@@ -104,40 +105,24 @@ public abstract class StreamedRDFIterato
* Helper method for actually getting the next element
*/
private void getNext() {
- // Use an incremental back off retry strategy to avoid starving the
- // producer thread which is trying to insert into the queue we
- // are attempting to consume
- // We keep a maximum on the back off amount as otherwise we can
- // wait unduly long for new elements to be available
-
- // The initial back off starts at 0 so we just retry immediately after
- // the first failure
- int backoff = 0;
-
while (this.next == null) {
// We use poll() in favour of the blocking take() as otherwise we
// can hit a deadlock if we reach this line before finish() is
// called whereby we would block indefinitely and never be able to
// continue
- this.next = buffer.poll();
+ try {
+ // We wait for a short amount of time, if we still get a null
+ // anyway we'll go around the loop and try again unless the
+ // finished flag is set
+ this.next = buffer.poll(25, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e1) {
+ // Ignore and continue
+ }
// As soon as we see the finished signal has been set stop
// waiting for more elements
if (this.finished.get())
break;
-
- // Back off
- if (backoff > 0) {
- try {
- Thread.sleep(backoff);
- } catch (InterruptedException e) {
- // Ignore and continue
- }
- }
- // Increment back off up to maximum
- if (backoff < 5) {
- backoff++;
- }
}
}
Modified: jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java?rev=1439498&r1=1439497&r2=1439498&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java (original)
+++ jena/trunk/jena-arq/src/test/java/org/apache/jena/riot/lang/TestStreamedRDFIterators.java Mon Jan 28 17:01:43 2013
@@ -20,7 +20,7 @@ package org.apache.jena.riot.lang;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
-import java.util.concurrent.* ;
+import java.util.concurrent.*;
import junit.framework.Assert;
@@ -44,7 +44,7 @@ import com.hp.hpl.jena.sparql.util.NodeF
*/
public class TestStreamedRDFIterators {
private static final Logger LOGGER = LoggerFactory.getLogger(TestStreamedRDFIterators.class);
-
+
private static ExecutorService executor;
/**
@@ -74,8 +74,8 @@ public class TestStreamedRDFIterators {
// Create the StreamedTriplesIterator with a small buffer
final StreamedTriplesIterator stream = new StreamedTriplesIterator(bufferSize, fair);
-
- final Semaphore initialization = new Semaphore(0) ;
+
+ final Semaphore initialization = new Semaphore(0);
// Create a runnable that will generate triples
Runnable genTriples = new Runnable() {
@@ -83,7 +83,7 @@ public class TestStreamedRDFIterators {
@Override
public void run() {
stream.start();
- initialization.release() ; // Let the consumer start
+ initialization.release(); // Let the consumer start
// Generate triples
for (int i = 1; i <= generateSize; i++) {
Triple t = new Triple(Node.createAnon(), Node.createURI("http://predicate"), NodeFactory.intToNode(i));
@@ -99,7 +99,7 @@ public class TestStreamedRDFIterators {
@Override
public Integer call() throws Exception {
- initialization.acquire() ; // Wait for producer.
+ initialization.acquire(); // Wait for producer.
int count = 0;
while (stream.hasNext()) {
stream.next();
@@ -221,12 +221,15 @@ public class TestStreamedRDFIterators {
// Create the StreamedQuadsIterator
final StreamedQuadsIterator stream = new StreamedQuadsIterator(bufferSize, fair);
+ final Semaphore initialization = new Semaphore(0);
+
// Create a runnable that will generate quads
Runnable genQuads = new Runnable() {
@Override
public void run() {
stream.start();
+ initialization.release();
// Generate quads
for (int i = 1; i <= generateSize; i++) {
Quad q = new Quad(Node.createURI("http://graph"), Node.createAnon(), Node.createURI("http://predicate"),
@@ -243,12 +246,7 @@ public class TestStreamedRDFIterators {
@Override
public Integer call() throws Exception {
- int waits = 0;
- while (!stream.canIterate()) {
- Thread.sleep(250);
- waits++;
- if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
- }
+ initialization.acquire();
int count = 0;
while (stream.hasNext()) {
stream.next();
@@ -370,12 +368,15 @@ public class TestStreamedRDFIterators {
// Create the StreamedTuplesIterator
final StreamedTuplesIterator stream = new StreamedTuplesIterator(bufferSize, fair);
+ final Semaphore initialization = new Semaphore(0);
+
// Create a runnable that will generate tuples
Runnable genQuads = new Runnable() {
@Override
public void run() {
stream.start();
+ initialization.release();
// Generate tuples
for (int i = 1; i <= generateSize; i++) {
Tuple<Node> t = Tuple.create(Node.createURI("http://graph"), Node.createAnon(),
@@ -392,12 +393,7 @@ public class TestStreamedRDFIterators {
@Override
public Integer call() throws Exception {
- int waits = 0;
- while (!stream.canIterate()) {
- Thread.sleep(250);
- waits++;
- if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
- }
+ initialization.acquire();
int count = 0;
while (stream.hasNext()) {
stream.next();
@@ -531,9 +527,14 @@ public class TestStreamedRDFIterators {
}
/**
- * Tests that the iterate copes correctly in the case of hitting a parser error
- * @param data Data string (Turtle format) which should be malformed
- * @param expected Number of valid triples expected to be generated before the error is hit
+ * Tests that the iterate copes correctly in the case of hitting a parser
+ * error
+ *
+ * @param data
+ * Data string (Turtle format) which should be malformed
+ * @param expected
+ * Number of valid triples expected to be generated before the
+ * error is hit
* @throws TimeoutException
* @throws InterruptedException
*/
@@ -542,6 +543,8 @@ public class TestStreamedRDFIterators {
// Create the StreamedTriplesIterator
final StreamedTriplesIterator stream = new StreamedTriplesIterator();
+ final Semaphore initialization = new Semaphore(0);
+
// Create a runnable that will try to parse the bad data
Runnable runParser = new Runnable() {
@@ -549,7 +552,12 @@ public class TestStreamedRDFIterators {
public void run() {
Charset utf8 = Charset.forName("utf8");
ByteArrayInputStream input = new ByteArrayInputStream(data.getBytes(utf8));
- RDFDataMgr.parse(stream, input, null, RDFLanguages.TURTLE, null);
+ try {
+ RDFDataMgr.parse(stream, input, null, RDFLanguages.TURTLE, null);
+ } catch (Throwable t) {
+ // Ignore the error
+ }
+ initialization.release();
return;
}
};
@@ -559,12 +567,7 @@ public class TestStreamedRDFIterators {
@Override
public Integer call() throws Exception {
- int waits = 0;
- while (!stream.canIterate()) {
- Thread.sleep(250);
- waits++;
- if (waits == 4) throw new Exception("Iterate failed to be ready in a timely fashion");
- }
+ initialization.acquire();
int count = 0;
while (stream.hasNext()) {
stream.next();
@@ -585,22 +588,25 @@ public class TestStreamedRDFIterators {
try {
genResult.get();
} catch (ExecutionException ex) {
- //This is as expected, ignore
+ // This is as expected, ignore
LOGGER.warn("Errored as expected", ex);
}
- // However we expect the consumer to still have been notified of failure
+ // However we expect the consumer to still have been notified of
+ // failure
throw e;
} catch (ExecutionException e) {
- //This was not expected
+ // This was not expected
Assert.fail(e.getMessage());
}
-
- //Since the produce thread failed the consumer thread should give the expected count
+
+ // Since the produce thread failed the consumer thread should give the
+ // expected count
Assert.assertEquals(expected, (int) count);
}
-
+
/**
* Test failure of the iterator
+ *
* @throws TimeoutException
* @throws InterruptedException
*/
@@ -608,9 +614,10 @@ public class TestStreamedRDFIterators {
public void streamed_triples_bad_01() throws TimeoutException, InterruptedException {
test_streamed_triples_bad("@prefix : <http://unterminated", 0);
}
-
+
/**
* Test failure of the iterator
+ *
* @throws TimeoutException
* @throws InterruptedException
*/
@@ -618,20 +625,22 @@ public class TestStreamedRDFIterators {
public void streamed_triples_bad_02() throws TimeoutException, InterruptedException {
test_streamed_triples_bad("@prefix : <http://example> . :s :p :o . :x :y", 1);
}
-
+
/**
- * Tests attempting to access the iterator before the stream has been started
+ * Tests attempting to access the iterator before the stream has been
+ * started
*/
- @Test(expected=IllegalStateException.class)
+ @Test(expected = IllegalStateException.class)
public void streamed_state_bad_01() {
StreamedTriplesIterator iter = new StreamedTriplesIterator();
iter.hasNext();
}
-
+
/**
- * Tests attempting to access the iterator before the stream has been started
+ * Tests attempting to access the iterator before the stream has been
+ * started
*/
- @Test(expected=IllegalStateException.class)
+ @Test(expected = IllegalStateException.class)
public void streamed_state_bad_02() {
StreamedTriplesIterator iter = new StreamedTriplesIterator();
iter.next();